"""Flask app for CmdForge Registry API (Phase 2).""" from __future__ import annotations import hashlib import json import math import os import re import secrets from dataclasses import dataclass from datetime import date, datetime, timedelta from typing import Any, Dict, Iterable, List, Optional, Tuple from flask import Flask, Response, g, jsonify, request import yaml from functools import wraps from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError from .db import connect_db, init_db, query_all, query_one from .rate_limit import RateLimiter from .sync import process_webhook, get_categories_cache_path, get_repo_dir from ..hash_utils import compute_yaml_hash from .stats import ( refresh_tool_stats, get_tool_stats, refresh_publisher_stats, get_publisher_stats, track_tool_usage, calculate_badges, BADGES, get_badge_info, format_count, ) MAX_BODY_BYTES = 512 * 1024 MAX_CONFIG_BYTES = 64 * 1024 MAX_README_BYTES = 256 * 1024 MAX_TOOL_NAME_LEN = 64 MAX_DESC_LEN = 500 MAX_TAG_LEN = 32 MAX_TAGS = 10 MAX_PAGE_SIZE = 100 DEFAULT_PAGE_SIZE = 20 RATE_LIMITS = { "tools": {"limit": 100, "window": 60}, "download": {"limit": 60, "window": 60}, "register": {"limit": 5, "window": 3600}, "login": {"limit": 10, "window": 900}, "login_failed": {"limit": 5, "window": 900}, "tokens": {"limit": 10, "window": 3600}, "publish": {"limit": 20, "window": 3600}, "review": {"limit": 10, "window": 3600}, "issue": {"limit": 20, "window": 3600}, "vote": {"limit": 100, "window": 3600}, } ALLOWED_SORT = { "/tools": {"downloads", "published_at", "name"}, "/tools/search": {"relevance", "downloads", "published_at"}, "/categories": {"name", "tool_count"}, } TOOL_NAME_RE = re.compile(r"^[A-Za-z0-9_-]{1,64}$") OWNER_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,37}[a-z0-9]$") EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") RESERVED_SLUGS = {"official", "admin", "system", "api", "registry", "cmdforge"} rate_limiter = RateLimiter() password_hasher = PasswordHasher(memory_cost=65536, time_cost=3, parallelism=4) @dataclass(frozen=True) class Semver: major: int minor: int patch: int prerelease: Tuple[Any, ...] = () @classmethod def parse(cls, value: str) -> Optional["Semver"]: match = re.match(r"^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z.-]+))?(?:\+.+)?$", value) if not match: return None major, minor, patch = map(int, match.group(1, 2, 3)) prerelease_raw = match.group(4) if not prerelease_raw: return cls(major, minor, patch, ()) parts: List[Any] = [] for part in prerelease_raw.split("."): if part.isdigit(): parts.append(int(part)) else: parts.append(part) return cls(major, minor, patch, tuple(parts)) def is_prerelease(self) -> bool: return bool(self.prerelease) def __lt__(self, other: "Semver") -> bool: if (self.major, self.minor, self.patch) != (other.major, other.minor, other.patch): return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch) if not self.prerelease and other.prerelease: return False if self.prerelease and not other.prerelease: return True return self.prerelease < other.prerelease @dataclass(frozen=True) class Constraint: op: str version: Semver def parse_constraints(raw: str) -> Tuple[List[Constraint], bool]: raw = raw.strip() if not raw or raw == "*": return [], False allow_prerelease = "-" in raw parts = [part.strip() for part in raw.split(",") if part.strip()] constraints: List[Constraint] = [] for part in parts: if part.startswith("^"): base = Semver.parse(part[1:]) if not base: continue constraints.append(Constraint(">=", base)) if base.major > 0: upper = Semver(base.major + 1, 0, 0, ()) elif base.minor > 0: upper = Semver(base.major, base.minor + 1, 0, ()) else: upper = Semver(base.major, base.minor, base.patch + 1, ()) constraints.append(Constraint("<", upper)) allow_prerelease = allow_prerelease or base.is_prerelease() continue if part.startswith("~"): base = Semver.parse(part[1:]) if not base: continue constraints.append(Constraint(">=", base)) upper = Semver(base.major, base.minor + 1, 0, ()) constraints.append(Constraint("<", upper)) allow_prerelease = allow_prerelease or base.is_prerelease() continue match = re.match(r"^(>=|<=|>|<|=)?\s*(.+)$", part) if not match: continue op = match.group(1) or "=" version = Semver.parse(match.group(2)) if not version: continue constraints.append(Constraint(op, version)) allow_prerelease = allow_prerelease or version.is_prerelease() return constraints, allow_prerelease def satisfies(version: Semver, constraints: List[Constraint]) -> bool: for constraint in constraints: if constraint.op == ">" and not (version > constraint.version): return False if constraint.op == ">=" and not (version >= constraint.version): return False if constraint.op == "<" and not (version < constraint.version): return False if constraint.op == "<=" and not (version <= constraint.version): return False if constraint.op in {"=", "=="} and not (version == constraint.version): return False return True def select_version(versions: List[str], constraint_raw: Optional[str]) -> Optional[str]: parsed_versions: List[Tuple[Semver, str]] = [] for version in versions: parsed = Semver.parse(version) if parsed: parsed_versions.append((parsed, version)) if not parsed_versions: return None if not constraint_raw or constraint_raw.strip() == "*": candidates = [item for item in parsed_versions if not item[0].is_prerelease()] if not candidates: candidates = parsed_versions return max(candidates, key=lambda item: item[0])[1] constraints, allow_prerelease = parse_constraints(constraint_raw) filtered = [] for parsed, raw in parsed_versions: if not allow_prerelease and parsed.is_prerelease(): continue if satisfies(parsed, constraints): filtered.append((parsed, raw)) if not filtered: return None return max(filtered, key=lambda item: item[0])[1] def run_ai_scrutiny_review(scrutiny_report: dict, config: dict, tool_name: str, description: str) -> Optional[dict]: """Run AI-powered secondary review of scrutiny warnings. Uses the scrutiny-ai-review tool to analyze warnings and identify false positives. Args: scrutiny_report: The scrutiny report with findings config: The tool configuration dict tool_name: Name of the tool being reviewed description: Tool description Returns: AI review result dict, or None if review fails """ import subprocess import sys from pathlib import Path # Check if the AI review tool exists tool_path = Path.home() / ".cmdforge" / "scrutiny-ai-review" / "config.yaml" if not tool_path.exists(): return None # Extract warnings from scrutiny report warnings = [ f for f in scrutiny_report.get("findings", []) if f.get("result") == "warning" ] if not warnings: return None # Prepare tool config for review tool_config = { "name": tool_name, "description": description or "", "steps": config.get("steps", []), "arguments": config.get("arguments", []), } # Find cmdforge executable - try venv first, then PATH cmdforge_paths = [ Path(sys.executable).parent / "cmdforge", # Same venv as current Python Path("/srv/mergerfs/data_pool/home/rob/cmdforge-registry/venv/bin/cmdforge"), # Server venv "cmdforge", # PATH ] cmdforge_exe = None for p in cmdforge_paths: p = Path(p) if not isinstance(p, Path) else p if p.exists() if isinstance(p, Path) and p.is_absolute() else True: cmdforge_exe = str(p) break if not cmdforge_exe: return None # Run the tool (use -- to separate cmdforge args from tool args) try: result = subprocess.run( [ cmdforge_exe, "run", "scrutiny-ai-review", "--", "--warnings", json.dumps(warnings), "--tool-config", json.dumps(tool_config), ], capture_output=True, text=True, timeout=60, # 60 second timeout ) if result.returncode == 0 and result.stdout.strip(): return json.loads(result.stdout.strip()) return None except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError): return None def create_app() -> Flask: app = Flask(__name__) app.config["MAX_CONTENT_LENGTH"] = MAX_BODY_BYTES # Initialize database schema once at startup with connect_db() as init_conn: init_db(init_conn) @app.before_request def attach_db() -> None: g.db = connect_db() @app.teardown_request def close_db(exc: Optional[BaseException]) -> None: db = getattr(g, "db", None) if db is not None: db.close() @app.before_request def enforce_rate_limit() -> Optional[Response]: path = request.path method = request.method.upper() ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown") if method == "GET": if path.startswith("/api/v1/tools/") and path.endswith("/download"): limit_config = RATE_LIMITS["download"] elif path.startswith("/api/v1/tools"): limit_config = RATE_LIMITS["tools"] else: return None elif method == "POST": if path == "/api/v1/register": limit_config = RATE_LIMITS["register"] elif path == "/api/v1/login": limit_config = RATE_LIMITS["login"] else: return None else: return None allowed, state = rate_limiter.check(ip, limit_config["limit"], limit_config["window"]) remaining = max(0, limit_config["limit"] - state.count) reset_at = int(state.reset_at) if not allowed: payload = { "error": { "code": "RATE_LIMITED", "message": f"Too many requests. Try again in {limit_config['window']} seconds.", "details": { "limit": limit_config["limit"], "window": f"{limit_config['window']} seconds", "retry_after": limit_config["window"], }, } } response = jsonify(payload) response.status_code = 429 response.headers["Retry-After"] = str(limit_config["window"]) response.headers["X-RateLimit-Limit"] = str(limit_config["limit"]) response.headers["X-RateLimit-Remaining"] = "0" response.headers["X-RateLimit-Reset"] = str(reset_at) return response request.rate_limit_headers = { "X-RateLimit-Limit": str(limit_config["limit"]), "X-RateLimit-Remaining": str(remaining), "X-RateLimit-Reset": str(reset_at), } return None @app.after_request def add_rate_limit_headers(response: Response) -> Response: headers = getattr(request, "rate_limit_headers", None) if headers: response.headers.update(headers) return response def error_response(code: str, message: str, status: int = 400, details: Optional[dict] = None) -> Response: payload = {"error": {"code": code, "message": message, "details": details or {}}} response = jsonify(payload) response.status_code = status return response def enforce_token_rate_limit(scope: str, token_hash: str) -> Optional[Response]: limit_config = RATE_LIMITS[scope] allowed, state = rate_limiter.check(token_hash, limit_config["limit"], limit_config["window"]) remaining = max(0, limit_config["limit"] - state.count) reset_at = int(state.reset_at) if not allowed: payload = { "error": { "code": "RATE_LIMITED", "message": f"Too many requests. Try again in {limit_config['window']} seconds.", "details": { "limit": limit_config["limit"], "window": f"{limit_config['window']} seconds", "retry_after": limit_config["window"], }, } } response = jsonify(payload) response.status_code = 429 response.headers["Retry-After"] = str(limit_config["window"]) response.headers["X-RateLimit-Limit"] = str(limit_config["limit"]) response.headers["X-RateLimit-Remaining"] = "0" response.headers["X-RateLimit-Reset"] = str(reset_at) return response request.rate_limit_headers = { "X-RateLimit-Limit": str(limit_config["limit"]), "X-RateLimit-Remaining": str(remaining), "X-RateLimit-Reset": str(reset_at), } return None def require_token(f): @wraps(f) def decorated(*args, **kwargs): auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): return error_response("UNAUTHORIZED", "Missing or invalid token", 401) token = auth_header[7:] token_hash = hashlib.sha256(token.encode()).hexdigest() row = query_one( g.db, """ SELECT t.*, p.slug, p.display_name, p.role, p.banned, p.ban_reason FROM api_tokens t JOIN publishers p ON t.publisher_id = p.id WHERE t.token_hash = ? AND t.revoked_at IS NULL """, [token_hash], ) if not row: return error_response("UNAUTHORIZED", "Invalid or revoked token", 401) # Check if publisher is banned if row["banned"]: return error_response( "ACCOUNT_BANNED", f"Your account has been banned: {row['ban_reason'] or 'No reason given'}", 403, ) g.db.execute( "UPDATE api_tokens SET last_used_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), row["id"]], ) g.current_publisher = { "id": row["publisher_id"], "slug": row["slug"], "display_name": row["display_name"], "role": row["role"] or "user", } g.current_token = {"id": row["id"], "hash": token_hash} g.db.commit() return f(*args, **kwargs) return decorated def require_role(*roles): """Decorator that requires the user to have one of the specified roles.""" def decorator(f): @wraps(f) @require_token def decorated(*args, **kwargs): user_role = g.current_publisher.get("role", "user") if user_role not in roles: return error_response( "FORBIDDEN", f"This action requires one of these roles: {', '.join(roles)}", 403, ) return f(*args, **kwargs) return decorated return decorator def require_admin(f): """Decorator that requires admin role.""" return require_role("admin")(f) def require_moderator(f): """Decorator that requires moderator or admin role.""" return require_role("moderator", "admin")(f) def log_audit(action: str, target_type: str, target_id: str, details: dict = None) -> None: """Log a moderation action to the audit trail.""" actor_id = g.current_publisher["id"] if hasattr(g, "current_publisher") and g.current_publisher else None g.db.execute( """ INSERT INTO audit_log (action, target_type, target_id, actor_id, details) VALUES (?, ?, ?, ?, ?) """, [action, target_type, str(target_id), str(actor_id) if actor_id else "system", json.dumps(details) if details else None], ) g.db.commit() def get_current_user_context() -> Tuple[Optional[str], Optional[str]]: """Get current user's slug and role from request context. Tries to authenticate via Bearer token without requiring it. Returns (slug, role) tuple, both None if not authenticated. """ auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): return None, None token = auth_header[7:] token_hash = hashlib.sha256(token.encode()).hexdigest() row = query_one( g.db, """ SELECT p.slug, p.role, p.banned FROM api_tokens t JOIN publishers p ON t.publisher_id = p.id WHERE t.token_hash = ? AND t.revoked_at IS NULL """, [token_hash], ) if not row or row["banned"]: return None, None return row["slug"], row["role"] or "user" def build_visibility_filter(table_prefix: str = "") -> Tuple[str, List[Any]]: """Build SQL WHERE clause for tool visibility filtering. Returns (sql_clause, params) tuple. The clause includes leading AND or WHERE as appropriate. """ prefix = f"{table_prefix}." if table_prefix else "" user_slug, user_role = get_current_user_context() # Moderators and admins see everything if user_role in ("moderator", "admin"): return "", [] # Regular users see: # 1. Approved public tools # 2. Their own tools (any status/visibility) if user_slug: return ( f" AND (({prefix}visibility = 'public' AND {prefix}moderation_status = 'approved') " f"OR {prefix}owner = ?)", [user_slug], ) else: # Unauthenticated users only see approved public tools return ( f" AND {prefix}visibility = 'public' AND {prefix}moderation_status = 'approved'", [], ) def generate_token() -> Tuple[str, str]: alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" raw = secrets.token_bytes(32) num = int.from_bytes(raw, "big") chars = [] while num > 0: num, rem = divmod(num, 62) chars.append(alphabet[rem]) token_body = "".join(reversed(chars)).rjust(43, "0") token = "reg_" + token_body[:43] token_hash = hashlib.sha256(token.encode()).hexdigest() return token, token_hash def validate_payload_size(field: str, content: str, limit: int) -> Optional[Response]: size = len(content.encode("utf-8")) if size > limit: return error_response( "PAYLOAD_TOO_LARGE", f"{field} exceeds {limit} bytes limit", 413, details={"field": field, "size": size, "limit": limit}, ) return None def paginate(page: int, per_page: int, total: int) -> Dict[str, int]: total_pages = max(1, math.ceil(total / per_page)) if per_page else 1 return { "page": page, "per_page": per_page, "total": total, "total_pages": total_pages, } def parse_pagination(endpoint_key: str, default_sort: str) -> Tuple[int, int, str, str, Optional[Response]]: try: page = int(request.args.get("page", 1)) except ValueError: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "Invalid page") per_page_raw = request.args.get("per_page") if per_page_raw is None and request.args.get("limit") is not None: per_page_raw = request.args.get("limit") try: per_page = int(per_page_raw) if per_page_raw is not None else DEFAULT_PAGE_SIZE except ValueError: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "Invalid per_page") if page < 1: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "Page must be >= 1") if per_page < 1 or per_page > MAX_PAGE_SIZE: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "per_page out of range") sort = request.args.get("sort", default_sort) order = request.args.get("order", "desc").lower() if order not in {"asc", "desc"}: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("INVALID_SORT", "Invalid sort order") allowed = ALLOWED_SORT.get(endpoint_key, set()) if sort not in allowed: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response( "INVALID_SORT", f"Unknown sort field '{sort}'. Allowed: {', '.join(sorted(allowed))}", ) return page, per_page, sort, order, None def load_tool_row(owner: str, name: str, version: Optional[str] = None) -> Optional[dict]: sql = "SELECT * FROM tools WHERE owner = ? AND name = ?" params: List[Any] = [owner, name] if version: sql += " AND version = ?" params.append(version) sql += " ORDER BY id DESC LIMIT 1" row = query_one(g.db, sql, params) return dict(row) if row else None @app.route("/api/v1/tools", methods=["GET"]) def list_tools() -> Response: page, per_page, sort, order, error = parse_pagination("/tools", "downloads") if error: return error category = request.args.get("category") offset = (page - 1) * per_page # Build visibility filter vis_filter, vis_params = build_visibility_filter() base_where = "WHERE 1=1" params: List[Any] = [] if category: base_where += " AND category = ?" params.append(category) # Add visibility filter base_where += vis_filter params.extend(vis_params) count_row = query_one( g.db, f"SELECT COUNT(DISTINCT owner || '/' || name) AS total FROM tools {base_where}", params, ) total = int(count_row["total"]) if count_row else 0 order_dir = "DESC" if order == "desc" else "ASC" order_sql = f"{sort} {order_dir}, published_at DESC, id DESC" rows = query_all( g.db, f""" WITH latest_any AS ( SELECT owner, name, MAX(id) AS max_id FROM tools {base_where} GROUP BY owner, name ), latest_stable AS ( SELECT owner, name, MAX(id) AS max_id FROM tools {base_where} AND version NOT LIKE '%-%' GROUP BY owner, name ) SELECT t.* FROM tools t JOIN ( SELECT a.owner, a.name, COALESCE(s.max_id, a.max_id) AS max_id FROM latest_any a LEFT JOIN latest_stable s ON s.owner = a.owner AND s.name = a.name ) latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id ORDER BY {order_sql} LIMIT ? OFFSET ? """, params + params + [per_page, offset], # params twice for both CTEs ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], }) return jsonify({"data": data, "meta": paginate(page, per_page, total)}) @app.route("/api/v1/tools/search", methods=["GET"]) def search_tools() -> Response: query_text = request.args.get("q", "").strip() if not query_text: return error_response("VALIDATION_ERROR", "Missing search query") # Sanitize query for FTS5 - escape special characters that cause syntax errors # FTS5 special chars: * " ( ) : ^ - NOT AND OR NEAR # For safety, we'll quote the entire query if it contains special chars fts5_special = set('"():^-') if any(c in fts5_special for c in query_text) or query_text.upper() in ('NOT', 'AND', 'OR', 'NEAR'): # Escape double quotes and wrap in quotes for literal search query_text = '"' + query_text.replace('"', '""') + '"' else: # Add prefix matching - append * to each word for partial matches # e.g., "summ text" becomes "summ* text*" to match "summarize", "text", "texting" words = query_text.split() query_text = " ".join(word + "*" if not word.endswith("*") else word for word in words) page, per_page, sort, order, error = parse_pagination("/tools/search", "downloads") if error: return error offset = (page - 1) * per_page # Parse filter parameters category = request.args.get("category") # Single category (backward compat) categories_param = request.args.get("categories", "") # Multi-category (OR logic) tags_param = request.args.get("tags", "") # Tags (AND logic) owner_filter = request.args.get("owner") min_downloads = request.args.get("min_downloads", type=int) max_downloads = request.args.get("max_downloads", type=int) published_after = request.args.get("published_after") published_before = request.args.get("published_before") include_deprecated = request.args.get("deprecated", "false").lower() == "true" include_facets = request.args.get("include_facets", "false").lower() == "true" # Parse multi-value params categories = [c.strip() for c in categories_param.split(",") if c.strip()] if category and category not in categories: categories.append(category) tags = [t.strip() for t in tags_param.split(",") if t.strip()] # Build WHERE clause where_clauses = ["tools_fts MATCH ?"] params: List[Any] = [query_text] if categories: placeholders = ",".join(["?" for _ in categories]) where_clauses.append(f"tools.category IN ({placeholders})") params.extend(categories) if owner_filter: where_clauses.append("tools.owner = ?") params.append(owner_filter) if min_downloads is not None: where_clauses.append("tools.downloads >= ?") params.append(min_downloads) if max_downloads is not None: where_clauses.append("tools.downloads <= ?") params.append(max_downloads) if published_after: where_clauses.append("tools.published_at >= ?") params.append(published_after) if published_before: where_clauses.append("tools.published_at <= ?") params.append(published_before) if not include_deprecated: where_clauses.append("tools.deprecated = 0") # Add visibility filtering vis_filter, vis_params = build_visibility_filter("tools") if vis_filter: # Remove leading " AND " since we're adding to a list where_clauses.append(vis_filter.strip().lstrip("AND").strip()) params.extend(vis_params) where_clause = "WHERE " + " AND ".join(where_clauses) # Tag filtering CTE (AND logic - must have ALL specified tags) tag_cte = "" tag_join = "" if tags: tag_placeholders = ",".join(["?" for _ in tags]) tag_cte = f""" tag_matches AS ( SELECT tools.id FROM tools, json_each(tools.tags) AS tag WHERE tag.value IN ({tag_placeholders}) GROUP BY tools.id HAVING COUNT(DISTINCT tag.value) = ? ), """ tag_join = "JOIN tag_matches tm ON m.id = tm.id" # Prepend tag params params = tags + [len(tags)] + params order_dir = "DESC" if order == "desc" else "ASC" if sort == "relevance": order_sql = f"rank {order_dir}, downloads DESC, published_at DESC, id DESC" else: order_sql = f"{sort} {order_dir}, published_at DESC, id DESC" rows = query_all( g.db, f""" WITH {tag_cte} matches AS ( SELECT tools.*, bm25(tools_fts) AS rank FROM tools_fts JOIN tools ON tools_fts.rowid = tools.id {where_clause} ), filtered AS ( SELECT m.* FROM matches m {tag_join} ), latest_any AS ( SELECT owner, name, MAX(id) AS max_id FROM filtered GROUP BY owner, name ), latest_stable AS ( SELECT owner, name, MAX(id) AS max_id FROM filtered WHERE version NOT LIKE '%-%' GROUP BY owner, name ) SELECT f.* FROM filtered f JOIN ( SELECT a.owner, a.name, COALESCE(s.max_id, a.max_id) AS max_id FROM latest_any a LEFT JOIN latest_stable s ON s.owner = a.owner AND s.name = a.name ) latest ON f.owner = latest.owner AND f.name = latest.name AND f.id = latest.max_id ORDER BY {order_sql} LIMIT ? OFFSET ? """, params + [per_page, offset], ) # Count query (reuse same params without pagination) count_row = query_one( g.db, f""" WITH {tag_cte} matches AS ( SELECT tools.* FROM tools_fts JOIN tools ON tools_fts.rowid = tools.id {where_clause} ), filtered AS ( SELECT m.* FROM matches m {tag_join} ) SELECT COUNT(DISTINCT owner || '/' || name) AS total FROM filtered """, params, ) total = int(count_row["total"]) if count_row else 0 data = [] for row in rows: score = 1.0 / (1.0 + row["rank"]) if row["rank"] is not None else None data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], "score": score, }) result: dict = {"data": data, "meta": paginate(page, per_page, total)} # Compute facets if requested if include_facets: # Category facets cat_rows = query_all( g.db, f""" WITH {tag_cte} matches AS ( SELECT tools.* FROM tools_fts JOIN tools ON tools_fts.rowid = tools.id {where_clause} ), filtered AS ( SELECT m.* FROM matches m {tag_join} ) SELECT category, COUNT(DISTINCT owner || '/' || name) AS count FROM filtered WHERE category IS NOT NULL GROUP BY category ORDER BY count DESC LIMIT 20 """, params, ) # Tag facets tag_rows = query_all( g.db, f""" WITH {tag_cte} matches AS ( SELECT tools.* FROM tools_fts JOIN tools ON tools_fts.rowid = tools.id {where_clause} ), filtered AS ( SELECT m.* FROM matches m {tag_join} ) SELECT tag.value AS name, COUNT(DISTINCT filtered.owner || '/' || filtered.name) AS count FROM filtered, json_each(filtered.tags) AS tag GROUP BY tag.value ORDER BY count DESC LIMIT 30 """, params, ) # Owner facets owner_rows = query_all( g.db, f""" WITH {tag_cte} matches AS ( SELECT tools.* FROM tools_fts JOIN tools ON tools_fts.rowid = tools.id {where_clause} ), filtered AS ( SELECT m.* FROM matches m {tag_join} ) SELECT owner, COUNT(DISTINCT owner || '/' || name) AS count FROM filtered GROUP BY owner ORDER BY count DESC LIMIT 20 """, params, ) result["facets"] = { "categories": [{"name": r["category"], "count": r["count"]} for r in cat_rows], "tags": [{"name": r["name"], "count": r["count"]} for r in tag_rows], "owners": [{"name": r["owner"], "count": r["count"]} for r in owner_rows], } return jsonify(result) @app.route("/api/v1/tools//", methods=["GET"]) def get_tool(owner: str, name: str) -> Response: if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") version = request.args.get("version") if version: row = load_tool_row(owner, name, version) else: row = resolve_tool(owner, name, "*") if not row: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404) # Check visibility permissions user_slug, user_role = get_current_user_context() is_elevated = user_role in ("moderator", "admin") is_owner = user_slug == row["owner"] visibility = row.get("visibility", "public") moderation_status = row.get("moderation_status", "approved") # Public tools require approval, unlisted tools accessible by direct link, private only to owner is_approved = moderation_status == "approved" is_public_approved = visibility == "public" and is_approved is_unlisted_approved = visibility == "unlisted" and is_approved is_accessible = is_elevated or is_owner or is_public_approved or is_unlisted_approved if not is_accessible: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404) # Parse source attribution source_obj = None if row["source_json"]: try: source_obj = json.loads(row["source_json"]) except (json.JSONDecodeError, TypeError): pass # Fall back to legacy fields if no source_json if not source_obj and (row["source"] or row["source_url"]): source_obj = { "type": "imported", "original_tool": row["source"], "url": row["source_url"], } # Count forks of this tool fork_count = 0 fork_pattern = f"{row['owner']}/{row['name']}" fork_row = query_one( g.db, "SELECT COUNT(DISTINCT owner || '/' || name) as cnt FROM tools WHERE forked_from = ? AND moderation_status = 'approved'", [fork_pattern] ) if fork_row: fork_count = fork_row["cnt"] payload = { "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], "deprecated": bool(row["deprecated"]), "deprecated_message": row["deprecated_message"], "replacement": row["replacement"], "config": row["config_yaml"], "readme": row["readme"], "source": source_obj, "forked_from": row.get("forked_from"), "forked_version": row.get("forked_version"), "fork_count": fork_count, } response = jsonify({"data": payload}) response.headers["Cache-Control"] = "max-age=60" return response def resolve_tool(owner: str, name: str, constraint: Optional[str]) -> Optional[dict]: rows = query_all(g.db, "SELECT * FROM tools WHERE owner = ? AND name = ?", [owner, name]) if not rows: return None versions = [row["version"] for row in rows] selected = select_version(versions, constraint) if not selected: return None for row in rows: if row["version"] == selected: return dict(row) return None @app.route("/api/v1/tools///versions", methods=["GET"]) def list_tool_versions(owner: str, name: str) -> Response: if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") rows = query_all(g.db, "SELECT version FROM tools WHERE owner = ? AND name = ? ORDER BY id DESC", [owner, name]) if not rows: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404) versions = [row["version"] for row in rows] return jsonify({"data": {"versions": versions}}) @app.route("/api/v1/tools///forks", methods=["GET"]) def list_tool_forks(owner: str, name: str) -> Response: """List all forks of a tool.""" if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") # Check if the original tool exists original = query_one( g.db, "SELECT id FROM tools WHERE owner = ? AND name = ? LIMIT 1", [owner, name] ) if not original: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404) # Find all tools that were forked from this one fork_pattern = f"{owner}/{name}" rows = query_all( g.db, """ SELECT DISTINCT owner, name, description, forked_version, (SELECT MAX(version) FROM tools t2 WHERE t2.owner = t.owner AND t2.name = t.name) as latest_version, (SELECT SUM(downloads) FROM tools t2 WHERE t2.owner = t.owner AND t2.name = t.name) as total_downloads FROM tools t WHERE forked_from = ? AND moderation_status = 'approved' ORDER BY total_downloads DESC """, [fork_pattern] ) forks = [ { "owner": row["owner"], "name": row["name"], "description": row["description"], "forked_version": row["forked_version"], "latest_version": row["latest_version"], "downloads": row["total_downloads"] or 0, } for row in rows ] return jsonify({ "data": { "forks": forks, "fork_count": len(forks), } }) @app.route("/api/v1/tools///download", methods=["GET"]) def download_tool(owner: str, name: str) -> Response: if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") constraint = request.args.get("version") install_flag = request.args.get("install", "false").lower() == "true" row = resolve_tool(owner, name, constraint) if not row: available = [r["version"] for r in query_all(g.db, "SELECT version FROM tools WHERE owner = ? AND name = ?", [owner, name])] return error_response( "VERSION_NOT_FOUND", f"No version of '{owner}/{name}' satisfies constraint '{constraint or '*'}'", 404, details={ "tool": f"{owner}/{name}", "constraint": constraint or "*", "available_versions": available, "latest_stable": select_version(available, "*") if available else None, }, ) # Check visibility permissions user_slug, user_role = get_current_user_context() is_elevated = user_role in ("moderator", "admin") is_owner = user_slug == row["owner"] visibility = row.get("visibility", "public") moderation_status = row.get("moderation_status", "approved") # Public tools require approval, unlisted tools accessible by direct link, private only to owner is_approved = moderation_status == "approved" is_public_approved = visibility == "public" and is_approved is_unlisted_approved = visibility == "unlisted" and is_approved is_accessible = is_elevated or is_owner or is_public_approved or is_unlisted_approved if not is_accessible: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404) if install_flag: client_id = request.headers.get("X-Client-ID") if not client_id: client_id = f"anon_{hash(request.remote_addr)}" today = date.today().isoformat() try: g.db.execute( "INSERT INTO download_stats (tool_id, client_id, downloaded_at) VALUES (?, ?, ?)", [row["id"], client_id, today], ) g.db.execute("UPDATE tools SET downloads = downloads + 1 WHERE id = ?", [row["id"]]) g.db.commit() except Exception: g.db.rollback() response = jsonify({ "data": { "owner": row["owner"], "name": row["name"], "resolved_version": row["version"], "config": row["config_yaml"], "readme": row["readme"] or "", "config_hash": row.get("config_hash") or "", } }) response.headers["Cache-Control"] = "max-age=3600, immutable" return response @app.route("/api/v1/categories", methods=["GET"]) def list_categories() -> Response: page, per_page, sort, order, error = parse_pagination("/categories", "name") if error: return error cache_path = get_categories_cache_path() categories_payload = None if cache_path.exists(): categories_payload = json.loads(cache_path.read_text(encoding="utf-8")) else: categories_yaml = get_repo_dir() / "categories" / "categories.yaml" if categories_yaml.exists(): categories_payload = yaml.safe_load(categories_yaml.read_text(encoding="utf-8")) or {} predefined_categories = (categories_payload or {}).get("categories", []) # Get counts for all categories in the database (filtered by visibility) vis_filter, vis_params = build_visibility_filter() counts = query_all( g.db, f"SELECT category, COUNT(DISTINCT owner || '/' || name) AS total FROM tools WHERE 1=1 {vis_filter} GROUP BY category", vis_params, ) count_map = {row["category"]: row["total"] for row in counts} # Calculate total tools across all categories total_tools = sum(row["total"] for row in counts) # Build data from predefined categories predefined_names = set() data = [] for cat in predefined_categories: name = cat.get("name") if not name: continue predefined_names.add(name) data.append({ "name": name, "description": cat.get("description"), "icon": cat.get("icon"), "tool_count": count_map.get(name, 0), }) # Add any categories from database that aren't in predefined list for category_name, count in count_map.items(): if category_name and category_name not in predefined_names: # Auto-generate display info for dynamic categories display_name = category_name.replace("-", " ").title() data.append({ "name": category_name, "description": f"Tools in the {display_name} category", "icon": None, "tool_count": count, }) reverse = order == "desc" if sort == "tool_count": data.sort(key=lambda item: item["tool_count"], reverse=reverse) else: data.sort(key=lambda item: item["name"], reverse=reverse) total = len(data) start = (page - 1) * per_page end = start + per_page sliced = data[start:end] meta = paginate(page, per_page, total) meta["total_tools"] = total_tools # Add total tools count to meta response = jsonify({"data": sliced, "meta": meta}) response.headers["Cache-Control"] = "max-age=3600" return response @app.route("/api/v1/tags", methods=["GET"]) def list_tags() -> Response: """List all tags with usage counts.""" category = request.args.get("category") limit = min(int(request.args.get("limit", 100)), 500) # Build visibility filter vis_filter, vis_params = build_visibility_filter("tools") # Build query - extract tags from JSON array and count occurrences if category: rows = query_all( g.db, f""" SELECT tag.value AS name, COUNT(DISTINCT tools.owner || '/' || tools.name) AS count FROM tools, json_each(tools.tags) AS tag WHERE tools.category = ? {vis_filter} GROUP BY tag.value ORDER BY count DESC LIMIT ? """, [category] + vis_params + [limit], ) else: rows = query_all( g.db, f""" SELECT tag.value AS name, COUNT(DISTINCT tools.owner || '/' || tools.name) AS count FROM tools, json_each(tools.tags) AS tag WHERE 1=1 {vis_filter} GROUP BY tag.value ORDER BY count DESC LIMIT ? """, vis_params + [limit], ) data = [{"name": row["name"], "count": row["count"]} for row in rows] response = jsonify({"data": data, "meta": {"total": len(data)}}) response.headers["Cache-Control"] = "max-age=3600" return response # ─── Collections ───────────────────────────────────────────────────────────── @app.route("/api/v1/collections", methods=["GET"]) def list_collections() -> Response: """List all collections.""" rows = query_all( g.db, "SELECT * FROM collections ORDER BY name", ) data = [] for row in rows: tools = json.loads(row["tools"]) if row["tools"] else [] tags = json.loads(row["tags"]) if row["tags"] else [] data.append({ "name": row["name"], "display_name": row["display_name"], "description": row["description"], "icon": row["icon"], "maintainer": row["maintainer"], "tool_count": len(tools), "tags": tags, }) response = jsonify({"data": data}) response.headers["Cache-Control"] = "max-age=3600" return response @app.route("/api/v1/collections/", methods=["GET"]) def get_collection(name: str) -> Response: """Get collection details with tool information.""" row = query_one(g.db, "SELECT * FROM collections WHERE name = ?", [name]) if not row: return error_response("COLLECTION_NOT_FOUND", f"Collection '{name}' not found", 404) tools_refs = json.loads(row["tools"]) if row["tools"] else [] pinned = json.loads(row["pinned"]) if row["pinned"] else {} tags = json.loads(row["tags"]) if row["tags"] else [] # Fetch tool details for each tool in the collection tools_data = [] for ref in tools_refs: parts = ref.split("/") if len(parts) != 2: continue owner, tool_name = parts tool_row = query_one( g.db, """ SELECT * FROM tools WHERE owner = ? AND name = ? AND version NOT LIKE '%-%' ORDER BY id DESC LIMIT 1 """, [owner, tool_name], ) if tool_row: tools_data.append({ "owner": tool_row["owner"], "name": tool_row["name"], "version": tool_row["version"], "description": tool_row["description"], "category": tool_row["category"], "downloads": tool_row["downloads"], "pinned_version": pinned.get(ref), }) else: # Tool not found in registry tools_data.append({ "owner": owner, "name": tool_name, "version": None, "description": None, "category": None, "downloads": 0, "pinned_version": pinned.get(ref), "missing": True, }) response = jsonify({ "data": { "name": row["name"], "display_name": row["display_name"], "description": row["description"], "icon": row["icon"], "maintainer": row["maintainer"], "tools": tools_data, "tags": tags, } }) response.headers["Cache-Control"] = "max-age=3600" return response # ------------------------------------------------------------------------- # Admin Collections Management # ------------------------------------------------------------------------- @app.route("/api/v1/admin/collections", methods=["GET"]) @require_admin def admin_list_collections() -> Response: """List all collections with full details (admin).""" rows = query_all( g.db, "SELECT * FROM collections ORDER BY name", ) data = [] for row in rows: tools = json.loads(row["tools"]) if row["tools"] else [] pinned = json.loads(row["pinned"]) if row["pinned"] else {} tags = json.loads(row["tags"]) if row["tags"] else [] data.append({ "id": row["id"], "name": row["name"], "display_name": row["display_name"], "description": row["description"], "icon": row["icon"], "maintainer": row["maintainer"], "tools": tools, "pinned": pinned, "tags": tags, "created_at": row["created_at"], "updated_at": row["updated_at"], }) return jsonify({"data": data}) @app.route("/api/v1/admin/collections", methods=["POST"]) @require_admin def admin_create_collection() -> Response: """Create a new collection (admin).""" data = request.get_json() or {} name = data.get("name", "").strip().lower() display_name = data.get("display_name", "").strip() description = data.get("description", "").strip() icon = data.get("icon", "").strip() maintainer = data.get("maintainer", g.current_publisher.get("slug", "")).strip() tools = data.get("tools", []) pinned = data.get("pinned", {}) tags = data.get("tags", []) # Validation if not name: return error_response("INVALID_INPUT", "Collection name is required", 400) if not display_name: display_name = name.replace("-", " ").title() if not isinstance(tools, list): return error_response("INVALID_INPUT", "Tools must be a list", 400) # Check for duplicate existing = query_one(g.db, "SELECT id FROM collections WHERE name = ?", [name]) if existing: return error_response("DUPLICATE", f"Collection '{name}' already exists", 409) # Validate tool references validated_tools = [] for tool_ref in tools: if isinstance(tool_ref, str) and "/" in tool_ref: validated_tools.append(tool_ref) try: g.db.execute( """ INSERT INTO collections (name, display_name, description, icon, maintainer, tools, pinned, tags) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, [ name, display_name, description, icon, maintainer, json.dumps(validated_tools), json.dumps(pinned) if pinned else None, json.dumps(tags) if tags else None, ], ) g.db.commit() # Audit log log_audit("create_collection", "collection", name, {"collection": name}) return jsonify({ "success": True, "message": f"Collection '{name}' created", "name": name, }), 201 except Exception as e: return error_response("SERVER_ERROR", str(e), 500) @app.route("/api/v1/admin/collections/", methods=["PUT"]) @require_admin def admin_update_collection(name: str) -> Response: """Update a collection (admin).""" existing = query_one(g.db, "SELECT * FROM collections WHERE name = ?", [name]) if not existing: return error_response("COLLECTION_NOT_FOUND", f"Collection '{name}' not found", 404) data = request.get_json() or {} # Get updated values (keep existing if not provided) display_name = data.get("display_name", existing["display_name"]) description = data.get("description", existing["description"]) icon = data.get("icon", existing["icon"]) maintainer = data.get("maintainer", existing["maintainer"]) # Handle tools - parse existing if not provided in request if "tools" in data: tools = data["tools"] if not isinstance(tools, list): return error_response("INVALID_INPUT", "Tools must be a list", 400) validated_tools = [t for t in tools if isinstance(t, str) and "/" in t] else: validated_tools = json.loads(existing["tools"]) if existing["tools"] else [] # Handle pinned versions if "pinned" in data: pinned = data["pinned"] if isinstance(data["pinned"], dict) else {} else: pinned = json.loads(existing["pinned"]) if existing["pinned"] else {} # Handle tags if "tags" in data: tags = data["tags"] if isinstance(data["tags"], list) else [] else: tags = json.loads(existing["tags"]) if existing["tags"] else [] try: g.db.execute( """ UPDATE collections SET display_name = ?, description = ?, icon = ?, maintainer = ?, tools = ?, pinned = ?, tags = ?, updated_at = CURRENT_TIMESTAMP WHERE name = ? """, [ display_name, description, icon, maintainer, json.dumps(validated_tools), json.dumps(pinned) if pinned else None, json.dumps(tags) if tags else None, name, ], ) g.db.commit() # Audit log log_audit("update_collection", "collection", name, {"collection": name}) return jsonify({ "success": True, "message": f"Collection '{name}' updated", }) except Exception as e: return error_response("SERVER_ERROR", str(e), 500) @app.route("/api/v1/admin/collections/", methods=["DELETE"]) @require_admin def admin_delete_collection(name: str) -> Response: """Delete a collection (admin).""" existing = query_one(g.db, "SELECT id FROM collections WHERE name = ?", [name]) if not existing: return error_response("COLLECTION_NOT_FOUND", f"Collection '{name}' not found", 404) try: g.db.execute("DELETE FROM collections WHERE name = ?", [name]) g.db.commit() # Audit log log_audit("delete_collection", "collection", name, {"collection": name}) return jsonify({ "success": True, "message": f"Collection '{name}' deleted", }) except Exception as e: return error_response("SERVER_ERROR", str(e), 500) @app.route("/api/v1/stats/popular", methods=["GET"]) def popular_tools() -> Response: limit = min(int(request.args.get("limit", 10)), 50) rows = query_all( g.db, """ WITH latest AS ( SELECT owner, name, MAX(id) AS max_id FROM tools WHERE version NOT LIKE '%-%' GROUP BY owner, name ) SELECT t.* FROM tools t JOIN latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id ORDER BY t.downloads DESC, t.published_at DESC LIMIT ? """, [limit], ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], }) return jsonify({"data": data}) @app.route("/api/v1/index.json", methods=["GET"]) def get_index() -> Response: rows = query_all( g.db, """ WITH latest AS ( SELECT owner, name, MAX(id) AS max_id FROM tools WHERE version NOT LIKE '%-%' GROUP BY owner, name ) SELECT t.owner, t.name, t.version, t.description, t.category, t.tags, t.downloads FROM tools t JOIN latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id ORDER BY t.downloads DESC """, ) tools = [] for row in rows: tools.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], }) # Generate checksum for integrity verification content = json.dumps(tools, sort_keys=True) checksum = "sha256:" + hashlib.sha256(content.encode()).hexdigest() payload = { "version": "1.0", "generated_at": datetime.utcnow().isoformat() + "Z", "checksum": checksum, "tool_count": len(tools), "tools": tools, } response = jsonify(payload) response.headers["Cache-Control"] = "max-age=300, stale-while-revalidate=60" response.headers["ETag"] = f'"{checksum}"' return response @app.route("/api/v1/register", methods=["POST"]) def register() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) payload = request.get_json(silent=True) or {} email = (payload.get("email") or "").strip() password = payload.get("password") or "" slug = (payload.get("slug") or "").strip() display_name = (payload.get("display_name") or "").strip() if not email or not EMAIL_RE.match(email): return error_response("VALIDATION_ERROR", "Invalid email format") if not password or len(password) < 8: return error_response("VALIDATION_ERROR", "Password must be at least 8 characters") if not slug or not OWNER_RE.match(slug) or len(slug) < 2 or len(slug) > 39: return error_response("VALIDATION_ERROR", "Invalid slug format") if slug in RESERVED_SLUGS: return error_response("SLUG_TAKEN", f"Slug '{slug}' is reserved", 409) if not display_name: return error_response("VALIDATION_ERROR", "Display name is required") existing_email = query_one(g.db, "SELECT id FROM publishers WHERE email = ?", [email]) if existing_email: return error_response("VALIDATION_ERROR", "Email already registered") existing_slug = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [slug]) if existing_slug: return error_response("SLUG_TAKEN", f"Slug '{slug}' is already taken", 409) password_hash = password_hasher.hash(password) g.db.execute( """ INSERT INTO publishers (email, password_hash, slug, display_name, verified) VALUES (?, ?, ?, ?, ?) """, [email, password_hash, slug, display_name, False], ) g.db.commit() publisher_id = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [slug])["id"] response = jsonify({ "data": { "id": publisher_id, "slug": slug, "display_name": display_name, "email": email, } }) response.status_code = 201 return response @app.route("/api/v1/login", methods=["POST"]) def login() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) payload = request.get_json(silent=True) or {} email = (payload.get("email") or "").strip() password = payload.get("password") or "" if not email or not password: return error_response("VALIDATION_ERROR", "Email and password are required") publisher = query_one( g.db, "SELECT * FROM publishers WHERE email = ?", [email], ) if not publisher: return error_response("UNAUTHORIZED", "Invalid credentials", 401) locked_until = publisher["locked_until"] if locked_until: try: locked_dt = datetime.fromisoformat(locked_until) if datetime.utcnow() < locked_dt: return error_response("ACCOUNT_LOCKED", "Account is locked", 403) except ValueError: pass try: password_hasher.verify(publisher["password_hash"], password) except VerifyMismatchError: ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown") rate_key = f"{ip}:{email}:login_failed" limit_config = RATE_LIMITS["login_failed"] allowed, _ = rate_limiter.check(rate_key, limit_config["limit"], limit_config["window"]) attempts = int(publisher["failed_login_attempts"] or 0) + 1 locked = None if attempts >= 10: locked = datetime.utcnow() + timedelta(hours=1) elif attempts >= 5: locked = datetime.utcnow() + timedelta(minutes=15) g.db.execute( "UPDATE publishers SET failed_login_attempts = ?, locked_until = ? WHERE id = ?", [attempts, locked.isoformat() if locked else None, publisher["id"]], ) g.db.commit() if not allowed: return error_response("RATE_LIMITED", "Too many failed logins. Try again later.", 429) return error_response("UNAUTHORIZED", "Invalid credentials", 401) g.db.execute( "UPDATE publishers SET failed_login_attempts = 0, locked_until = NULL WHERE id = ?", [publisher["id"]], ) # Delete any existing session tokens for this user (cleanup) g.db.execute( "DELETE FROM api_tokens WHERE publisher_id = ? AND name = 'Web Session'", [publisher["id"]], ) token, token_hash = generate_token() g.db.execute( """ INSERT INTO api_tokens (publisher_id, token_hash, name, created_at) VALUES (?, ?, ?, ?) """, [publisher["id"], token_hash, "Web Session", datetime.utcnow().isoformat()], ) g.db.commit() return jsonify({ "data": { "token": token, "publisher": { "slug": publisher["slug"], "display_name": publisher["display_name"], }, } }) @app.route("/api/v1/tokens", methods=["POST"]) @require_token def create_token() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) rate_resp = enforce_token_rate_limit("tokens", g.current_token["hash"]) if rate_resp: return rate_resp payload = request.get_json(silent=True) or {} name = (payload.get("name") or "CLI token").strip() token, token_hash = generate_token() now = datetime.utcnow().isoformat() g.db.execute( "INSERT INTO api_tokens (publisher_id, token_hash, name, created_at) VALUES (?, ?, ?, ?)", [g.current_publisher["id"], token_hash, name, now], ) g.db.commit() response = jsonify({ "data": { "token": token, "name": name, "created_at": now, } }) response.status_code = 201 return response @app.route("/api/v1/tokens", methods=["GET"]) @require_token def list_tokens() -> Response: rows = query_all( g.db, """ SELECT id, name, created_at, last_used_at FROM api_tokens WHERE publisher_id = ? AND revoked_at IS NULL ORDER BY created_at DESC """, [g.current_publisher["id"]], ) data = [] for row in rows: data.append({ "id": row["id"], "name": row["name"], "created_at": row["created_at"], "last_used_at": row["last_used_at"], }) return jsonify({"data": data}) @app.route("/api/v1/tokens/", methods=["DELETE"]) @require_token def revoke_token(token_id: int) -> Response: row = query_one( g.db, "SELECT id FROM api_tokens WHERE id = ? AND publisher_id = ?", [token_id, g.current_publisher["id"]], ) if not row: return error_response("FORBIDDEN", "Cannot revoke this token", 403) g.db.execute( "UPDATE api_tokens SET revoked_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), token_id], ) g.db.commit() return jsonify({"data": {"revoked": True}}) # ─── App Pairing (Connect Flow) ────────────────────────────────────────────── @app.route("/api/v1/pairing/initiate", methods=["POST"]) @require_token def initiate_pairing() -> Response: """Initiate a pairing request from the website. Creates a pending pairing code.""" import secrets # Clean up old expired pairing requests g.db.execute( "DELETE FROM pairing_requests WHERE expires_at < ? OR status = 'claimed'", [datetime.utcnow().isoformat()], ) g.db.commit() # Cancel any existing pending requests for this user g.db.execute( "UPDATE pairing_requests SET status = 'cancelled' WHERE publisher_id = ? AND status = 'pending'", [g.current_publisher["id"]], ) # Generate pairing code and token pairing_code = secrets.token_urlsafe(16) token, token_hash = generate_token() expires_at = (datetime.utcnow() + timedelta(minutes=5)).isoformat() g.db.execute( """ INSERT INTO pairing_requests (publisher_id, pairing_code, token_hash, token_plain, expires_at) VALUES (?, ?, ?, ?, ?) """, [g.current_publisher["id"], pairing_code, token_hash, token, expires_at], ) g.db.commit() return jsonify({ "data": { "status": "pending", "username": g.current_publisher["slug"], "expires_in": 300, } }) @app.route("/api/v1/pairing/check/", methods=["GET"]) def check_pairing(username: str) -> Response: """CLI polls this to check if pairing is ready. Returns token if ready.""" hostname = request.args.get("hostname", "Unknown Device") # Find pending pairing for this username row = query_one( g.db, """ SELECT pr.*, p.slug FROM pairing_requests pr JOIN publishers p ON pr.publisher_id = p.id WHERE p.slug = ? AND pr.status = 'pending' AND pr.expires_at > ? ORDER BY pr.created_at DESC LIMIT 1 """, [username, datetime.utcnow().isoformat()], ) if not row: return jsonify({"data": {"status": "not_found"}}) # Claim the pairing g.db.execute( """ UPDATE pairing_requests SET status = 'claimed', hostname = ?, claimed_at = ? WHERE id = ? """, [hostname, datetime.utcnow().isoformat(), row["id"]], ) # Create the actual API token with hostname g.db.execute( """ INSERT INTO api_tokens (publisher_id, token_hash, name, hostname, created_at) VALUES (?, ?, ?, ?, ?) """, [row["publisher_id"], row["token_hash"], f"App: {hostname}", hostname, datetime.utcnow().isoformat()], ) g.db.commit() return jsonify({ "data": { "status": "connected", "token": row["token_plain"], "username": username, } }) @app.route("/api/v1/pairing/status", methods=["GET"]) @require_token def pairing_status() -> Response: """Check if there's an active pairing request for the current user.""" row = query_one( g.db, """ SELECT status, expires_at, created_at FROM pairing_requests WHERE publisher_id = ? AND status = 'pending' AND expires_at > ? ORDER BY created_at DESC LIMIT 1 """, [g.current_publisher["id"], datetime.utcnow().isoformat()], ) if not row: return jsonify({"data": {"status": "none"}}) return jsonify({ "data": { "status": "pending", "expires_at": row["expires_at"], } }) @app.route("/api/v1/connected-apps", methods=["GET"]) @require_token def list_connected_apps() -> Response: """List connected apps (tokens with hostnames) for the current user.""" rows = query_all( g.db, """ SELECT id, name, hostname, created_at, last_used_at FROM api_tokens WHERE publisher_id = ? AND revoked_at IS NULL ORDER BY last_used_at DESC NULLS LAST, created_at DESC """, [g.current_publisher["id"]], ) data = [] for row in rows: data.append({ "id": row["id"], "name": row["name"], "hostname": row["hostname"] or "Unknown", "created_at": row["created_at"], "last_used_at": row["last_used_at"], }) return jsonify({"data": data}) @app.route("/api/v1/connected-apps/", methods=["DELETE"]) @require_token def disconnect_app(app_id: int) -> Response: """Disconnect (revoke) an app.""" row = query_one( g.db, "SELECT id FROM api_tokens WHERE id = ? AND publisher_id = ?", [app_id, g.current_publisher["id"]], ) if not row: return error_response("NOT_FOUND", "App not found", 404) g.db.execute( "UPDATE api_tokens SET revoked_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), app_id], ) g.db.commit() return jsonify({"data": {"disconnected": True}}) @app.route("/api/v1/tools", methods=["POST"]) @require_token def publish_tool() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) rate_resp = enforce_token_rate_limit("publish", g.current_token["hash"]) if rate_resp: return rate_resp payload = request.get_json(silent=True) or {} config_text = payload.get("config") or "" readme = payload.get("readme") or "" dry_run = bool(payload.get("dry_run")) size_resp = validate_payload_size("config", config_text, MAX_CONFIG_BYTES) if size_resp: return size_resp if readme: size_resp = validate_payload_size("readme", readme, MAX_README_BYTES) if size_resp: return size_resp try: data = yaml.safe_load(config_text) or {} except yaml.YAMLError: return error_response("VALIDATION_ERROR", "Invalid YAML in config") name = (data.get("name") or "").strip() version = (data.get("version") or "").strip() description = (data.get("description") or "").strip() category = (data.get("category") or "").strip() or None tags = data.get("tags") or [] # Parse visibility from payload or config YAML visibility = (payload.get("visibility") or data.get("visibility") or "public").strip().lower() if visibility not in ("public", "private", "unlisted"): return error_response("VALIDATION_ERROR", "Invalid visibility. Must be: public, private, unlisted") # Handle source attribution - can be a dict (full ToolSource) or string (legacy) source_data = data.get("source") source_json = None source = None source_url = (data.get("source_url") or "").strip() or None if isinstance(source_data, dict): # Full source object from tool YAML source_json = json.dumps(source_data) # Keep legacy fields for backward compat source = source_data.get("original_tool") or source_data.get("author") source_url = source_data.get("url") or source_url elif isinstance(source_data, str) and source_data.strip(): # Legacy string format source = source_data.strip() # Create a minimal source_json for consistency source_json = json.dumps({"type": "imported", "original_tool": source, "url": source_url}) # Extract fork metadata forked_from = (data.get("forked_from") or "").strip() or None forked_version = (data.get("forked_version") or "").strip() or None if not name or not TOOL_NAME_RE.match(name) or len(name) > MAX_TOOL_NAME_LEN: return error_response("VALIDATION_ERROR", "Invalid tool name") if not version or Semver.parse(version) is None: return error_response("INVALID_VERSION", "Version string is not valid semver") if description and len(description) > MAX_DESC_LEN: return error_response("VALIDATION_ERROR", "Description exceeds 500 characters") if tags: if not isinstance(tags, list): return error_response("VALIDATION_ERROR", "Tags must be a list") if len(tags) > MAX_TAGS: return error_response("VALIDATION_ERROR", "Too many tags") for tag in tags: if len(str(tag)) > MAX_TAG_LEN: return error_response("VALIDATION_ERROR", "Tag exceeds 32 characters") owner = g.current_publisher["slug"] # Compute config hash early for idempotency check config_hash = compute_yaml_hash(config_text) existing = query_one( g.db, "SELECT published_at, config_hash, moderation_status, visibility FROM tools WHERE owner = ? AND name = ? AND version = ?", [owner, name, version], ) if existing: # If same content (config_hash matches), treat as idempotent retry - return success if existing["config_hash"] == config_hash: return jsonify({ "data": { "owner": owner, "name": name, "version": version, "config_hash": config_hash, "pr_url": "", "status": existing["moderation_status"], "visibility": existing["visibility"], "already_published": True, } }) # Different content - actual version conflict return error_response( "VERSION_EXISTS", f"Version {version} already exists with different content. Use a new version number.", 409, details={"published_at": existing["published_at"]}, ) suggestions = {"category": None, "similar_tools": []} try: from .categorize import suggest_categories from .similarity import find_similar_tools categories_path = get_repo_dir() / "categories" / "categories.yaml" if not category and categories_path.exists(): ranked = suggest_categories(name, description, tags, categories_path) if ranked: suggestions["category"] = { "suggested": ranked[0][0], "confidence": ranked[0][1], } rows = query_all( g.db, "SELECT owner, name, description, category, tags FROM tools", ) existing = [] for row in rows: try: existing.append({ "owner": row["owner"], "name": row["name"], "description": row["description"] or "", "category": row["category"], "tags": json.loads(row["tags"] or "[]"), }) except Exception: continue similar = find_similar_tools(existing, name, description, tags, category) suggestions["similar_tools"] = [ {"name": f"{tool['owner']}/{tool['name']}", "similarity": score} for tool, score in similar[:5] ] except Exception: pass # Run automated scrutiny scrutiny_report = None try: from .scrutiny import scrutinize_tool scrutiny_report = scrutinize_tool(config_text, description or "", readme) except Exception: pass # Check scrutiny decision if scrutiny_report: suggestions["scrutiny"] = scrutiny_report if scrutiny_report.get("decision") == "reject": # Find the failing check for error message fail_findings = [f for f in scrutiny_report.get("findings", []) if f.get("result") == "fail"] fail_msg = fail_findings[0]["message"] if fail_findings else "Tool failed automated review" return error_response( "SCRUTINY_FAILED", f"Tool rejected: {fail_msg}", 400, details={"scrutiny": scrutiny_report}, ) # Run AI secondary review if there are warnings if scrutiny_report.get("decision") == "review": try: ai_review = run_ai_scrutiny_review(scrutiny_report, data, name, description) if ai_review: scrutiny_report["ai_review"] = ai_review # Update decision based on AI review if ai_review.get("overall_verdict") == "APPROVE" and ai_review.get("confidence", 0) >= 0.8: scrutiny_report["decision"] = "approve" scrutiny_report["ai_approved"] = True except Exception: # Don't fail publish if AI review fails pass if dry_run: return jsonify({ "data": { "owner": owner, "name": name, "version": version, "status": "validated", "suggestions": suggestions, } }) tags_json = json.dumps(tags) # config_hash already computed earlier for idempotency check # Determine status based on scrutiny if scrutiny_report and scrutiny_report.get("decision") == "approve": scrutiny_status = "approved" elif scrutiny_report and scrutiny_report.get("decision") == "review": scrutiny_status = "pending_review" else: scrutiny_status = "pending" scrutiny_json = json.dumps(scrutiny_report) if scrutiny_report else None # Determine moderation_status based on visibility # Private and unlisted tools are auto-approved (no moderation needed) if visibility in ("private", "unlisted"): moderation_status = "approved" else: # Public tools need moderation approval moderation_status = "pending" g.db.execute( """ INSERT INTO tools ( owner, name, version, description, category, tags, config_yaml, readme, publisher_id, deprecated, deprecated_message, replacement, downloads, scrutiny_status, scrutiny_report, source, source_url, source_json, config_hash, visibility, moderation_status, forked_from, forked_version, published_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ owner, name, version, description or None, category, tags_json, config_text, readme, g.current_publisher["id"], int(bool(data.get("deprecated"))), data.get("deprecated_message"), data.get("replacement"), 0, scrutiny_status, scrutiny_json, source, source_url, source_json, config_hash, visibility, moderation_status, forked_from, forked_version, datetime.utcnow().isoformat(), ], ) g.db.commit() response = jsonify({ "data": { "owner": owner, "name": name, "version": version, "config_hash": config_hash, "pr_url": "", "status": moderation_status, "visibility": visibility, "forked_from": forked_from, "forked_version": forked_version, "suggestions": suggestions, } }) response.status_code = 201 return response @app.route("/api/v1/me/tools", methods=["GET"]) @require_token def my_tools() -> Response: rows = query_all( g.db, """ SELECT owner, name, version, description, downloads, deprecated, deprecated_message, replacement, published_at FROM tools WHERE owner = ? ORDER BY published_at DESC """, [g.current_publisher["slug"]], ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "downloads": row["downloads"], "deprecated": bool(row["deprecated"]), "deprecated_message": row["deprecated_message"], "replacement": row["replacement"], "published_at": row["published_at"], }) return jsonify({"data": data}) @app.route("/api/v1/me/tools//status", methods=["GET"]) @require_token def my_tool_status(name: str) -> Response: """Get the moderation status of a specific tool owned by the current user.""" owner = g.current_publisher["slug"] # Query for the most recent version of this tool row = query_one( g.db, """ SELECT name, version, moderation_status, moderation_note, config_hash, published_at FROM tools WHERE owner = ? AND name = ? ORDER BY published_at DESC LIMIT 1 """, [owner, name], ) if not row: return error_response("TOOL_NOT_FOUND", f"Tool '{name}' not found", 404) result = { "name": row["name"], "version": row["version"], "status": row["moderation_status"], "config_hash": row["config_hash"], "published_at": row["published_at"], } # Include feedback if status is changes_requested or rejected if row["moderation_status"] in ("changes_requested", "rejected") and row["moderation_note"]: result["feedback"] = row["moderation_note"] return jsonify({"data": result}) @app.route("/api/v1/tools///deprecate", methods=["POST"]) @require_token def deprecate_tool(owner: str, name: str) -> Response: """Mark a tool as deprecated.""" if g.current_publisher["slug"] != owner: return error_response("FORBIDDEN", "You can only deprecate your own tools", 403) data = request.get_json() or {} message = (data.get("deprecated_message") or data.get("message") or "").strip() replacement = (data.get("replacement") or "").strip() or None if message and len(message) > 500: return error_response("VALIDATION_ERROR", "Message too long (max 500)", 400) # Update all versions of the tool result = g.db.execute( """ UPDATE tools SET deprecated = 1, deprecated_message = ?, replacement = ? WHERE owner = ? AND name = ? """, [message or None, replacement, owner, name], ) if result.rowcount == 0: return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404) g.db.commit() return jsonify({"data": {"status": "deprecated", "owner": owner, "name": name}}) @app.route("/api/v1/tools///undeprecate", methods=["POST"]) @require_token def undeprecate_tool(owner: str, name: str) -> Response: """Remove deprecation status from a tool.""" if g.current_publisher["slug"] != owner: return error_response("FORBIDDEN", "You can only undeprecate your own tools", 403) result = g.db.execute( """ UPDATE tools SET deprecated = 0, deprecated_message = NULL, replacement = NULL WHERE owner = ? AND name = ? """, [owner, name], ) if result.rowcount == 0: return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404) g.db.commit() return jsonify({"data": {"status": "active", "owner": owner, "name": name}}) @app.route("/api/v1/me/settings", methods=["PUT"]) @require_token def update_settings() -> Response: """Update current user's profile settings.""" data = request.get_json() or {} # Validate fields display_name = data.get("display_name", "").strip() bio = data.get("bio", "").strip() if data.get("bio") else None website = data.get("website", "").strip() if data.get("website") else None if display_name and len(display_name) > 100: return error_response("VALIDATION_ERROR", "Display name too long (max 100)", 400) if bio and len(bio) > 500: return error_response("VALIDATION_ERROR", "Bio too long (max 500)", 400) if website and len(website) > 200: return error_response("VALIDATION_ERROR", "Website URL too long (max 200)", 400) # Build update query updates = [] params = [] if display_name: updates.append("display_name = ?") params.append(display_name) if bio is not None: updates.append("bio = ?") params.append(bio) if website is not None: updates.append("website = ?") params.append(website) if not updates: return error_response("VALIDATION_ERROR", "No valid fields to update", 400) updates.append("updated_at = CURRENT_TIMESTAMP") params.append(g.current_publisher["id"]) g.db.execute( f"UPDATE publishers SET {', '.join(updates)} WHERE id = ?", params, ) g.db.commit() return jsonify({"data": {"status": "updated"}}) @app.route("/api/v1/me", methods=["POST"]) @require_token def update_profile() -> Response: """Update current user's profile (POST version for web forms).""" data = request.get_json() or {} # Validate fields display_name = data.get("display_name", "").strip() bio = data.get("bio", "").strip() if data.get("bio") else None website = data.get("website", "").strip() if data.get("website") else None if display_name and len(display_name) > 100: return error_response("VALIDATION_ERROR", "Display name too long (max 100)", 400) if bio and len(bio) > 500: return error_response("VALIDATION_ERROR", "Bio too long (max 500)", 400) if website and len(website) > 200: return error_response("VALIDATION_ERROR", "Website URL too long (max 200)", 400) # Build update query updates = [] params = [] if display_name: updates.append("display_name = ?") params.append(display_name) if bio is not None: updates.append("bio = ?") params.append(bio) if website is not None: updates.append("website = ?") params.append(website) if not updates: return error_response("VALIDATION_ERROR", "No valid fields to update", 400) updates.append("updated_at = CURRENT_TIMESTAMP") params.append(g.current_publisher["id"]) g.db.execute( f"UPDATE publishers SET {', '.join(updates)} WHERE id = ?", params, ) g.db.commit() return jsonify({"data": {"status": "updated"}}) @app.route("/api/v1/me/password", methods=["POST"]) @require_token def change_password() -> Response: """Change current user's password.""" data = request.get_json() or {} current_password = data.get("current_password", "") new_password = data.get("new_password", "") if not current_password or not new_password: return error_response("VALIDATION_ERROR", "Current and new password required", 400) if len(new_password) < 8: return error_response("VALIDATION_ERROR", "New password must be at least 8 characters", 400) # Verify current password publisher = query_one( g.db, "SELECT password_hash FROM publishers WHERE id = ?", [g.current_publisher["id"]], ) if not publisher: return error_response("NOT_FOUND", "Publisher not found", 404) try: password_hasher.verify(publisher["password_hash"], current_password) except VerifyMismatchError: return error_response("INVALID_PASSWORD", "Current password is incorrect", 400) # Hash and save new password new_hash = password_hasher.hash(new_password) g.db.execute( "UPDATE publishers SET password_hash = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ?", [new_hash, g.current_publisher["id"]], ) g.db.commit() return jsonify({"data": {"status": "password_changed"}}) @app.route("/api/v1/featured/tools", methods=["GET"]) def featured_tools() -> Response: """Get featured tools for homepage/landing.""" placement = request.args.get("placement", "homepage") limit = min(int(request.args.get("limit", 6)), 20) rows = query_all( g.db, """ SELECT t.owner, t.name, t.version, t.description, t.category, t.downloads, ft.priority FROM featured_tools ft JOIN tools t ON ft.tool_id = t.id WHERE ft.placement = ? AND ft.status = 'active' AND (ft.start_at IS NULL OR ft.start_at <= CURRENT_TIMESTAMP) AND (ft.end_at IS NULL OR ft.end_at > CURRENT_TIMESTAMP) ORDER BY ft.priority DESC, t.downloads DESC LIMIT ? """, [placement, limit], ) # If no featured tools, fall back to popular if not rows: rows = query_all( g.db, """ SELECT owner, name, version, description, category, downloads FROM tools WHERE deprecated = 0 ORDER BY downloads DESC LIMIT ? """, [limit], ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "downloads": row["downloads"], }) return jsonify({"data": data}) @app.route("/api/v1/featured/contributors", methods=["GET"]) def featured_contributors() -> Response: """Get featured contributor for homepage.""" placement = request.args.get("placement", "homepage") row = query_one( g.db, """ SELECT p.slug, p.display_name, p.bio, p.website, fc.bio_override FROM featured_contributors fc JOIN publishers p ON fc.publisher_id = p.id WHERE fc.placement = ? AND fc.status = 'active' AND (fc.start_at IS NULL OR fc.start_at <= CURRENT_TIMESTAMP) AND (fc.end_at IS NULL OR fc.end_at > CURRENT_TIMESTAMP) ORDER BY fc.created_at DESC LIMIT 1 """, [placement], ) if not row: return jsonify({"data": None}) return jsonify({ "data": { "slug": row["slug"], "display_name": row["display_name"], "bio": row["bio_override"] or row["bio"], "website": row["website"], } }) @app.route("/api/v1/content/announcements", methods=["GET"]) def announcements() -> Response: """Get published announcements.""" limit = min(int(request.args.get("limit", 5)), 20) rows = query_all( g.db, """ SELECT id, title, body, published_at FROM announcements WHERE published = 1 ORDER BY published_at DESC LIMIT ? """, [limit], ) data = [] for row in rows: data.append({ "id": row["id"], "title": row["title"], "body": row["body"], "published_at": row["published_at"], }) return jsonify({"data": data}) @app.route("/api/v1/reports", methods=["POST"]) def submit_report() -> Response: """Submit an abuse report for a tool.""" data = request.get_json() or {} owner = data.get("owner", "").strip() name = data.get("name", "").strip() reason = data.get("reason", "").strip() details = data.get("details", "").strip() if data.get("details") else None if not owner or not name: return error_response("VALIDATION_ERROR", "owner and name required", 400) if not reason: return error_response("VALIDATION_ERROR", "reason required", 400) if len(reason) > 100: return error_response("VALIDATION_ERROR", "reason too long (max 100)", 400) if details and len(details) > 2000: return error_response("VALIDATION_ERROR", "details too long (max 2000)", 400) # Find the tool tool = query_one( g.db, "SELECT id FROM tools WHERE owner = ? AND name = ? ORDER BY published_at DESC LIMIT 1", [owner, name], ) if not tool: return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404) # Get reporter info reporter_id = None if hasattr(g, "current_publisher") and g.current_publisher: reporter_id = g.current_publisher["id"] reporter_ip = request.remote_addr # Rate limit: max 5 reports per IP per hour recent = query_one( g.db, """ SELECT COUNT(*) as cnt FROM reports WHERE reporter_ip = ? AND created_at > datetime('now', '-1 hour') """, [reporter_ip], ) if recent and recent["cnt"] >= 5: return error_response("RATE_LIMITED", "Too many reports. Try again later.", 429) g.db.execute( """ INSERT INTO reports (tool_id, reporter_id, reporter_ip, reason, details) VALUES (?, ?, ?, ?, ?) """, [tool["id"], reporter_id, reporter_ip, reason, details], ) g.db.commit() return jsonify({"data": {"status": "submitted"}}) @app.route("/api/v1/consent", methods=["POST"]) def save_consent() -> Response: """Save user consent preferences for analytics/ads.""" try: data = request.get_json(force=True) or {} except Exception: data = {} analytics = bool(data.get("analytics", False)) ads = bool(data.get("ads", False)) # Store consent in session (works with our SQLite session interface) from flask import session session["consent_analytics"] = analytics session["consent_ads"] = ads session["consent_given"] = True return jsonify({ "data": { "analytics": analytics, "ads": ads, "saved": True } }) @app.route("/api/v1/analytics/pageview", methods=["POST"]) def track_pageview() -> Response: """Track a page view (privacy-friendly, no cookies).""" data = request.get_json() or {} path = data.get("path", "").strip() if not path or len(path) > 500: return jsonify({"data": {"tracked": False}}) # Hash the IP for privacy (don't store raw IP) ip_hash = hashlib.sha256( (request.remote_addr or "unknown").encode() ).hexdigest()[:16] referrer = request.headers.get("Referer", "")[:500] if request.headers.get("Referer") else None user_agent = request.headers.get("User-Agent", "")[:500] if request.headers.get("User-Agent") else None try: g.db.execute( """ INSERT INTO pageviews (path, referrer, user_agent, ip_hash) VALUES (?, ?, ?, ?) """, [path, referrer, user_agent, ip_hash], ) g.db.commit() return jsonify({"data": {"tracked": True}}) except Exception: return jsonify({"data": {"tracked": False}}) @app.route("/api/v1/webhook/gitea", methods=["POST"]) def webhook_gitea() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response( "PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", status=413, details={"limit": MAX_BODY_BYTES}, ) secret = os.environ.get("CMDFORGE_REGISTRY_WEBHOOK_SECRET", "") if not secret: return error_response("UNAUTHORIZED", "Webhook secret not configured", 401) status, payload = process_webhook(request.data, dict(request.headers), secret) response = jsonify(payload) response.status_code = status return response @app.route("/api/v1/webhook/deploy", methods=["POST"]) def webhook_deploy() -> Response: """Auto-deploy webhook triggered by Gitea on push to main.""" import hmac import subprocess secret = os.environ.get("CMDFORGE_DEPLOY_WEBHOOK_SECRET", "") if not secret: return error_response("UNAUTHORIZED", "Deploy webhook secret not configured", 401) # Verify Gitea signature (X-Gitea-Signature is HMAC-SHA256) signature = request.headers.get("X-Gitea-Signature", "") if not signature: return error_response("UNAUTHORIZED", "Missing signature", 401) expected = hmac.new( secret.encode(), request.data, hashlib.sha256 ).hexdigest() if not hmac.compare_digest(signature, expected): return error_response("UNAUTHORIZED", "Invalid signature", 401) # Parse payload to check branch try: payload = json.loads(request.data) ref = payload.get("ref", "") # Only deploy on push to main branch if ref not in ("refs/heads/main", "refs/heads/master"): return jsonify({"data": {"deployed": False, "reason": f"Ignoring ref {ref}"}}) except (json.JSONDecodeError, KeyError): pass # Proceed anyway if we can't parse # Run deploy in background (so we can respond before restart kills us) deploy_script = """ cd /srv/mergerfs/data_pool/home/rob/cmdforge-registry && \ git pull origin main && \ sleep 1 && \ systemctl --user restart cmdforge-web """ subprocess.Popen( ["bash", "-c", deploy_script], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True, ) return jsonify({"data": {"deployed": True, "message": "Deploy triggered"}}) # ─── Admin API Endpoints ───────────────────────────────────────────────────── @app.route("/api/v1/admin/tools/pending", methods=["GET"]) @require_moderator def admin_pending_tools() -> Response: """List tools pending moderation.""" page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) offset = (page - 1) * per_page rows = query_all( g.db, """ SELECT t.*, p.display_name as publisher_name FROM tools t JOIN publishers p ON t.publisher_id = p.id WHERE t.moderation_status = 'pending' ORDER BY t.published_at ASC LIMIT ? OFFSET ? """, [per_page, offset], ) count_row = query_one( g.db, "SELECT COUNT(*) as total FROM tools WHERE moderation_status = 'pending'", ) total = count_row["total"] if count_row else 0 data = [] for row in rows: # Parse scrutiny report if available scrutiny_report = None if row["scrutiny_report"]: try: scrutiny_report = json.loads(row["scrutiny_report"]) except (json.JSONDecodeError, TypeError): pass data.append({ "id": row["id"], "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "published_at": row["published_at"], "publisher_name": row["publisher_name"], "visibility": row["visibility"], "scrutiny_status": row["scrutiny_status"], "scrutiny_report": scrutiny_report, }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) @app.route("/api/v1/admin/tools/", methods=["GET"]) @require_moderator def admin_get_tool(tool_id: int) -> Response: """Get full tool details for admin review.""" tool = query_one( g.db, """ SELECT t.*, p.display_name as publisher_name FROM tools t JOIN publishers p ON t.publisher_id = p.id WHERE t.id = ? """, [tool_id], ) if not tool: return error_response("TOOL_NOT_FOUND", "Tool not found", 404) # Parse config YAML to extract steps config = {} if tool["config_yaml"]: try: config = yaml.safe_load(tool["config_yaml"]) or {} except yaml.YAMLError: pass # Parse scrutiny report scrutiny_report = None if tool["scrutiny_report"]: try: scrutiny_report = json.loads(tool["scrutiny_report"]) except (json.JSONDecodeError, TypeError): pass return jsonify({ "data": { "id": tool["id"], "owner": tool["owner"], "name": tool["name"], "version": tool["version"], "description": tool["description"], "category": tool["category"], "tags": tool["tags"], "published_at": tool["published_at"], "publisher_name": tool["publisher_name"], "visibility": tool["visibility"], "moderation_status": tool["moderation_status"], "scrutiny_status": tool["scrutiny_status"], "scrutiny_report": scrutiny_report, "config": config, "readme": tool["readme"] or "", } }) @app.route("/api/v1/admin/tools//approve", methods=["POST"]) @require_moderator def admin_approve_tool(tool_id: int) -> Response: """Approve a pending tool.""" tool = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id]) if not tool: return error_response("TOOL_NOT_FOUND", "Tool not found", 404) g.db.execute( """ UPDATE tools SET moderation_status = 'approved', moderated_by = ?, moderated_at = ? WHERE id = ? """, [g.current_publisher["slug"], datetime.utcnow().isoformat(), tool_id], ) g.db.commit() log_audit("approve_tool", "tool", str(tool_id), { "tool": f"{tool['owner']}/{tool['name']}", "version": tool["version"], }) return jsonify({"data": {"status": "approved", "tool_id": tool_id}}) @app.route("/api/v1/admin/tools//reject", methods=["POST"]) @require_moderator def admin_reject_tool(tool_id: int) -> Response: """Reject a pending tool with a reason.""" data = request.get_json() or {} reason = (data.get("reason") or "").strip() if not reason: return error_response("VALIDATION_ERROR", "Rejection reason is required", 400) tool = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id]) if not tool: return error_response("TOOL_NOT_FOUND", "Tool not found", 404) g.db.execute( """ UPDATE tools SET moderation_status = 'rejected', moderation_note = ?, moderated_by = ?, moderated_at = ? WHERE id = ? """, [reason, g.current_publisher["slug"], datetime.utcnow().isoformat(), tool_id], ) g.db.commit() log_audit("reject_tool", "tool", str(tool_id), { "tool": f"{tool['owner']}/{tool['name']}", "version": tool["version"], "reason": reason, }) return jsonify({"data": {"status": "rejected", "tool_id": tool_id}}) @app.route("/api/v1/admin/tools//request-changes", methods=["POST"]) @require_moderator def admin_request_changes(tool_id: int) -> Response: """Request changes from tool publisher before approval.""" data = request.get_json() or {} feedback = (data.get("feedback") or data.get("reason") or "").strip() if not feedback: return error_response("VALIDATION_ERROR", "Feedback is required", 400) tool = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id]) if not tool: return error_response("TOOL_NOT_FOUND", "Tool not found", 404) g.db.execute( """ UPDATE tools SET moderation_status = 'changes_requested', moderation_note = ?, moderated_by = ?, moderated_at = ? WHERE id = ? """, [feedback, g.current_publisher["slug"], datetime.utcnow().isoformat(), tool_id], ) g.db.commit() log_audit("request_changes", "tool", str(tool_id), { "tool": f"{tool['owner']}/{tool['name']}", "version": tool["version"], "feedback": feedback[:200], # Truncate for audit log }) return jsonify({"data": {"status": "changes_requested", "tool_id": tool_id}}) @app.route("/api/v1/admin/scrutiny", methods=["GET"]) @require_moderator def admin_scrutiny_audit() -> Response: """List all tools with their scrutiny status for audit purposes.""" page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 50, type=int), 100) offset = (page - 1) * per_page # Filter options scrutiny_filter = request.args.get("scrutiny_status") # approved, pending_review, pending moderation_filter = request.args.get("moderation_status") # approved, pending, rejected where_clauses = [] params = [] if scrutiny_filter: where_clauses.append("t.scrutiny_status = ?") params.append(scrutiny_filter) if moderation_filter: where_clauses.append("t.moderation_status = ?") params.append(moderation_filter) where_sql = " AND ".join(where_clauses) if where_clauses else "1=1" rows = query_all( g.db, f""" SELECT t.id, t.owner, t.name, t.version, t.description, t.category, t.scrutiny_status, t.scrutiny_report, t.moderation_status, t.moderation_note, t.published_at, p.display_name as publisher_name FROM tools t JOIN publishers p ON t.publisher_id = p.id WHERE {where_sql} ORDER BY t.published_at DESC LIMIT ? OFFSET ? """, params + [per_page, offset], ) count_row = query_one( g.db, f"SELECT COUNT(*) as total FROM tools t WHERE {where_sql}", params, ) total = count_row["total"] if count_row else 0 data = [] for row in rows: scrutiny_report = None if row["scrutiny_report"]: try: scrutiny_report = json.loads(row["scrutiny_report"]) except (json.JSONDecodeError, TypeError): pass data.append({ "id": row["id"], "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "published_at": row["published_at"], "publisher_name": row["publisher_name"], "scrutiny_status": row["scrutiny_status"], "scrutiny_report": scrutiny_report, "moderation_status": row["moderation_status"], "moderation_note": row["moderation_note"], }) # Also get summary stats stats = query_all( g.db, """ SELECT scrutiny_status, moderation_status, COUNT(*) as count FROM tools GROUP BY scrutiny_status, moderation_status """, ) return jsonify({ "data": data, "meta": paginate(page, per_page, total), "stats": [dict(s) for s in stats], }) @app.route("/api/v1/admin/tools//remove", methods=["POST"]) @require_moderator def admin_remove_tool(tool_id: int) -> Response: """Remove an approved tool (soft delete).""" data = request.get_json() or {} reason = (data.get("reason") or "").strip() tool = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id]) if not tool: return error_response("TOOL_NOT_FOUND", "Tool not found", 404) g.db.execute( """ UPDATE tools SET moderation_status = 'removed', moderation_note = ?, moderated_by = ?, moderated_at = ? WHERE id = ? """, [reason or None, g.current_publisher["slug"], datetime.utcnow().isoformat(), tool_id], ) g.db.commit() log_audit("remove_tool", "tool", str(tool_id), { "tool": f"{tool['owner']}/{tool['name']}", "version": tool["version"], "reason": reason, }) return jsonify({"data": {"status": "removed", "tool_id": tool_id}}) @app.route("/api/v1/admin/cleanup/rejected", methods=["POST"]) @require_admin def admin_cleanup_rejected() -> Response: """ Delete rejected tool versions older than N days. This endpoint permanently deletes tools that were rejected during moderation after a grace period, allowing users time to see the rejection reason and fix issues. Query params: days: Number of days to retain rejected versions (default: 7) dry_run: If "true", only report what would be deleted without deleting """ days = request.args.get("days", 7, type=int) dry_run = request.args.get("dry_run", "false").lower() == "true" if days < 0: return error_response("VALIDATION_ERROR", "Days must be non-negative", 400) # Calculate cutoff date cutoff = (datetime.utcnow() - timedelta(days=days)).isoformat() # Find rejected tools older than cutoff rejected_tools = query_all( g.db, """ SELECT id, owner, name, version, moderated_at FROM tools WHERE moderation_status = 'rejected' AND moderated_at < ? """, [cutoff], ) if not rejected_tools: return jsonify({ "data": { "deleted_count": 0, "dry_run": dry_run, "cutoff_date": cutoff, "deleted_tools": [], } }) deleted_tools = [] for tool in rejected_tools: deleted_tools.append({ "id": tool["id"], "name": f"{tool['owner']}/{tool['name']}", "version": tool["version"], "moderated_at": tool["moderated_at"], }) if not dry_run: tool_id = tool["id"] # Delete associated records g.db.execute("DELETE FROM download_stats WHERE tool_id = ?", [tool_id]) g.db.execute("DELETE FROM reports WHERE tool_id = ?", [tool_id]) g.db.execute("DELETE FROM featured_tools WHERE tool_id = ?", [tool_id]) g.db.execute("DELETE FROM tools WHERE id = ?", [tool_id]) if not dry_run: g.db.commit() log_audit("cleanup_rejected", "system", "cleanup", { "days": days, "deleted_count": len(deleted_tools), "tool_ids": [t["id"] for t in deleted_tools], }) return jsonify({ "data": { "deleted_count": len(deleted_tools), "dry_run": dry_run, "cutoff_date": cutoff, "deleted_tools": deleted_tools, } }) @app.route("/api/v1/admin/tools/", methods=["DELETE"]) @require_admin def admin_delete_tool(tool_id: int) -> Response: """Hard delete a tool (admin only).""" tool = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id]) if not tool: return error_response("TOOL_NOT_FOUND", "Tool not found", 404) # Delete associated records first g.db.execute("DELETE FROM download_stats WHERE tool_id = ?", [tool_id]) g.db.execute("DELETE FROM reports WHERE tool_id = ?", [tool_id]) g.db.execute("DELETE FROM featured_tools WHERE tool_id = ?", [tool_id]) g.db.execute("DELETE FROM tools WHERE id = ?", [tool_id]) g.db.commit() log_audit("delete_tool", "tool", str(tool_id), { "tool": f"{tool['owner']}/{tool['name']}", "version": tool["version"], }) return jsonify({"data": {"status": "deleted", "tool_id": tool_id}}) @app.route("/api/v1/admin/publishers", methods=["GET"]) @require_moderator def admin_list_publishers() -> Response: """List all publishers with stats.""" page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) offset = (page - 1) * per_page rows = query_all( g.db, """ SELECT p.*, (SELECT COUNT(*) FROM tools WHERE publisher_id = p.id) as tool_count, (SELECT SUM(downloads) FROM tools WHERE publisher_id = p.id) as total_downloads FROM publishers p ORDER BY p.created_at DESC LIMIT ? OFFSET ? """, [per_page, offset], ) count_row = query_one(g.db, "SELECT COUNT(*) as total FROM publishers") total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "slug": row["slug"], "display_name": row["display_name"], "email": row["email"], "role": row["role"] or "user", "banned": bool(row["banned"]), "ban_reason": row["ban_reason"], "verified": bool(row["verified"]), "tool_count": row["tool_count"] or 0, "total_downloads": row["total_downloads"] or 0, "created_at": row["created_at"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) @app.route("/api/v1/admin/publishers/", methods=["GET"]) @require_moderator def admin_get_publisher(publisher_id: int) -> Response: """Get detailed publisher info.""" publisher = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id]) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) # Get their tools tools = query_all( g.db, """ SELECT id, owner, name, version, moderation_status, visibility, downloads, published_at FROM tools WHERE publisher_id = ? ORDER BY published_at DESC """, [publisher_id], ) # Get their tokens (just counts, not the hashes) token_count = query_one( g.db, "SELECT COUNT(*) as cnt FROM api_tokens WHERE publisher_id = ? AND revoked_at IS NULL", [publisher_id], ) return jsonify({ "data": { "id": publisher["id"], "slug": publisher["slug"], "display_name": publisher["display_name"], "email": publisher["email"], "bio": publisher["bio"], "website": publisher["website"], "role": publisher["role"] or "user", "banned": bool(publisher["banned"]), "banned_at": publisher["banned_at"], "banned_by": publisher["banned_by"], "ban_reason": publisher["ban_reason"], "verified": bool(publisher["verified"]), "created_at": publisher["created_at"], "active_tokens": token_count["cnt"] if token_count else 0, "tools": [ { "id": t["id"], "owner": t["owner"], "name": t["name"], "version": t["version"], "moderation_status": t["moderation_status"], "visibility": t["visibility"], "downloads": t["downloads"], "published_at": t["published_at"], } for t in tools ], } }) @app.route("/api/v1/admin/publishers//ban", methods=["POST"]) @require_admin def admin_ban_publisher(publisher_id: int) -> Response: """Ban a publisher.""" data = request.get_json() or {} reason = (data.get("reason") or "").strip() if not reason: return error_response("VALIDATION_ERROR", "Ban reason is required", 400) publisher = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id]) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) if publisher["role"] == "admin": return error_response("FORBIDDEN", "Cannot ban an admin", 403) now = datetime.utcnow().isoformat() # Ban the publisher g.db.execute( """ UPDATE publishers SET banned = 1, banned_at = ?, banned_by = ?, ban_reason = ? WHERE id = ? """, [now, g.current_publisher["slug"], reason, publisher_id], ) # Revoke all their tokens g.db.execute( "UPDATE api_tokens SET revoked_at = ? WHERE publisher_id = ? AND revoked_at IS NULL", [now, publisher_id], ) # Remove all their tools from public view g.db.execute( """ UPDATE tools SET moderation_status = 'removed', moderation_note = 'Publisher banned', moderated_by = ?, moderated_at = ? WHERE publisher_id = ? AND moderation_status != 'removed' """, [g.current_publisher["slug"], now, publisher_id], ) g.db.commit() log_audit("ban_publisher", "publisher", str(publisher_id), { "slug": publisher["slug"], "reason": reason, }) return jsonify({"data": {"status": "banned", "publisher_id": publisher_id}}) @app.route("/api/v1/admin/publishers//unban", methods=["POST"]) @require_admin def admin_unban_publisher(publisher_id: int) -> Response: """Unban a publisher.""" publisher = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id]) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) if not publisher["banned"]: return error_response("VALIDATION_ERROR", "Publisher is not banned", 400) g.db.execute( """ UPDATE publishers SET banned = 0, banned_at = NULL, banned_by = NULL, ban_reason = NULL WHERE id = ? """, [publisher_id], ) g.db.commit() log_audit("unban_publisher", "publisher", str(publisher_id), { "slug": publisher["slug"], }) return jsonify({"data": {"status": "unbanned", "publisher_id": publisher_id}}) @app.route("/api/v1/admin/publishers//role", methods=["POST"]) @require_admin def admin_change_role(publisher_id: int) -> Response: """Change a publisher's role.""" data = request.get_json() or {} new_role = (data.get("role") or "").strip() if new_role not in ("user", "moderator", "admin"): return error_response("VALIDATION_ERROR", "Invalid role. Must be: user, moderator, admin", 400) publisher = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id]) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) old_role = publisher["role"] or "user" g.db.execute( "UPDATE publishers SET role = ? WHERE id = ?", [new_role, publisher_id], ) g.db.commit() log_audit("change_role", "publisher", str(publisher_id), { "slug": publisher["slug"], "old_role": old_role, "new_role": new_role, }) return jsonify({"data": {"status": "updated", "publisher_id": publisher_id, "role": new_role}}) @app.route("/api/v1/admin/publishers/", methods=["DELETE"]) @require_admin def admin_delete_publisher(publisher_id: int) -> Response: """Delete a publisher and optionally their tools.""" delete_tools = request.args.get("delete_tools", "false").lower() == "true" publisher = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id]) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) # Don't allow deleting yourself if publisher_id == g.current_publisher["id"]: return error_response("CANNOT_DELETE_SELF", "Cannot delete your own account", 400) # Don't allow deleting other admins if publisher["role"] == "admin": return error_response("CANNOT_DELETE_ADMIN", "Cannot delete admin accounts", 400) slug = publisher["slug"] if delete_tools: # Delete all tools owned by this publisher g.db.execute("DELETE FROM tools WHERE publisher_id = ?", [publisher_id]) # Revoke all tokens g.db.execute("UPDATE api_tokens SET revoked_at = CURRENT_TIMESTAMP WHERE publisher_id = ?", [publisher_id]) # Delete the publisher g.db.execute("DELETE FROM publishers WHERE id = ?", [publisher_id]) g.db.commit() log_audit("delete_publisher", "publisher", str(publisher_id), { "slug": slug, "delete_tools": delete_tools, }) return jsonify({"data": {"status": "deleted", "publisher_id": publisher_id, "slug": slug}}) @app.route("/api/v1/admin/publishers//reset-password", methods=["POST"]) @require_admin def admin_reset_password(publisher_id: int) -> Response: """Generate a temporary password for a publisher.""" import secrets publisher = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id]) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) # Generate a temporary password temp_password = secrets.token_urlsafe(12) password_hash = password_hasher.hash(temp_password) g.db.execute( "UPDATE publishers SET password_hash = ? WHERE id = ?", [password_hash, publisher_id], ) g.db.commit() log_audit("reset_password", "publisher", str(publisher_id), { "slug": publisher["slug"], }) return jsonify({ "data": { "status": "reset", "publisher_id": publisher_id, "slug": publisher["slug"], "temporary_password": temp_password, } }) @app.route("/api/v1/admin/reports", methods=["GET"]) @require_moderator def admin_list_reports() -> Response: """List unresolved reports.""" page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) status_filter = request.args.get("status", "pending") offset = (page - 1) * per_page where_clause = "WHERE r.status = ?" if status_filter else "WHERE 1=1" params = [status_filter] if status_filter else [] rows = query_all( g.db, f""" SELECT r.*, t.owner, t.name as tool_name, t.version, p.display_name as reporter_name FROM reports r JOIN tools t ON r.tool_id = t.id LEFT JOIN publishers p ON r.reporter_id = p.id {where_clause} ORDER BY r.created_at DESC LIMIT ? OFFSET ? """, params + [per_page, offset], ) count_row = query_one( g.db, f"SELECT COUNT(*) as total FROM reports r {where_clause}", params, ) total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "tool": f"{row['owner']}/{row['tool_name']}", "tool_id": row["tool_id"], "version": row["version"], "reason": row["reason"], "details": row["details"], "reporter_name": row["reporter_name"], "status": row["status"], "created_at": row["created_at"], "resolved_at": row["resolved_at"], "resolution_note": row["resolution_note"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) @app.route("/api/v1/admin/reports//resolve", methods=["POST"]) @require_moderator def admin_resolve_report(report_id: int) -> Response: """Resolve a report with an action.""" data = request.get_json() or {} action = (data.get("action") or "").strip() note = (data.get("note") or "").strip() if action not in ("dismiss", "warn", "remove_tool", "ban_publisher"): return error_response( "VALIDATION_ERROR", "Invalid action. Must be: dismiss, warn, remove_tool, ban_publisher", 400, ) report = query_one(g.db, "SELECT * FROM reports WHERE id = ?", [report_id]) if not report: return error_response("REPORT_NOT_FOUND", "Report not found", 404) if report["status"] != "pending": return error_response("VALIDATION_ERROR", "Report already resolved", 400) now = datetime.utcnow().isoformat() # Mark report as resolved g.db.execute( """ UPDATE reports SET status = 'resolved', resolved_by = ?, resolved_at = ?, resolution_note = ? WHERE id = ? """, [g.current_publisher["id"], now, f"{action}: {note}" if note else action, report_id], ) # Take action if action == "remove_tool": g.db.execute( """ UPDATE tools SET moderation_status = 'removed', moderation_note = ?, moderated_by = ?, moderated_at = ? WHERE id = ? """, [f"Removed due to report: {note}" if note else "Removed due to report", g.current_publisher["slug"], now, report["tool_id"]], ) elif action == "ban_publisher": # Get tool's publisher tool = query_one(g.db, "SELECT publisher_id FROM tools WHERE id = ?", [report["tool_id"]]) if tool: g.db.execute( """ UPDATE publishers SET banned = 1, banned_at = ?, banned_by = ?, ban_reason = ? WHERE id = ? """, [now, g.current_publisher["slug"], f"Banned due to report: {note}" if note else "Banned due to report", tool["publisher_id"]], ) # Revoke tokens g.db.execute( "UPDATE api_tokens SET revoked_at = ? WHERE publisher_id = ?", [now, tool["publisher_id"]], ) g.db.commit() log_audit("resolve_report", "report", str(report_id), { "action": action, "tool_id": report["tool_id"], "note": note, }) return jsonify({"data": {"status": "resolved", "report_id": report_id, "action": action}}) @app.route("/api/v1/admin/audit-log", methods=["GET"]) @require_admin def admin_audit_log() -> Response: """View audit log entries.""" page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 50, type=int), 200) target_type = request.args.get("target_type") target_id = request.args.get("target_id") actor_id = request.args.get("actor_id") since = request.args.get("since") offset = (page - 1) * per_page where_clauses = [] params: List[Any] = [] if target_type: where_clauses.append("target_type = ?") params.append(target_type) if target_id: where_clauses.append("target_id = ?") params.append(target_id) if actor_id: where_clauses.append("actor_id = ?") params.append(actor_id) if since: where_clauses.append("created_at >= ?") params.append(since) where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else "" rows = query_all( g.db, f""" SELECT * FROM audit_log {where_sql} ORDER BY created_at DESC LIMIT ? OFFSET ? """, params + [per_page, offset], ) count_row = query_one( g.db, f"SELECT COUNT(*) as total FROM audit_log {where_sql}", params, ) total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "action": row["action"], "target_type": row["target_type"], "target_id": row["target_id"], "actor_id": row["actor_id"], "details": json.loads(row["details"]) if row["details"] else None, "created_at": row["created_at"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) # ─── Admin Settings API ─────────────────────────────────────────────────────── @app.route("/api/v1/admin/settings", methods=["GET"]) @require_admin def admin_list_settings() -> Response: """List all configurable settings with current values.""" from .settings import get_all_settings category = request.args.get("category") settings = get_all_settings(g.db) if category: settings = [s for s in settings if s["category"] == category] # Group by category categories = {} for s in settings: cat = s["category"] if cat not in categories: categories[cat] = [] categories[cat].append(s) return jsonify({ "data": settings, "categories": categories, "available_categories": list(categories.keys()), }) @app.route("/api/v1/admin/settings/", methods=["GET"]) @require_admin def admin_get_setting(key: str) -> Response: """Get a single setting value.""" from .settings import get_setting, DEFAULT_SETTINGS value = get_setting(g.db, key) if value is None: return error_response("NOT_FOUND", f"Setting '{key}' not found", 404) # Find metadata setting_meta = None for s in DEFAULT_SETTINGS: if s.key == key: setting_meta = s break return jsonify({ "key": key, "value": value, "value_type": setting_meta.value_type if setting_meta else "string", "description": setting_meta.description if setting_meta else "", "category": setting_meta.category if setting_meta else "general", }) @app.route("/api/v1/admin/settings/", methods=["PUT"]) @require_admin def admin_update_setting(key: str) -> Response: """Update a setting value.""" from .settings import set_setting, get_setting data = request.get_json() if not data or "value" not in data: return error_response("VALIDATION_ERROR", "Missing 'value' in request body") value = data["value"] success = set_setting(g.db, key, value, updated_by=g.user_slug) if not success: return error_response( "VALIDATION_ERROR", f"Invalid setting key '{key}' or invalid value", 400, ) # Log the change log_audit( "update_setting", "setting", key, {"old_value": get_setting(g.db, key), "new_value": value}, ) return jsonify({ "success": True, "key": key, "value": get_setting(g.db, key), }) @app.route("/api/v1/admin/settings/", methods=["DELETE"]) @require_admin def admin_reset_setting(key: str) -> Response: """Reset a setting to its default value.""" from .settings import reset_setting, get_setting, DEFAULT_SETTINGS # Check if it's a valid setting valid = any(s.key == key for s in DEFAULT_SETTINGS) if not valid: return error_response("NOT_FOUND", f"Setting '{key}' not found", 404) reset_setting(g.db, key) log_audit("reset_setting", "setting", key, {}) return jsonify({ "success": True, "key": key, "value": get_setting(g.db, key), "message": "Setting reset to default", }) @app.route("/api/v1/admin/settings/reset-all", methods=["POST"]) @require_admin def admin_reset_all_settings() -> Response: """Reset all settings to defaults.""" from .settings import reset_all_settings count = reset_all_settings(g.db) log_audit("reset_all_settings", "settings", "all", {"count": count}) return jsonify({ "success": True, "reset_count": count, "message": f"Reset {count} settings to defaults", }) @app.route("/api/v1/admin/settings/vetting", methods=["GET"]) @require_moderator def admin_get_vetting_config() -> Response: """Get current vetting configuration (for moderators).""" from .settings import get_vetting_config return jsonify(get_vetting_config(g.db)) @app.route("/api/v1/admin/settings/similarity", methods=["GET"]) @require_moderator def admin_get_similarity_config() -> Response: """Get current similarity detection configuration.""" from .settings import get_similarity_config return jsonify(get_similarity_config(g.db)) @app.route("/api/v1/admin/settings/sync", methods=["GET"]) @require_admin def admin_get_sync_config() -> Response: """Get Fabric sync configuration.""" from .settings import get_sync_config return jsonify(get_sync_config(g.db)) @app.route("/api/v1/admin/settings/moderation", methods=["GET"]) @require_moderator def admin_get_moderation_config() -> Response: """Get moderation configuration.""" from .settings import get_moderation_config return jsonify(get_moderation_config(g.db)) # ─── Reviews & Ratings API ──────────────────────────────────────────────────── @app.route("/api/v1/tools///reviews", methods=["POST"]) @require_token def submit_review(owner: str, name: str) -> Response: """Submit a review for a tool. One review per user per tool.""" if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") # Find the tool tool = query_one( g.db, "SELECT id, publisher_id FROM tools WHERE owner = ? AND name = ? ORDER BY id DESC LIMIT 1", [owner, name], ) if not tool: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' not found", 404) # Can't review your own tool if tool["publisher_id"] == g.current_publisher["id"]: return error_response("VALIDATION_ERROR", "You cannot review your own tool", 400) # Check rate limit rate_resp = enforce_token_rate_limit("review", g.current_token["hash"]) if rate_resp: return rate_resp data = request.get_json() or {} rating = data.get("rating") title = (data.get("title") or "").strip()[:100] content = (data.get("content") or "").strip()[:2000] if not isinstance(rating, int) or rating < 1 or rating > 5: return error_response("VALIDATION_ERROR", "Rating must be an integer from 1 to 5") # Check for minimum content length (spam prevention) if content and len(content) < 10: return error_response("VALIDATION_ERROR", "Review content must be at least 10 characters") # Check if user already reviewed this tool existing = query_one( g.db, "SELECT id FROM reviews WHERE tool_id = ? AND reviewer_id = ?", [tool["id"], g.current_publisher["id"]], ) if existing: return error_response( "ALREADY_REVIEWED", "You have already reviewed this tool. Use PUT to update.", 409, ) now = datetime.utcnow().isoformat() g.db.execute( """ INSERT INTO reviews (tool_id, reviewer_id, rating, title, content, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, [tool["id"], g.current_publisher["id"], rating, title or None, content or None, now, now], ) review_id = g.db.execute("SELECT last_insert_rowid()").fetchone()[0] g.db.commit() # Refresh tool stats refresh_tool_stats(g.db, tool["id"]) response = jsonify({ "data": { "id": review_id, "rating": rating, "title": title, "content": content, "created_at": now, } }) response.status_code = 201 return response @app.route("/api/v1/tools///reviews", methods=["GET"]) def list_reviews(owner: str, name: str) -> Response: """List reviews for a tool with pagination.""" if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") tool = query_one( g.db, "SELECT id FROM tools WHERE owner = ? AND name = ? ORDER BY id DESC LIMIT 1", [owner, name], ) if not tool: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' not found", 404) page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) sort = request.args.get("sort", "recent") # recent, helpful, highest, lowest offset = (page - 1) * per_page if sort == "helpful": order_sql = "helpful_count DESC, created_at DESC" elif sort == "highest": order_sql = "rating DESC, created_at DESC" elif sort == "lowest": order_sql = "rating ASC, created_at DESC" else: order_sql = "created_at DESC" rows = query_all( g.db, f""" SELECT r.*, p.slug as reviewer_slug, p.display_name as reviewer_name FROM reviews r LEFT JOIN publishers p ON r.reviewer_id = p.id WHERE r.tool_id = ? AND r.status = 'published' ORDER BY {order_sql} LIMIT ? OFFSET ? """, [tool["id"], per_page, offset], ) count_row = query_one( g.db, "SELECT COUNT(*) as total FROM reviews WHERE tool_id = ? AND status = 'published'", [tool["id"]], ) total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "rating": row["rating"], "title": row["title"], "content": row["content"], "reviewer_slug": row["reviewer_slug"], "reviewer_name": row["reviewer_name"] or "Anonymous", "helpful_count": row["helpful_count"], "unhelpful_count": row["unhelpful_count"], "created_at": row["created_at"], "updated_at": row["updated_at"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) @app.route("/api/v1/tools///rating", methods=["GET"]) def get_tool_rating(owner: str, name: str) -> Response: """Get rating summary for a tool.""" if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") tool = query_one( g.db, "SELECT id FROM tools WHERE owner = ? AND name = ? ORDER BY id DESC LIMIT 1", [owner, name], ) if not tool: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' not found", 404) stats = get_tool_stats(g.db, tool["id"]) if not stats: stats = { "average_rating": 0, "rating_count": 0, "rating_1": 0, "rating_2": 0, "rating_3": 0, "rating_4": 0, "rating_5": 0, "unique_users": 0, "open_issues": 0, "security_issues": 0, } return jsonify({ "data": { "average_rating": stats["average_rating"], "rating_count": stats["rating_count"], "distribution": { "1": stats["rating_1"], "2": stats["rating_2"], "3": stats["rating_3"], "4": stats["rating_4"], "5": stats["rating_5"], }, "unique_users": stats["unique_users"], "open_issues": stats["open_issues"], "security_issues": stats["security_issues"], } }) @app.route("/api/v1/reviews/", methods=["PUT"]) @require_token def update_review(review_id: int) -> Response: """Update an existing review (owner only).""" review = query_one(g.db, "SELECT * FROM reviews WHERE id = ?", [review_id]) if not review: return error_response("REVIEW_NOT_FOUND", "Review not found", 404) if review["reviewer_id"] != g.current_publisher["id"]: return error_response("FORBIDDEN", "You can only edit your own reviews", 403) data = request.get_json() or {} rating = data.get("rating") title = data.get("title") content = data.get("content") updates = [] params = [] if rating is not None: if not isinstance(rating, int) or rating < 1 or rating > 5: return error_response("VALIDATION_ERROR", "Rating must be 1-5") updates.append("rating = ?") params.append(rating) if title is not None: updates.append("title = ?") params.append(title.strip()[:100] if title else None) if content is not None: content = content.strip()[:2000] if content and len(content) < 10: return error_response("VALIDATION_ERROR", "Review content must be at least 10 characters") updates.append("content = ?") params.append(content if content else None) if not updates: return error_response("VALIDATION_ERROR", "No fields to update") updates.append("updated_at = ?") params.append(datetime.utcnow().isoformat()) params.append(review_id) g.db.execute(f"UPDATE reviews SET {', '.join(updates)} WHERE id = ?", params) g.db.commit() # Refresh tool stats refresh_tool_stats(g.db, review["tool_id"]) return jsonify({"data": {"status": "updated", "review_id": review_id}}) @app.route("/api/v1/reviews/", methods=["DELETE"]) @require_token def delete_review(review_id: int) -> Response: """Delete a review (owner only).""" review = query_one(g.db, "SELECT * FROM reviews WHERE id = ?", [review_id]) if not review: return error_response("REVIEW_NOT_FOUND", "Review not found", 404) if review["reviewer_id"] != g.current_publisher["id"]: return error_response("FORBIDDEN", "You can only delete your own reviews", 403) tool_id = review["tool_id"] g.db.execute("DELETE FROM reviews WHERE id = ?", [review_id]) g.db.commit() # Refresh tool stats refresh_tool_stats(g.db, tool_id) return jsonify({"data": {"status": "deleted", "review_id": review_id}}) @app.route("/api/v1/reviews//vote", methods=["POST"]) def vote_review(review_id: int) -> Response: """Vote a review as helpful or unhelpful.""" review = query_one(g.db, "SELECT * FROM reviews WHERE id = ?", [review_id]) if not review: return error_response("REVIEW_NOT_FOUND", "Review not found", 404) data = request.get_json() or {} vote_type = data.get("vote") if vote_type not in ("helpful", "unhelpful"): return error_response("VALIDATION_ERROR", "Vote must be 'helpful' or 'unhelpful'") # Use publisher ID if logged in, otherwise hash IP if hasattr(g, "current_publisher") and g.current_publisher: voter_id = f"pub:{g.current_publisher['id']}" else: voter_id = f"ip:{hashlib.sha256((request.remote_addr or 'unknown').encode()).hexdigest()[:16]}" # Check for existing vote existing = query_one( g.db, "SELECT vote_type FROM review_votes WHERE review_id = ? AND voter_id = ?", [review_id, voter_id], ) if existing: if existing["vote_type"] == vote_type: # Remove vote (toggle off) g.db.execute("DELETE FROM review_votes WHERE review_id = ? AND voter_id = ?", [review_id, voter_id]) # Decrement count if vote_type == "helpful": g.db.execute("UPDATE reviews SET helpful_count = helpful_count - 1 WHERE id = ?", [review_id]) else: g.db.execute("UPDATE reviews SET unhelpful_count = unhelpful_count - 1 WHERE id = ?", [review_id]) g.db.commit() return jsonify({"data": {"status": "removed", "vote": None}}) else: # Change vote g.db.execute( "UPDATE review_votes SET vote_type = ?, created_at = ? WHERE review_id = ? AND voter_id = ?", [vote_type, datetime.utcnow().isoformat(), review_id, voter_id], ) # Adjust counts if vote_type == "helpful": g.db.execute( "UPDATE reviews SET helpful_count = helpful_count + 1, unhelpful_count = unhelpful_count - 1 WHERE id = ?", [review_id], ) else: g.db.execute( "UPDATE reviews SET helpful_count = helpful_count - 1, unhelpful_count = unhelpful_count + 1 WHERE id = ?", [review_id], ) g.db.commit() return jsonify({"data": {"status": "changed", "vote": vote_type}}) else: # New vote g.db.execute( "INSERT INTO review_votes (review_id, voter_id, vote_type, created_at) VALUES (?, ?, ?, ?)", [review_id, voter_id, vote_type, datetime.utcnow().isoformat()], ) # Increment count if vote_type == "helpful": g.db.execute("UPDATE reviews SET helpful_count = helpful_count + 1 WHERE id = ?", [review_id]) else: g.db.execute("UPDATE reviews SET unhelpful_count = unhelpful_count + 1 WHERE id = ?", [review_id]) g.db.commit() return jsonify({"data": {"status": "added", "vote": vote_type}}) @app.route("/api/v1/reviews//flag", methods=["POST"]) def flag_review(review_id: int) -> Response: """Flag a review as inappropriate.""" review = query_one(g.db, "SELECT * FROM reviews WHERE id = ?", [review_id]) if not review: return error_response("REVIEW_NOT_FOUND", "Review not found", 404) data = request.get_json() or {} reason = (data.get("reason") or "").strip()[:200] if not reason: return error_response("VALIDATION_ERROR", "Reason is required") # Update review status to flagged g.db.execute( "UPDATE reviews SET status = 'flagged', updated_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), review_id], ) g.db.commit() # Log the flag action log_audit("flag_review", "review", str(review_id), {"reason": reason}) return jsonify({"data": {"status": "flagged", "review_id": review_id}}) # ─── Issues API ─────────────────────────────────────────────────────────────── @app.route("/api/v1/tools///issues", methods=["POST"]) def submit_issue(owner: str, name: str) -> Response: """Report an issue for a tool. Auth optional for security reports.""" if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") tool = query_one( g.db, "SELECT id FROM tools WHERE owner = ? AND name = ? ORDER BY id DESC LIMIT 1", [owner, name], ) if not tool: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' not found", 404) data = request.get_json() or {} issue_type = data.get("issue_type", "bug") severity = data.get("severity", "medium") title = (data.get("title") or "").strip()[:200] description = (data.get("description") or "").strip()[:5000] if issue_type not in ("bug", "security", "compatibility"): return error_response("VALIDATION_ERROR", "issue_type must be: bug, security, compatibility") if severity not in ("low", "medium", "high", "critical"): return error_response("VALIDATION_ERROR", "severity must be: low, medium, high, critical") if not title: return error_response("VALIDATION_ERROR", "Title is required") # Rate limit by IP ip = request.remote_addr or "unknown" limit_config = RATE_LIMITS["issue"] allowed, _ = rate_limiter.check(f"issue:{ip}", limit_config["limit"], limit_config["window"]) if not allowed: return error_response("RATE_LIMITED", "Too many issue reports. Try again later.", 429) # Get reporter if authenticated reporter_id = None user_slug, _ = get_current_user_context() if user_slug: pub = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [user_slug]) if pub: reporter_id = pub["id"] now = datetime.utcnow().isoformat() g.db.execute( """ INSERT INTO tool_issues (tool_id, reporter_id, issue_type, severity, title, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, [tool["id"], reporter_id, issue_type, severity, title, description or None, now, now], ) issue_id = g.db.execute("SELECT last_insert_rowid()").fetchone()[0] g.db.commit() # Refresh tool stats refresh_tool_stats(g.db, tool["id"]) response = jsonify({ "data": { "id": issue_id, "issue_type": issue_type, "severity": severity, "title": title, "status": "open", "created_at": now, } }) response.status_code = 201 return response @app.route("/api/v1/tools///issues", methods=["GET"]) def list_issues(owner: str, name: str) -> Response: """List issues for a tool.""" if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") tool = query_one( g.db, "SELECT id FROM tools WHERE owner = ? AND name = ? ORDER BY id DESC LIMIT 1", [owner, name], ) if not tool: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' not found", 404) page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) status_filter = request.args.get("status") # open, confirmed, fixed, wontfix, duplicate type_filter = request.args.get("type") # bug, security, compatibility offset = (page - 1) * per_page where_clauses = ["tool_id = ?"] params: List[Any] = [tool["id"]] if status_filter: where_clauses.append("status = ?") params.append(status_filter) if type_filter: where_clauses.append("issue_type = ?") params.append(type_filter) where_sql = "WHERE " + " AND ".join(where_clauses) rows = query_all( g.db, f""" SELECT i.*, p.display_name as reporter_name FROM tool_issues i LEFT JOIN publishers p ON i.reporter_id = p.id {where_sql} ORDER BY CASE severity WHEN 'critical' THEN 1 WHEN 'high' THEN 2 WHEN 'medium' THEN 3 ELSE 4 END, created_at DESC LIMIT ? OFFSET ? """, params + [per_page, offset], ) count_row = query_one( g.db, f"SELECT COUNT(*) as total FROM tool_issues {where_sql}", params ) total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "issue_type": row["issue_type"], "severity": row["severity"], "title": row["title"], "status": row["status"], "reporter_name": row["reporter_name"] or "Anonymous", "created_at": row["created_at"], "resolved_at": row["resolved_at"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) @app.route("/api/v1/issues/", methods=["GET"]) def get_issue(issue_id: int) -> Response: """Get issue details.""" row = query_one( g.db, """ SELECT i.*, p.display_name as reporter_name, r.display_name as resolver_name, t.owner, t.name as tool_name FROM tool_issues i LEFT JOIN publishers p ON i.reporter_id = p.id LEFT JOIN publishers r ON i.resolved_by = r.id JOIN tools t ON i.tool_id = t.id WHERE i.id = ? """, [issue_id], ) if not row: return error_response("ISSUE_NOT_FOUND", "Issue not found", 404) return jsonify({ "data": { "id": row["id"], "tool": f"{row['owner']}/{row['tool_name']}", "issue_type": row["issue_type"], "severity": row["severity"], "title": row["title"], "description": row["description"], "status": row["status"], "reporter_name": row["reporter_name"] or "Anonymous", "resolver_name": row["resolver_name"], "resolution_note": row["resolution_note"], "created_at": row["created_at"], "updated_at": row["updated_at"], "resolved_at": row["resolved_at"], } }) @app.route("/api/v1/issues/", methods=["PUT"]) @require_token def update_issue(issue_id: int) -> Response: """Update an issue (reporter or tool owner only).""" issue = query_one( g.db, """ SELECT i.*, t.owner as tool_owner FROM tool_issues i JOIN tools t ON i.tool_id = t.id WHERE i.id = ? """, [issue_id], ) if not issue: return error_response("ISSUE_NOT_FOUND", "Issue not found", 404) # Check permissions: reporter or tool owner is_reporter = issue["reporter_id"] == g.current_publisher["id"] is_owner = issue["tool_owner"] == g.current_publisher["slug"] is_mod = g.current_publisher["role"] in ("moderator", "admin") if not (is_reporter or is_owner or is_mod): return error_response("FORBIDDEN", "You don't have permission to update this issue", 403) data = request.get_json() or {} updates = [] params = [] # Reporter can update title and description if is_reporter and "title" in data: updates.append("title = ?") params.append(data["title"].strip()[:200]) if is_reporter and "description" in data: updates.append("description = ?") params.append(data["description"].strip()[:5000]) # Owner/mod can update status and severity if (is_owner or is_mod) and "severity" in data: severity = data["severity"] if severity in ("low", "medium", "high", "critical"): updates.append("severity = ?") params.append(severity) if (is_owner or is_mod) and "status" in data: status = data["status"] if status in ("open", "confirmed"): updates.append("status = ?") params.append(status) if not updates: return error_response("VALIDATION_ERROR", "No valid fields to update") updates.append("updated_at = ?") params.append(datetime.utcnow().isoformat()) params.append(issue_id) g.db.execute(f"UPDATE tool_issues SET {', '.join(updates)} WHERE id = ?", params) g.db.commit() return jsonify({"data": {"status": "updated", "issue_id": issue_id}}) @app.route("/api/v1/issues//resolve", methods=["POST"]) @require_token def resolve_issue(issue_id: int) -> Response: """Resolve an issue (tool owner or moderator).""" issue = query_one( g.db, """ SELECT i.*, t.owner as tool_owner, t.id as tool_id FROM tool_issues i JOIN tools t ON i.tool_id = t.id WHERE i.id = ? """, [issue_id], ) if not issue: return error_response("ISSUE_NOT_FOUND", "Issue not found", 404) is_owner = issue["tool_owner"] == g.current_publisher["slug"] is_mod = g.current_publisher["role"] in ("moderator", "admin") if not (is_owner or is_mod): return error_response("FORBIDDEN", "Only tool owner or moderators can resolve issues", 403) data = request.get_json() or {} resolution = data.get("resolution", "fixed") note = (data.get("note") or "").strip()[:500] if resolution not in ("fixed", "wontfix", "duplicate"): return error_response("VALIDATION_ERROR", "resolution must be: fixed, wontfix, duplicate") now = datetime.utcnow().isoformat() g.db.execute( """ UPDATE tool_issues SET status = ?, resolved_by = ?, resolved_at = ?, resolution_note = ?, updated_at = ? WHERE id = ? """, [resolution, g.current_publisher["id"], now, note or None, now, issue_id], ) g.db.commit() # Refresh tool stats refresh_tool_stats(g.db, issue["tool_id"]) return jsonify({ "data": { "status": resolution, "issue_id": issue_id, "resolved_at": now, } }) # ─── Publisher Stats API ────────────────────────────────────────────────────── @app.route("/api/v1/publishers//stats", methods=["GET"]) def get_publisher_stats_endpoint(slug: str) -> Response: """Get publisher reputation stats and badges.""" publisher = query_one( g.db, "SELECT id, slug, display_name, verified, created_at FROM publishers WHERE slug = ?", [slug], ) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) stats = get_publisher_stats(g.db, publisher["id"]) if not stats: stats = { "tool_count": 0, "total_downloads": 0, "average_rating": 0, "total_reviews": 0, "trust_score": 0, "badges": [], } # Get badge details badge_details = [] for badge_id in stats.get("badges", []): info = get_badge_info(badge_id) if info: badge_details.append({ "id": badge_id, "name": info["name"], "icon": info["icon"], "color": info["color"], "description": info["description"], }) return jsonify({ "data": { "slug": publisher["slug"], "display_name": publisher["display_name"], "verified": bool(publisher["verified"]), "member_since": publisher["created_at"], "tool_count": stats["tool_count"], "total_downloads": stats["total_downloads"], "total_downloads_formatted": format_count(stats["total_downloads"]), "average_rating": stats["average_rating"], "total_reviews": stats["total_reviews"], "trust_score": stats["trust_score"], "badges": badge_details, } }) @app.route("/api/v1/publishers//reviews", methods=["GET"]) def list_publisher_reviews(slug: str) -> Response: """List all reviews across publisher's tools.""" publisher = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [slug]) if not publisher: return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404) page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) offset = (page - 1) * per_page rows = query_all( g.db, """ SELECT r.*, t.owner, t.name as tool_name, p.display_name as reviewer_name FROM reviews r JOIN tools t ON r.tool_id = t.id LEFT JOIN publishers p ON r.reviewer_id = p.id WHERE t.publisher_id = ? AND r.status = 'published' ORDER BY r.created_at DESC LIMIT ? OFFSET ? """, [publisher["id"], per_page, offset], ) count_row = query_one( g.db, """ SELECT COUNT(*) as total FROM reviews r JOIN tools t ON r.tool_id = t.id WHERE t.publisher_id = ? AND r.status = 'published' """, [publisher["id"]], ) total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "tool": f"{row['owner']}/{row['tool_name']}", "rating": row["rating"], "title": row["title"], "content": row["content"], "reviewer_name": row["reviewer_name"] or "Anonymous", "created_at": row["created_at"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) # ─── Admin Reviews & Issues API ─────────────────────────────────────────────── @app.route("/api/v1/admin/reviews", methods=["GET"]) @require_moderator def admin_list_reviews() -> Response: """List reviews pending moderation (flagged).""" page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) status_filter = request.args.get("status", "flagged") offset = (page - 1) * per_page rows = query_all( g.db, """ SELECT r.*, t.owner, t.name as tool_name, p.display_name as reviewer_name FROM reviews r JOIN tools t ON r.tool_id = t.id LEFT JOIN publishers p ON r.reviewer_id = p.id WHERE r.status = ? ORDER BY r.updated_at DESC LIMIT ? OFFSET ? """, [status_filter, per_page, offset], ) count_row = query_one( g.db, "SELECT COUNT(*) as total FROM reviews WHERE status = ?", [status_filter] ) total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "tool": f"{row['owner']}/{row['tool_name']}", "rating": row["rating"], "title": row["title"], "content": row["content"], "reviewer_name": row["reviewer_name"] or "Anonymous", "status": row["status"], "created_at": row["created_at"], "updated_at": row["updated_at"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) @app.route("/api/v1/admin/reviews//hide", methods=["POST"]) @require_moderator def admin_hide_review(review_id: int) -> Response: """Hide a review (moderation action).""" review = query_one(g.db, "SELECT * FROM reviews WHERE id = ?", [review_id]) if not review: return error_response("REVIEW_NOT_FOUND", "Review not found", 404) g.db.execute( "UPDATE reviews SET status = 'hidden', updated_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), review_id], ) g.db.commit() # Refresh tool stats refresh_tool_stats(g.db, review["tool_id"]) log_audit("hide_review", "review", str(review_id), {"previous_status": review["status"]}) return jsonify({"data": {"status": "hidden", "review_id": review_id}}) @app.route("/api/v1/admin/reviews//restore", methods=["POST"]) @require_moderator def admin_restore_review(review_id: int) -> Response: """Restore a hidden review.""" review = query_one(g.db, "SELECT * FROM reviews WHERE id = ?", [review_id]) if not review: return error_response("REVIEW_NOT_FOUND", "Review not found", 404) g.db.execute( "UPDATE reviews SET status = 'published', updated_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), review_id], ) g.db.commit() # Refresh tool stats refresh_tool_stats(g.db, review["tool_id"]) log_audit("restore_review", "review", str(review_id), {"previous_status": review["status"]}) return jsonify({"data": {"status": "published", "review_id": review_id}}) @app.route("/api/v1/admin/issues", methods=["GET"]) @require_moderator def admin_list_issues() -> Response: """List all issues across tools.""" page = request.args.get("page", 1, type=int) per_page = min(request.args.get("per_page", 20, type=int), 100) status_filter = request.args.get("status") type_filter = request.args.get("type") offset = (page - 1) * per_page where_clauses = [] params: List[Any] = [] if status_filter: where_clauses.append("i.status = ?") params.append(status_filter) if type_filter: where_clauses.append("i.issue_type = ?") params.append(type_filter) where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else "" rows = query_all( g.db, f""" SELECT i.*, t.owner, t.name as tool_name, p.display_name as reporter_name FROM tool_issues i JOIN tools t ON i.tool_id = t.id LEFT JOIN publishers p ON i.reporter_id = p.id {where_sql} ORDER BY CASE i.severity WHEN 'critical' THEN 1 WHEN 'high' THEN 2 WHEN 'medium' THEN 3 ELSE 4 END, i.created_at DESC LIMIT ? OFFSET ? """, params + [per_page, offset], ) count_row = query_one( g.db, f"SELECT COUNT(*) as total FROM tool_issues i {where_sql}", params ) total = count_row["total"] if count_row else 0 data = [] for row in rows: data.append({ "id": row["id"], "tool": f"{row['owner']}/{row['tool_name']}", "issue_type": row["issue_type"], "severity": row["severity"], "title": row["title"], "status": row["status"], "reporter_name": row["reporter_name"] or "Anonymous", "created_at": row["created_at"], }) return jsonify({ "data": data, "meta": paginate(page, per_page, total), }) return app def main() -> None: app = create_app() app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 5000))) if __name__ == "__main__": main()