"""Flask app for SmartTools Registry API (Phase 2).""" from __future__ import annotations import hashlib import json import math import os import re import secrets from dataclasses import dataclass from datetime import date, datetime, timedelta from typing import Any, Dict, Iterable, List, Optional, Tuple from flask import Flask, Response, g, jsonify, request import yaml from functools import wraps from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError from .db import connect_db, init_db, query_all, query_one from .rate_limit import RateLimiter from .sync import process_webhook, get_categories_cache_path, get_repo_dir MAX_BODY_BYTES = 512 * 1024 MAX_CONFIG_BYTES = 64 * 1024 MAX_README_BYTES = 256 * 1024 MAX_TOOL_NAME_LEN = 64 MAX_DESC_LEN = 500 MAX_TAG_LEN = 32 MAX_TAGS = 10 MAX_PAGE_SIZE = 100 DEFAULT_PAGE_SIZE = 20 RATE_LIMITS = { "tools": {"limit": 100, "window": 60}, "download": {"limit": 60, "window": 60}, "register": {"limit": 5, "window": 3600}, "login": {"limit": 10, "window": 900}, "login_failed": {"limit": 5, "window": 900}, "tokens": {"limit": 10, "window": 3600}, "publish": {"limit": 20, "window": 3600}, } ALLOWED_SORT = { "/tools": {"downloads", "published_at", "name"}, "/tools/search": {"relevance", "downloads", "published_at"}, "/categories": {"name", "tool_count"}, } TOOL_NAME_RE = re.compile(r"^[A-Za-z0-9-]{1,64}$") OWNER_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,37}[a-z0-9]$") EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") RESERVED_SLUGS = {"official", "admin", "system", "api", "registry", "smarttools"} rate_limiter = RateLimiter() password_hasher = PasswordHasher(memory_cost=65536, time_cost=3, parallelism=4) @dataclass(frozen=True) class Semver: major: int minor: int patch: int prerelease: Tuple[Any, ...] = () @classmethod def parse(cls, value: str) -> Optional["Semver"]: match = re.match(r"^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z.-]+))?(?:\+.+)?$", value) if not match: return None major, minor, patch = map(int, match.group(1, 2, 3)) prerelease_raw = match.group(4) if not prerelease_raw: return cls(major, minor, patch, ()) parts: List[Any] = [] for part in prerelease_raw.split("."): if part.isdigit(): parts.append(int(part)) else: parts.append(part) return cls(major, minor, patch, tuple(parts)) def is_prerelease(self) -> bool: return bool(self.prerelease) def __lt__(self, other: "Semver") -> bool: if (self.major, self.minor, self.patch) != (other.major, other.minor, other.patch): return (self.major, self.minor, self.patch) < (other.major, other.minor, other.patch) if not self.prerelease and other.prerelease: return False if self.prerelease and not other.prerelease: return True return self.prerelease < other.prerelease @dataclass(frozen=True) class Constraint: op: str version: Semver def parse_constraints(raw: str) -> Tuple[List[Constraint], bool]: raw = raw.strip() if not raw or raw == "*": return [], False allow_prerelease = "-" in raw parts = [part.strip() for part in raw.split(",") if part.strip()] constraints: List[Constraint] = [] for part in parts: if part.startswith("^"): base = Semver.parse(part[1:]) if not base: continue constraints.append(Constraint(">=", base)) if base.major > 0: upper = Semver(base.major + 1, 0, 0, ()) elif base.minor > 0: upper = Semver(base.major, base.minor + 1, 0, ()) else: upper = Semver(base.major, base.minor, base.patch + 1, ()) constraints.append(Constraint("<", upper)) allow_prerelease = allow_prerelease or base.is_prerelease() continue if part.startswith("~"): base = Semver.parse(part[1:]) if not base: continue constraints.append(Constraint(">=", base)) upper = Semver(base.major, base.minor + 1, 0, ()) constraints.append(Constraint("<", upper)) allow_prerelease = allow_prerelease or base.is_prerelease() continue match = re.match(r"^(>=|<=|>|<|=)?\s*(.+)$", part) if not match: continue op = match.group(1) or "=" version = Semver.parse(match.group(2)) if not version: continue constraints.append(Constraint(op, version)) allow_prerelease = allow_prerelease or version.is_prerelease() return constraints, allow_prerelease def satisfies(version: Semver, constraints: List[Constraint]) -> bool: for constraint in constraints: if constraint.op == ">" and not (version > constraint.version): return False if constraint.op == ">=" and not (version >= constraint.version): return False if constraint.op == "<" and not (version < constraint.version): return False if constraint.op == "<=" and not (version <= constraint.version): return False if constraint.op in {"=", "=="} and not (version == constraint.version): return False return True def select_version(versions: List[str], constraint_raw: Optional[str]) -> Optional[str]: parsed_versions: List[Tuple[Semver, str]] = [] for version in versions: parsed = Semver.parse(version) if parsed: parsed_versions.append((parsed, version)) if not parsed_versions: return None if not constraint_raw or constraint_raw.strip() == "*": candidates = [item for item in parsed_versions if not item[0].is_prerelease()] if not candidates: candidates = parsed_versions return max(candidates, key=lambda item: item[0])[1] constraints, allow_prerelease = parse_constraints(constraint_raw) filtered = [] for parsed, raw in parsed_versions: if not allow_prerelease and parsed.is_prerelease(): continue if satisfies(parsed, constraints): filtered.append((parsed, raw)) if not filtered: return None return max(filtered, key=lambda item: item[0])[1] def create_app() -> Flask: app = Flask(__name__) app.config["MAX_CONTENT_LENGTH"] = MAX_BODY_BYTES # Initialize database schema once at startup with connect_db() as init_conn: init_db(init_conn) @app.before_request def attach_db() -> None: g.db = connect_db() @app.teardown_request def close_db(exc: Optional[BaseException]) -> None: db = getattr(g, "db", None) if db is not None: db.close() @app.before_request def enforce_rate_limit() -> Optional[Response]: path = request.path method = request.method.upper() ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown") if method == "GET": if path.startswith("/api/v1/tools/") and path.endswith("/download"): limit_config = RATE_LIMITS["download"] elif path.startswith("/api/v1/tools"): limit_config = RATE_LIMITS["tools"] else: return None elif method == "POST": if path == "/api/v1/register": limit_config = RATE_LIMITS["register"] elif path == "/api/v1/login": limit_config = RATE_LIMITS["login"] else: return None else: return None allowed, state = rate_limiter.check(ip, limit_config["limit"], limit_config["window"]) remaining = max(0, limit_config["limit"] - state.count) reset_at = int(state.reset_at) if not allowed: payload = { "error": { "code": "RATE_LIMITED", "message": f"Too many requests. Try again in {limit_config['window']} seconds.", "details": { "limit": limit_config["limit"], "window": f"{limit_config['window']} seconds", "retry_after": limit_config["window"], }, } } response = jsonify(payload) response.status_code = 429 response.headers["Retry-After"] = str(limit_config["window"]) response.headers["X-RateLimit-Limit"] = str(limit_config["limit"]) response.headers["X-RateLimit-Remaining"] = "0" response.headers["X-RateLimit-Reset"] = str(reset_at) return response request.rate_limit_headers = { "X-RateLimit-Limit": str(limit_config["limit"]), "X-RateLimit-Remaining": str(remaining), "X-RateLimit-Reset": str(reset_at), } return None @app.after_request def add_rate_limit_headers(response: Response) -> Response: headers = getattr(request, "rate_limit_headers", None) if headers: response.headers.update(headers) return response def error_response(code: str, message: str, status: int = 400, details: Optional[dict] = None) -> Response: payload = {"error": {"code": code, "message": message, "details": details or {}}} response = jsonify(payload) response.status_code = status return response def enforce_token_rate_limit(scope: str, token_hash: str) -> Optional[Response]: limit_config = RATE_LIMITS[scope] allowed, state = rate_limiter.check(token_hash, limit_config["limit"], limit_config["window"]) remaining = max(0, limit_config["limit"] - state.count) reset_at = int(state.reset_at) if not allowed: payload = { "error": { "code": "RATE_LIMITED", "message": f"Too many requests. Try again in {limit_config['window']} seconds.", "details": { "limit": limit_config["limit"], "window": f"{limit_config['window']} seconds", "retry_after": limit_config["window"], }, } } response = jsonify(payload) response.status_code = 429 response.headers["Retry-After"] = str(limit_config["window"]) response.headers["X-RateLimit-Limit"] = str(limit_config["limit"]) response.headers["X-RateLimit-Remaining"] = "0" response.headers["X-RateLimit-Reset"] = str(reset_at) return response request.rate_limit_headers = { "X-RateLimit-Limit": str(limit_config["limit"]), "X-RateLimit-Remaining": str(remaining), "X-RateLimit-Reset": str(reset_at), } return None def require_token(f): @wraps(f) def decorated(*args, **kwargs): auth_header = request.headers.get("Authorization") if not auth_header or not auth_header.startswith("Bearer "): return error_response("UNAUTHORIZED", "Missing or invalid token", 401) token = auth_header[7:] token_hash = hashlib.sha256(token.encode()).hexdigest() row = query_one( g.db, """ SELECT t.*, p.slug, p.display_name FROM api_tokens t JOIN publishers p ON t.publisher_id = p.id WHERE t.token_hash = ? AND t.revoked_at IS NULL """, [token_hash], ) if not row: return error_response("UNAUTHORIZED", "Invalid or revoked token", 401) g.db.execute( "UPDATE api_tokens SET last_used_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), row["id"]], ) g.current_publisher = { "id": row["publisher_id"], "slug": row["slug"], "display_name": row["display_name"], } g.current_token = {"id": row["id"], "hash": token_hash} g.db.commit() return f(*args, **kwargs) return decorated def generate_token() -> Tuple[str, str]: alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" raw = secrets.token_bytes(32) num = int.from_bytes(raw, "big") chars = [] while num > 0: num, rem = divmod(num, 62) chars.append(alphabet[rem]) token_body = "".join(reversed(chars)).rjust(43, "0") token = "reg_" + token_body[:43] token_hash = hashlib.sha256(token.encode()).hexdigest() return token, token_hash def validate_payload_size(field: str, content: str, limit: int) -> Optional[Response]: size = len(content.encode("utf-8")) if size > limit: return error_response( "PAYLOAD_TOO_LARGE", f"{field} exceeds {limit} bytes limit", 413, details={"field": field, "size": size, "limit": limit}, ) return None def paginate(page: int, per_page: int, total: int) -> Dict[str, int]: total_pages = max(1, math.ceil(total / per_page)) if per_page else 1 return { "page": page, "per_page": per_page, "total": total, "total_pages": total_pages, } def parse_pagination(endpoint_key: str, default_sort: str) -> Tuple[int, int, str, str, Optional[Response]]: try: page = int(request.args.get("page", 1)) except ValueError: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "Invalid page") per_page_raw = request.args.get("per_page") if per_page_raw is None and request.args.get("limit") is not None: per_page_raw = request.args.get("limit") try: per_page = int(per_page_raw) if per_page_raw is not None else DEFAULT_PAGE_SIZE except ValueError: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "Invalid per_page") if page < 1: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "Page must be >= 1") if per_page < 1 or per_page > MAX_PAGE_SIZE: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("VALIDATION_ERROR", "per_page out of range") sort = request.args.get("sort", default_sort) order = request.args.get("order", "desc").lower() if order not in {"asc", "desc"}: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response("INVALID_SORT", "Invalid sort order") allowed = ALLOWED_SORT.get(endpoint_key, set()) if sort not in allowed: return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", error_response( "INVALID_SORT", f"Unknown sort field '{sort}'. Allowed: {', '.join(sorted(allowed))}", ) return page, per_page, sort, order, None def load_tool_row(owner: str, name: str, version: Optional[str] = None) -> Optional[dict]: sql = "SELECT * FROM tools WHERE owner = ? AND name = ?" params: List[Any] = [owner, name] if version: sql += " AND version = ?" params.append(version) sql += " ORDER BY id DESC LIMIT 1" row = query_one(g.db, sql, params) return dict(row) if row else None @app.route("/api/v1/tools", methods=["GET"]) def list_tools() -> Response: page, per_page, sort, order, error = parse_pagination("/tools", "downloads") if error: return error category = request.args.get("category") offset = (page - 1) * per_page base_where = "WHERE 1=1" params: List[Any] = [] if category: base_where += " AND category = ?" params.append(category) count_row = query_one( g.db, f"SELECT COUNT(DISTINCT owner || '/' || name) AS total FROM tools {base_where}", params, ) total = int(count_row["total"]) if count_row else 0 order_dir = "DESC" if order == "desc" else "ASC" order_sql = f"{sort} {order_dir}, published_at DESC, id DESC" rows = query_all( g.db, f""" WITH latest_any AS ( SELECT owner, name, MAX(id) AS max_id FROM tools {base_where} GROUP BY owner, name ), latest_stable AS ( SELECT owner, name, MAX(id) AS max_id FROM tools {base_where} AND version NOT LIKE '%-%' GROUP BY owner, name ) SELECT t.* FROM tools t JOIN ( SELECT a.owner, a.name, COALESCE(s.max_id, a.max_id) AS max_id FROM latest_any a LEFT JOIN latest_stable s ON s.owner = a.owner AND s.name = a.name ) latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id ORDER BY {order_sql} LIMIT ? OFFSET ? """, params + [per_page, offset], ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], }) return jsonify({"data": data, "meta": paginate(page, per_page, total)}) @app.route("/api/v1/tools/search", methods=["GET"]) def search_tools() -> Response: query_text = request.args.get("q", "").strip() if not query_text: return error_response("VALIDATION_ERROR", "Missing search query") page, per_page, sort, order, error = parse_pagination("/tools/search", "downloads") if error: return error category = request.args.get("category") offset = (page - 1) * per_page where_clause = "WHERE tools_fts MATCH ?" params: List[Any] = [query_text] if category: where_clause += " AND tools.category = ?" params.append(category) order_dir = "DESC" if order == "desc" else "ASC" if sort == "relevance": order_sql = f"rank {order_dir}, downloads DESC, published_at DESC, id DESC" else: order_sql = f"{sort} {order_dir}, published_at DESC, id DESC" rows = query_all( g.db, f""" WITH matches AS ( SELECT tools.*, bm25(tools_fts) AS rank FROM tools_fts JOIN tools ON tools_fts.rowid = tools.id {where_clause} ), latest_any AS ( SELECT owner, name, MAX(id) AS max_id FROM matches GROUP BY owner, name ), latest_stable AS ( SELECT owner, name, MAX(id) AS max_id FROM matches WHERE version NOT LIKE '%-%' GROUP BY owner, name ) SELECT m.* FROM matches m JOIN ( SELECT a.owner, a.name, COALESCE(s.max_id, a.max_id) AS max_id FROM latest_any a LEFT JOIN latest_stable s ON s.owner = a.owner AND s.name = a.name ) latest ON m.owner = latest.owner AND m.name = latest.name AND m.id = latest.max_id ORDER BY {order_sql} LIMIT ? OFFSET ? """, params + [per_page, offset], ) count_row = query_one( g.db, f""" WITH matches AS ( SELECT tools.* FROM tools_fts JOIN tools ON tools_fts.rowid = tools.id {where_clause} ) SELECT COUNT(DISTINCT owner || '/' || name) AS total FROM matches """, params, ) total = int(count_row["total"]) if count_row else 0 data = [] for row in rows: score = 1.0 / (1.0 + row["rank"]) if row["rank"] is not None else None data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], "score": score, }) return jsonify({"data": data, "meta": paginate(page, per_page, total)}) @app.route("/api/v1/tools//", methods=["GET"]) def get_tool(owner: str, name: str) -> Response: if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") version = request.args.get("version") if version: row = load_tool_row(owner, name, version) else: row = resolve_tool(owner, name, "*") if not row: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404) payload = { "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], "deprecated": bool(row["deprecated"]), "deprecated_message": row["deprecated_message"], "replacement": row["replacement"], "config": row["config_yaml"], "readme": row["readme"], } response = jsonify({"data": payload}) response.headers["Cache-Control"] = "max-age=60" return response def resolve_tool(owner: str, name: str, constraint: Optional[str]) -> Optional[dict]: rows = query_all(g.db, "SELECT * FROM tools WHERE owner = ? AND name = ?", [owner, name]) if not rows: return None versions = [row["version"] for row in rows] selected = select_version(versions, constraint) if not selected: return None for row in rows: if row["version"] == selected: return dict(row) return None @app.route("/api/v1/tools///versions", methods=["GET"]) def list_tool_versions(owner: str, name: str) -> Response: if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") rows = query_all(g.db, "SELECT version FROM tools WHERE owner = ? AND name = ? ORDER BY id DESC", [owner, name]) if not rows: return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404) versions = [row["version"] for row in rows] return jsonify({"data": {"versions": versions}}) @app.route("/api/v1/tools///download", methods=["GET"]) def download_tool(owner: str, name: str) -> Response: if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name): return error_response("VALIDATION_ERROR", "Invalid owner or tool name") constraint = request.args.get("version") install_flag = request.args.get("install", "false").lower() == "true" row = resolve_tool(owner, name, constraint) if not row: available = [r["version"] for r in query_all(g.db, "SELECT version FROM tools WHERE owner = ? AND name = ?", [owner, name])] return error_response( "VERSION_NOT_FOUND", f"No version of '{owner}/{name}' satisfies constraint '{constraint or '*'}'", 404, details={ "tool": f"{owner}/{name}", "constraint": constraint or "*", "available_versions": available, "latest_stable": select_version(available, "*") if available else None, }, ) if install_flag: client_id = request.headers.get("X-Client-ID") if not client_id: client_id = f"anon_{hash(request.remote_addr)}" today = date.today().isoformat() try: g.db.execute( "INSERT INTO download_stats (tool_id, client_id, downloaded_at) VALUES (?, ?, ?)", [row["id"], client_id, today], ) g.db.execute("UPDATE tools SET downloads = downloads + 1 WHERE id = ?", [row["id"]]) g.db.commit() except Exception: g.db.rollback() response = jsonify({ "data": { "owner": row["owner"], "name": row["name"], "resolved_version": row["version"], "config": row["config_yaml"], "readme": row["readme"] or "", } }) response.headers["Cache-Control"] = "max-age=3600, immutable" return response @app.route("/api/v1/categories", methods=["GET"]) def list_categories() -> Response: page, per_page, sort, order, error = parse_pagination("/categories", "name") if error: return error cache_path = get_categories_cache_path() categories_payload = None if cache_path.exists(): categories_payload = json.loads(cache_path.read_text(encoding="utf-8")) else: categories_yaml = get_repo_dir() / "categories" / "categories.yaml" if categories_yaml.exists(): categories_payload = yaml.safe_load(categories_yaml.read_text(encoding="utf-8")) or {} categories = (categories_payload or {}).get("categories", []) counts = query_all( g.db, "SELECT category, COUNT(DISTINCT owner || '/' || name) AS total FROM tools GROUP BY category", ) count_map = {row["category"]: row["total"] for row in counts} data = [] for cat in categories: name = cat.get("name") if not name: continue data.append({ "name": name, "description": cat.get("description"), "icon": cat.get("icon"), "tool_count": count_map.get(name, 0), }) reverse = order == "desc" if sort == "tool_count": data.sort(key=lambda item: item["tool_count"], reverse=reverse) else: data.sort(key=lambda item: item["name"], reverse=reverse) total = len(data) start = (page - 1) * per_page end = start + per_page sliced = data[start:end] response = jsonify({"data": sliced, "meta": paginate(page, per_page, total)}) response.headers["Cache-Control"] = "max-age=3600" return response @app.route("/api/v1/stats/popular", methods=["GET"]) def popular_tools() -> Response: limit = min(int(request.args.get("limit", 10)), 50) rows = query_all( g.db, """ WITH latest AS ( SELECT owner, name, MAX(id) AS max_id FROM tools WHERE version NOT LIKE '%-%' GROUP BY owner, name ) SELECT t.* FROM tools t JOIN latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id ORDER BY t.downloads DESC, t.published_at DESC LIMIT ? """, [limit], ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], "published_at": row["published_at"], }) return jsonify({"data": data}) @app.route("/api/v1/index.json", methods=["GET"]) def get_index() -> Response: rows = query_all( g.db, """ WITH latest AS ( SELECT owner, name, MAX(id) AS max_id FROM tools WHERE version NOT LIKE '%-%' GROUP BY owner, name ) SELECT t.owner, t.name, t.version, t.description, t.category, t.tags, t.downloads FROM tools t JOIN latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id ORDER BY t.downloads DESC """, ) tools = [] for row in rows: tools.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "tags": json.loads(row["tags"] or "[]"), "downloads": row["downloads"], }) # Generate checksum for integrity verification content = json.dumps(tools, sort_keys=True) checksum = "sha256:" + hashlib.sha256(content.encode()).hexdigest() payload = { "version": "1.0", "generated_at": datetime.utcnow().isoformat() + "Z", "checksum": checksum, "tool_count": len(tools), "tools": tools, } response = jsonify(payload) response.headers["Cache-Control"] = "max-age=300, stale-while-revalidate=60" response.headers["ETag"] = f'"{checksum}"' return response @app.route("/api/v1/register", methods=["POST"]) def register() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) payload = request.get_json(silent=True) or {} email = (payload.get("email") or "").strip() password = payload.get("password") or "" slug = (payload.get("slug") or "").strip() display_name = (payload.get("display_name") or "").strip() if not email or not EMAIL_RE.match(email): return error_response("VALIDATION_ERROR", "Invalid email format") if not password or len(password) < 8: return error_response("VALIDATION_ERROR", "Password must be at least 8 characters") if not slug or not OWNER_RE.match(slug) or len(slug) < 2 or len(slug) > 39: return error_response("VALIDATION_ERROR", "Invalid slug format") if slug in RESERVED_SLUGS: return error_response("SLUG_TAKEN", f"Slug '{slug}' is reserved", 409) if not display_name: return error_response("VALIDATION_ERROR", "Display name is required") existing_email = query_one(g.db, "SELECT id FROM publishers WHERE email = ?", [email]) if existing_email: return error_response("VALIDATION_ERROR", "Email already registered") existing_slug = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [slug]) if existing_slug: return error_response("SLUG_TAKEN", f"Slug '{slug}' is already taken", 409) password_hash = password_hasher.hash(password) g.db.execute( """ INSERT INTO publishers (email, password_hash, slug, display_name, verified) VALUES (?, ?, ?, ?, ?) """, [email, password_hash, slug, display_name, False], ) g.db.commit() publisher_id = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [slug])["id"] response = jsonify({ "data": { "id": publisher_id, "slug": slug, "display_name": display_name, "email": email, } }) response.status_code = 201 return response @app.route("/api/v1/login", methods=["POST"]) def login() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) payload = request.get_json(silent=True) or {} email = (payload.get("email") or "").strip() password = payload.get("password") or "" if not email or not password: return error_response("VALIDATION_ERROR", "Email and password are required") publisher = query_one( g.db, "SELECT * FROM publishers WHERE email = ?", [email], ) if not publisher: return error_response("UNAUTHORIZED", "Invalid credentials", 401) locked_until = publisher["locked_until"] if locked_until: try: locked_dt = datetime.fromisoformat(locked_until) if datetime.utcnow() < locked_dt: return error_response("ACCOUNT_LOCKED", "Account is locked", 403) except ValueError: pass try: password_hasher.verify(publisher["password_hash"], password) except VerifyMismatchError: ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown") rate_key = f"{ip}:{email}:login_failed" limit_config = RATE_LIMITS["login_failed"] allowed, _ = rate_limiter.check(rate_key, limit_config["limit"], limit_config["window"]) attempts = int(publisher["failed_login_attempts"] or 0) + 1 locked = None if attempts >= 10: locked = datetime.utcnow() + timedelta(hours=1) elif attempts >= 5: locked = datetime.utcnow() + timedelta(minutes=15) g.db.execute( "UPDATE publishers SET failed_login_attempts = ?, locked_until = ? WHERE id = ?", [attempts, locked.isoformat() if locked else None, publisher["id"]], ) g.db.commit() if not allowed: return error_response("RATE_LIMITED", "Too many failed logins. Try again later.", 429) return error_response("UNAUTHORIZED", "Invalid credentials", 401) g.db.execute( "UPDATE publishers SET failed_login_attempts = 0, locked_until = NULL WHERE id = ?", [publisher["id"]], ) token, token_hash = generate_token() g.db.execute( """ INSERT INTO api_tokens (publisher_id, token_hash, name, created_at) VALUES (?, ?, ?, ?) """, [publisher["id"], token_hash, "login", datetime.utcnow().isoformat()], ) g.db.commit() return jsonify({ "data": { "token": token, "publisher": { "slug": publisher["slug"], "display_name": publisher["display_name"], }, } }) @app.route("/api/v1/tokens", methods=["POST"]) @require_token def create_token() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) rate_resp = enforce_token_rate_limit("tokens", g.current_token["hash"]) if rate_resp: return rate_resp payload = request.get_json(silent=True) or {} name = (payload.get("name") or "CLI token").strip() token, token_hash = generate_token() now = datetime.utcnow().isoformat() g.db.execute( "INSERT INTO api_tokens (publisher_id, token_hash, name, created_at) VALUES (?, ?, ?, ?)", [g.current_publisher["id"], token_hash, name, now], ) g.db.commit() response = jsonify({ "data": { "token": token, "name": name, "created_at": now, } }) response.status_code = 201 return response @app.route("/api/v1/tokens", methods=["GET"]) @require_token def list_tokens() -> Response: rows = query_all( g.db, """ SELECT id, name, created_at, last_used_at FROM api_tokens WHERE publisher_id = ? AND revoked_at IS NULL ORDER BY created_at DESC """, [g.current_publisher["id"]], ) data = [] for row in rows: data.append({ "id": row["id"], "name": row["name"], "created_at": row["created_at"], "last_used_at": row["last_used_at"], }) return jsonify({"data": data}) @app.route("/api/v1/tokens/", methods=["DELETE"]) @require_token def revoke_token(token_id: int) -> Response: row = query_one( g.db, "SELECT id FROM api_tokens WHERE id = ? AND publisher_id = ?", [token_id, g.current_publisher["id"]], ) if not row: return error_response("FORBIDDEN", "Cannot revoke this token", 403) g.db.execute( "UPDATE api_tokens SET revoked_at = ? WHERE id = ?", [datetime.utcnow().isoformat(), token_id], ) g.db.commit() return jsonify({"data": {"revoked": True}}) @app.route("/api/v1/tools", methods=["POST"]) @require_token def publish_tool() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413) rate_resp = enforce_token_rate_limit("publish", g.current_token["hash"]) if rate_resp: return rate_resp payload = request.get_json(silent=True) or {} config_text = payload.get("config") or "" readme = payload.get("readme") or "" dry_run = bool(payload.get("dry_run")) size_resp = validate_payload_size("config", config_text, MAX_CONFIG_BYTES) if size_resp: return size_resp if readme: size_resp = validate_payload_size("readme", readme, MAX_README_BYTES) if size_resp: return size_resp try: data = yaml.safe_load(config_text) or {} except yaml.YAMLError: return error_response("VALIDATION_ERROR", "Invalid YAML in config") name = (data.get("name") or "").strip() version = (data.get("version") or "").strip() description = (data.get("description") or "").strip() category = (data.get("category") or "").strip() or None tags = data.get("tags") or [] if not name or not TOOL_NAME_RE.match(name) or len(name) > MAX_TOOL_NAME_LEN: return error_response("VALIDATION_ERROR", "Invalid tool name") if not version or Semver.parse(version) is None: return error_response("INVALID_VERSION", "Version string is not valid semver") if description and len(description) > MAX_DESC_LEN: return error_response("VALIDATION_ERROR", "Description exceeds 500 characters") if tags: if not isinstance(tags, list): return error_response("VALIDATION_ERROR", "Tags must be a list") if len(tags) > MAX_TAGS: return error_response("VALIDATION_ERROR", "Too many tags") for tag in tags: if len(str(tag)) > MAX_TAG_LEN: return error_response("VALIDATION_ERROR", "Tag exceeds 32 characters") owner = g.current_publisher["slug"] existing = query_one( g.db, "SELECT published_at FROM tools WHERE owner = ? AND name = ? AND version = ?", [owner, name, version], ) if existing: return error_response( "VERSION_EXISTS", f"Version {version} already exists", 409, details={"published_at": existing["published_at"]}, ) suggestions = {"category": None, "similar_tools": []} try: from .categorize import suggest_categories from .similarity import find_similar_tools categories_path = get_repo_dir() / "categories" / "categories.yaml" if not category and categories_path.exists(): ranked = suggest_categories(name, description, tags, categories_path) if ranked: suggestions["category"] = { "suggested": ranked[0][0], "confidence": ranked[0][1], } rows = query_all( g.db, "SELECT owner, name, description, category, tags FROM tools", ) existing = [] for row in rows: try: existing.append({ "owner": row["owner"], "name": row["name"], "description": row["description"] or "", "category": row["category"], "tags": json.loads(row["tags"] or "[]"), }) except Exception: continue similar = find_similar_tools(existing, name, description, tags, category) suggestions["similar_tools"] = [ {"name": f"{tool['owner']}/{tool['name']}", "similarity": score} for tool, score in similar[:5] ] except Exception: pass if dry_run: return jsonify({ "data": { "owner": owner, "name": name, "version": version, "status": "validated", "suggestions": suggestions, } }) tags_json = json.dumps(tags) g.db.execute( """ INSERT INTO tools ( owner, name, version, description, category, tags, config_yaml, readme, publisher_id, deprecated, deprecated_message, replacement, downloads, published_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ owner, name, version, description or None, category, tags_json, config_text, readme, g.current_publisher["id"], int(bool(data.get("deprecated"))), data.get("deprecated_message"), data.get("replacement"), 0, datetime.utcnow().isoformat(), ], ) g.db.commit() response = jsonify({ "data": { "owner": owner, "name": name, "version": version, "pr_url": "", "status": "pending_review", "suggestions": suggestions, } }) response.status_code = 201 return response @app.route("/api/v1/me/tools", methods=["GET"]) @require_token def my_tools() -> Response: rows = query_all( g.db, """ SELECT owner, name, version, description, downloads, deprecated, deprecated_message, replacement FROM tools WHERE owner = ? ORDER BY published_at DESC """, [g.current_publisher["slug"]], ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "downloads": row["downloads"], "deprecated": bool(row["deprecated"]), "deprecated_message": row["deprecated_message"], "replacement": row["replacement"], }) return jsonify({"data": data}) @app.route("/api/v1/me/settings", methods=["PUT"]) @require_token def update_settings() -> Response: """Update current user's profile settings.""" data = request.get_json() or {} # Validate fields display_name = data.get("display_name", "").strip() bio = data.get("bio", "").strip() if data.get("bio") else None website = data.get("website", "").strip() if data.get("website") else None if display_name and len(display_name) > 100: return error_response("VALIDATION_ERROR", "Display name too long (max 100)", 400) if bio and len(bio) > 500: return error_response("VALIDATION_ERROR", "Bio too long (max 500)", 400) if website and len(website) > 200: return error_response("VALIDATION_ERROR", "Website URL too long (max 200)", 400) # Build update query updates = [] params = [] if display_name: updates.append("display_name = ?") params.append(display_name) if bio is not None: updates.append("bio = ?") params.append(bio) if website is not None: updates.append("website = ?") params.append(website) if not updates: return error_response("VALIDATION_ERROR", "No valid fields to update", 400) updates.append("updated_at = CURRENT_TIMESTAMP") params.append(g.current_publisher["id"]) g.db.execute( f"UPDATE publishers SET {', '.join(updates)} WHERE id = ?", params, ) g.db.commit() return jsonify({"data": {"status": "updated"}}) @app.route("/api/v1/featured/tools", methods=["GET"]) def featured_tools() -> Response: """Get featured tools for homepage/landing.""" placement = request.args.get("placement", "homepage") limit = min(int(request.args.get("limit", 6)), 20) rows = query_all( g.db, """ SELECT t.owner, t.name, t.version, t.description, t.category, t.downloads, ft.priority FROM featured_tools ft JOIN tools t ON ft.tool_id = t.id WHERE ft.placement = ? AND ft.status = 'active' AND (ft.start_at IS NULL OR ft.start_at <= CURRENT_TIMESTAMP) AND (ft.end_at IS NULL OR ft.end_at > CURRENT_TIMESTAMP) ORDER BY ft.priority DESC, t.downloads DESC LIMIT ? """, [placement, limit], ) # If no featured tools, fall back to popular if not rows: rows = query_all( g.db, """ SELECT owner, name, version, description, category, downloads FROM tools WHERE deprecated = 0 ORDER BY downloads DESC LIMIT ? """, [limit], ) data = [] for row in rows: data.append({ "owner": row["owner"], "name": row["name"], "version": row["version"], "description": row["description"], "category": row["category"], "downloads": row["downloads"], }) return jsonify({"data": data}) @app.route("/api/v1/featured/contributors", methods=["GET"]) def featured_contributors() -> Response: """Get featured contributor for homepage.""" placement = request.args.get("placement", "homepage") row = query_one( g.db, """ SELECT p.slug, p.display_name, p.bio, p.website, fc.bio_override FROM featured_contributors fc JOIN publishers p ON fc.publisher_id = p.id WHERE fc.placement = ? AND fc.status = 'active' AND (fc.start_at IS NULL OR fc.start_at <= CURRENT_TIMESTAMP) AND (fc.end_at IS NULL OR fc.end_at > CURRENT_TIMESTAMP) ORDER BY fc.created_at DESC LIMIT 1 """, [placement], ) if not row: return jsonify({"data": None}) return jsonify({ "data": { "slug": row["slug"], "display_name": row["display_name"], "bio": row["bio_override"] or row["bio"], "website": row["website"], } }) @app.route("/api/v1/content/announcements", methods=["GET"]) def announcements() -> Response: """Get published announcements.""" limit = min(int(request.args.get("limit", 5)), 20) rows = query_all( g.db, """ SELECT id, title, body, published_at FROM announcements WHERE published = 1 ORDER BY published_at DESC LIMIT ? """, [limit], ) data = [] for row in rows: data.append({ "id": row["id"], "title": row["title"], "body": row["body"], "published_at": row["published_at"], }) return jsonify({"data": data}) @app.route("/api/v1/reports", methods=["POST"]) def submit_report() -> Response: """Submit an abuse report for a tool.""" data = request.get_json() or {} owner = data.get("owner", "").strip() name = data.get("name", "").strip() reason = data.get("reason", "").strip() details = data.get("details", "").strip() if data.get("details") else None if not owner or not name: return error_response("VALIDATION_ERROR", "owner and name required", 400) if not reason: return error_response("VALIDATION_ERROR", "reason required", 400) if len(reason) > 100: return error_response("VALIDATION_ERROR", "reason too long (max 100)", 400) if details and len(details) > 2000: return error_response("VALIDATION_ERROR", "details too long (max 2000)", 400) # Find the tool tool = query_one( g.db, "SELECT id FROM tools WHERE owner = ? AND name = ? ORDER BY published_at DESC LIMIT 1", [owner, name], ) if not tool: return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404) # Get reporter info reporter_id = None if hasattr(g, "current_publisher") and g.current_publisher: reporter_id = g.current_publisher["id"] reporter_ip = request.remote_addr # Rate limit: max 5 reports per IP per hour recent = query_one( g.db, """ SELECT COUNT(*) as cnt FROM reports WHERE reporter_ip = ? AND created_at > datetime('now', '-1 hour') """, [reporter_ip], ) if recent and recent["cnt"] >= 5: return error_response("RATE_LIMITED", "Too many reports. Try again later.", 429) g.db.execute( """ INSERT INTO reports (tool_id, reporter_id, reporter_ip, reason, details) VALUES (?, ?, ?, ?, ?) """, [tool["id"], reporter_id, reporter_ip, reason, details], ) g.db.commit() return jsonify({"data": {"status": "submitted"}}) @app.route("/api/v1/consent", methods=["POST"]) def save_consent() -> Response: """Save user consent preferences for analytics/ads.""" try: data = request.get_json(force=True) or {} except Exception: data = {} analytics = bool(data.get("analytics", False)) ads = bool(data.get("ads", False)) # Store consent in session (works with our SQLite session interface) from flask import session session["consent_analytics"] = analytics session["consent_ads"] = ads session["consent_given"] = True return jsonify({ "data": { "analytics": analytics, "ads": ads, "saved": True } }) @app.route("/api/v1/analytics/pageview", methods=["POST"]) def track_pageview() -> Response: """Track a page view (privacy-friendly, no cookies).""" data = request.get_json() or {} path = data.get("path", "").strip() if not path or len(path) > 500: return jsonify({"data": {"tracked": False}}) # Hash the IP for privacy (don't store raw IP) ip_hash = hashlib.sha256( (request.remote_addr or "unknown").encode() ).hexdigest()[:16] referrer = request.headers.get("Referer", "")[:500] if request.headers.get("Referer") else None user_agent = request.headers.get("User-Agent", "")[:500] if request.headers.get("User-Agent") else None try: g.db.execute( """ INSERT INTO pageviews (path, referrer, user_agent, ip_hash) VALUES (?, ?, ?, ?) """, [path, referrer, user_agent, ip_hash], ) g.db.commit() return jsonify({"data": {"tracked": True}}) except Exception: return jsonify({"data": {"tracked": False}}) @app.route("/api/v1/webhook/gitea", methods=["POST"]) def webhook_gitea() -> Response: if request.content_length and request.content_length > MAX_BODY_BYTES: return error_response( "PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", status=413, details={"limit": MAX_BODY_BYTES}, ) secret = os.environ.get("SMARTTOOLS_REGISTRY_WEBHOOK_SECRET", "") if not secret: return error_response("UNAUTHORIZED", "Webhook secret not configured", 401) status, payload = process_webhook(request.data, dict(request.headers), secret) response = jsonify(payload) response.status_code = status return response return app def main() -> None: app = create_app() app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 5000))) if __name__ == "__main__": main()