CmdForge/src/cmdforge/registry/app.py

2937 lines
109 KiB
Python

"""Flask app for CmdForge Registry API (Phase 2)."""
from __future__ import annotations
import hashlib
import json
import math
import os
import re
import secrets
from dataclasses import dataclass
from datetime import date, datetime, timedelta
from typing import Any, Dict, Iterable, List, Optional, Tuple
from flask import Flask, Response, g, jsonify, request
import yaml
from functools import wraps
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from .db import connect_db, init_db, query_all, query_one
from .rate_limit import RateLimiter
from .sync import process_webhook, get_categories_cache_path, get_repo_dir
# --- Size and count limits ---------------------------------------------------
# Maximum request body size; create_app() installs it as Flask's MAX_CONTENT_LENGTH.
MAX_BODY_BYTES = 512 * 1024
# Presumably byte caps for a tool's config and README payloads — the code that
# enforces them is outside this section; verify against the publish path.
MAX_CONFIG_BYTES = 64 * 1024
MAX_README_BYTES = 256 * 1024
# Presumably field-level validation caps (characters / item counts) — TODO confirm
# where they are enforced.
MAX_TOOL_NAME_LEN = 64
MAX_DESC_LEN = 500
MAX_TAG_LEN = 32
MAX_TAGS = 10
# Pagination bounds used by parse_pagination().
MAX_PAGE_SIZE = 100
DEFAULT_PAGE_SIZE = 20
# Rate-limit scopes: at most `limit` requests per `window` seconds.
RATE_LIMITS = {
"tools": {"limit": 100, "window": 60},
"download": {"limit": 60, "window": 60},
"register": {"limit": 5, "window": 3600},
"login": {"limit": 10, "window": 900},
"login_failed": {"limit": 5, "window": 900},
"tokens": {"limit": 10, "window": 3600},
"publish": {"limit": 20, "window": 3600},
}
# Sort fields accepted by parse_pagination(), keyed by its `endpoint_key` argument.
ALLOWED_SORT = {
"/tools": {"downloads", "published_at", "name"},
"/tools/search": {"relevance", "downloads", "published_at"},
"/categories": {"name", "tool_count"},
}
# Tool names: 1-64 ASCII letters, digits or hyphens.
TOOL_NAME_RE = re.compile(r"^[A-Za-z0-9-]{1,64}$")
# Owner slugs: 2-39 lowercase alphanumerics/hyphens, no leading or trailing hyphen.
OWNER_RE = re.compile(r"^[a-z0-9][a-z0-9-]{0,37}[a-z0-9]$")
# Loose e-mail shape check: something@something.something with no whitespace.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Presumably slugs that publishers may not register — the check is outside this section.
RESERVED_SLUGS = {"official", "admin", "system", "api", "registry", "cmdforge"}
# Module-wide limiter instance shared by the IP-based and token-based checks.
rate_limiter = RateLimiter()
# Argon2 hasher configured with explicit memory/time/parallelism parameters.
password_hasher = PasswordHasher(memory_cost=65536, time_cost=3, parallelism=4)
@dataclass(frozen=True)
class Semver:
    """Parsed semantic version with precedence-aware ordering.

    Build metadata (``+...``) is accepted by :meth:`parse` but discarded, so it
    never influences equality or ordering. ``prerelease`` holds the dot-separated
    pre-release identifiers; purely numeric identifiers are stored as ``int``.

    All four ordering operators are implemented explicitly. Defining only
    ``__lt__`` is not enough: Python does not derive ``<=``/``>=`` from it, so
    ``a >= b`` would raise ``TypeError``.
    """

    major: int
    minor: int
    patch: int
    prerelease: Tuple[Any, ...] = ()

    @classmethod
    def parse(cls, value: str) -> Optional["Semver"]:
        """Parse ``MAJOR.MINOR.PATCH[-prerelease][+build]``.

        Returns ``None`` when *value* does not match that shape.
        """
        match = re.match(r"^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z.-]+))?(?:\+.+)?$", value)
        if not match:
            return None
        major, minor, patch = map(int, match.group(1, 2, 3))
        prerelease_raw = match.group(4)
        if not prerelease_raw:
            return cls(major, minor, patch, ())
        parts = tuple(
            int(part) if part.isdigit() else part
            for part in prerelease_raw.split(".")
        )
        return cls(major, minor, patch, parts)

    def is_prerelease(self) -> bool:
        """Return True when the version carries pre-release identifiers."""
        return bool(self.prerelease)

    def _precedence_key(self) -> Tuple[Any, ...]:
        """Return a tuple whose natural ordering matches semver precedence.

        * A release sorts after any pre-release of the same core version.
        * Numeric identifiers sort before alphanumeric ones, which also avoids
          comparing ``int`` with ``str`` directly (a ``TypeError``).
        * A longer identifier list wins when all shared identifiers are equal,
          which plain tuple comparison already provides.
        """
        identifiers = tuple(
            (0, ident, "") if isinstance(ident, int) else (1, 0, ident)
            for ident in self.prerelease
        )
        is_release = 0 if self.prerelease else 1
        return (self.major, self.minor, self.patch, is_release, identifiers)

    def __lt__(self, other: "Semver") -> bool:
        if not isinstance(other, Semver):
            return NotImplemented
        return self._precedence_key() < other._precedence_key()

    def __le__(self, other: "Semver") -> bool:
        if not isinstance(other, Semver):
            return NotImplemented
        return self._precedence_key() <= other._precedence_key()

    def __gt__(self, other: "Semver") -> bool:
        if not isinstance(other, Semver):
            return NotImplemented
        return self._precedence_key() > other._precedence_key()

    def __ge__(self, other: "Semver") -> bool:
        if not isinstance(other, Semver):
            return NotImplemented
        return self._precedence_key() >= other._precedence_key()
@dataclass(frozen=True)
class Constraint:
    """A single comparison term of a version constraint.

    ``op`` is the comparison operator string that ``satisfies()`` dispatches on
    (``">"``, ``">="``, ``"<"``, ``"<="``, ``"="``/``"=="``); ``version`` is the
    bound the candidate version is compared against.
    """

    op: str
    version: Semver
def parse_constraints(raw: str) -> Tuple[List[Constraint], bool]:
    """Parse a comma-separated constraint string into comparison terms.

    Supported terms: ``^X.Y.Z`` (caret range), ``~X.Y.Z`` (tilde range) and
    ``[op]X.Y.Z`` with ``op`` one of ``>=``, ``<=``, ``==``, ``>``, ``<``, ``=``
    (default ``=``). Terms whose version cannot be parsed are skipped.

    Returns:
        ``(constraints, allow_prerelease)``. ``allow_prerelease`` is True when
        the raw string contains a ``-`` or any parsed bound is a pre-release.
        An empty string or ``"*"`` yields ``([], False)``.
    """
    raw = raw.strip()
    if not raw or raw == "*":
        return [], False

    allow_prerelease = "-" in raw
    constraints: List[Constraint] = []

    for part in (piece.strip() for piece in raw.split(",")):
        if not part:
            continue

        if part.startswith("^"):
            base = Semver.parse(part[1:])
            if not base:
                continue
            # Caret: the left-most non-zero component may not change.
            if base.major > 0:
                upper = Semver(base.major + 1, 0, 0, ())
            elif base.minor > 0:
                upper = Semver(base.major, base.minor + 1, 0, ())
            else:
                upper = Semver(base.major, base.minor, base.patch + 1, ())
            constraints.append(Constraint(">=", base))
            constraints.append(Constraint("<", upper))
            allow_prerelease = allow_prerelease or base.is_prerelease()
            continue

        if part.startswith("~"):
            base = Semver.parse(part[1:])
            if not base:
                continue
            # Tilde: only patch-level changes are allowed.
            constraints.append(Constraint(">=", base))
            constraints.append(Constraint("<", Semver(base.major, base.minor + 1, 0, ())))
            allow_prerelease = allow_prerelease or base.is_prerelease()
            continue

        # "==" must be listed before "=" so that "==1.2.3" is not split into
        # op "=" and the unparseable version "=1.2.3" (which used to be
        # dropped silently, turning an exact pin into "match anything").
        match = re.match(r"^(>=|<=|==|>|<|=)?\s*(.+)$", part)
        if not match:
            continue
        op = match.group(1) or "="
        if op == "==":
            op = "="  # normalise to the single spelling used elsewhere
        version = Semver.parse(match.group(2))
        if not version:
            continue
        constraints.append(Constraint(op, version))
        allow_prerelease = allow_prerelease or version.is_prerelease()

    return constraints, allow_prerelease
def satisfies(version: Semver, constraints: List[Constraint]) -> bool:
    """Return True when *version* passes every constraint in *constraints*.

    An empty constraint list is always satisfied, and constraints with an
    unrecognised operator are ignored.
    """
    for term in constraints:
        operator_name = term.op
        bound = term.version
        if operator_name == ">":
            passed = version > bound
        elif operator_name == ">=":
            passed = version >= bound
        elif operator_name == "<":
            passed = version < bound
        elif operator_name == "<=":
            passed = version <= bound
        elif operator_name in {"=", "=="}:
            passed = version == bound
        else:
            passed = True
        if not passed:
            return False
    return True
def select_version(versions: List[str], constraint_raw: Optional[str]) -> Optional[str]:
    """Pick the highest version string that satisfies *constraint_raw*.

    Unparseable version strings are ignored. With no constraint (or ``"*"``)
    the highest stable version wins, falling back to the highest pre-release
    when no stable version exists. Returns ``None`` when nothing qualifies.
    """
    known = [(Semver.parse(text), text) for text in versions]
    known = [pair for pair in known if pair[0]]
    if not known:
        return None

    if not constraint_raw or constraint_raw.strip() == "*":
        stable = [pair for pair in known if not pair[0].is_prerelease()]
        pool = stable if stable else known
    else:
        constraints, allow_prerelease = parse_constraints(constraint_raw)
        pool = [
            pair
            for pair in known
            if (allow_prerelease or not pair[0].is_prerelease())
            and satisfies(pair[0], constraints)
        ]
        if not pool:
            return None

    return max(pool, key=lambda pair: pair[0])[1]
def create_app() -> Flask:
app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = MAX_BODY_BYTES
# Initialize database schema once at startup
with connect_db() as init_conn:
init_db(init_conn)
@app.before_request
def attach_db() -> None:
    """Open a database connection and store it on ``g.db`` before each request."""
    g.db = connect_db()
@app.teardown_request
def close_db(exc: Optional[BaseException]) -> None:
    """Close the connection stored on ``g.db``, if one was attached.

    *exc* is supplied by Flask's teardown hook and is not inspected.
    """
    connection = getattr(g, "db", None)
    if connection is None:
        return
    connection.close()
@app.before_request
def enforce_rate_limit() -> Optional[Response]:
    """Apply the IP-keyed rate limit for tool reads, downloads, register and login.

    Requests that do not fall into one of those scopes are not limited. When
    the limit is exceeded a 429 JSON response is returned; otherwise the
    ``X-RateLimit-*`` values are stashed on ``request.rate_limit_headers``.
    """
    http_method = request.method.upper()
    route = request.path
    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
    # overwrites it, and it is used verbatim as the limiter key — confirm the
    # deployment always sits behind such a proxy.
    ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")

    scope = None
    if http_method == "GET":
        if route.startswith("/api/v1/tools/") and route.endswith("/download"):
            scope = "download"
        elif route.startswith("/api/v1/tools"):
            scope = "tools"
    elif http_method == "POST":
        if route == "/api/v1/register":
            scope = "register"
        elif route == "/api/v1/login":
            scope = "login"
    if scope is None:
        return None

    limit_config = RATE_LIMITS[scope]
    limit = limit_config["limit"]
    window = limit_config["window"]
    # NOTE(review): the limiter key is the bare IP for every scope — verify
    # RateLimiter does not need the scope folded into the key.
    allowed, state = rate_limiter.check(ip, limit, window)
    remaining = max(0, limit - state.count)
    reset_at = int(state.reset_at)

    if allowed:
        request.rate_limit_headers = {
            "X-RateLimit-Limit": str(limit),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(reset_at),
        }
        return None

    response = jsonify({
        "error": {
            "code": "RATE_LIMITED",
            "message": f"Too many requests. Try again in {window} seconds.",
            "details": {
                "limit": limit,
                "window": f"{window} seconds",
                "retry_after": window,
            },
        }
    })
    response.status_code = 429
    response.headers["Retry-After"] = str(window)
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = "0"
    response.headers["X-RateLimit-Reset"] = str(reset_at)
    return response
@app.after_request
def add_rate_limit_headers(response: Response) -> Response:
    """Copy any rate-limit headers stashed on the request onto the response."""
    pending = getattr(request, "rate_limit_headers", None)
    if not pending:
        return response
    response.headers.update(pending)
    return response
def error_response(code: str, message: str, status: int = 400, details: Optional[dict] = None) -> Response:
    """Build the JSON error envelope ``{"error": {code, message, details}}``.

    ``details`` defaults to an empty dict; ``status`` becomes the HTTP status.
    """
    error_body = {"code": code, "message": message, "details": details or {}}
    response = jsonify({"error": error_body})
    response.status_code = status
    return response
def enforce_token_rate_limit(scope: str, token_hash: str) -> Optional[Response]:
    """Apply the ``RATE_LIMITS[scope]`` limit keyed by *token_hash*.

    Returns a 429 JSON response when the limit is exceeded. Otherwise stores
    the ``X-RateLimit-*`` values on ``request.rate_limit_headers`` and
    returns ``None``.
    """
    settings = RATE_LIMITS[scope]
    limit = settings["limit"]
    window = settings["window"]
    allowed, state = rate_limiter.check(token_hash, limit, window)
    remaining = max(0, limit - state.count)
    reset_at = int(state.reset_at)

    if allowed:
        request.rate_limit_headers = {
            "X-RateLimit-Limit": str(limit),
            "X-RateLimit-Remaining": str(remaining),
            "X-RateLimit-Reset": str(reset_at),
        }
        return None

    response = jsonify({
        "error": {
            "code": "RATE_LIMITED",
            "message": f"Too many requests. Try again in {window} seconds.",
            "details": {
                "limit": limit,
                "window": f"{window} seconds",
                "retry_after": window,
            },
        }
    })
    response.status_code = 429
    response.headers["Retry-After"] = str(window)
    response.headers["X-RateLimit-Limit"] = str(limit)
    response.headers["X-RateLimit-Remaining"] = "0"
    response.headers["X-RateLimit-Reset"] = str(reset_at)
    return response
def require_token(f):
    """Decorator that requires a valid, non-revoked Bearer token.

    On success it records the token's ``last_used_at``, populates
    ``g.current_publisher`` and ``g.current_token``, commits, and calls the
    wrapped view. Missing/invalid tokens yield 401; banned publishers 403.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization")
        if not (header and header.startswith("Bearer ")):
            return error_response("UNAUTHORIZED", "Missing or invalid token", 401)

        token_hash = hashlib.sha256(header[len("Bearer "):].encode()).hexdigest()
        record = query_one(
            g.db,
            """
SELECT t.*, p.slug, p.display_name, p.role, p.banned, p.ban_reason
FROM api_tokens t
JOIN publishers p ON t.publisher_id = p.id
WHERE t.token_hash = ? AND t.revoked_at IS NULL
""",
            [token_hash],
        )
        if not record:
            return error_response("UNAUTHORIZED", "Invalid or revoked token", 401)

        # Banned publishers are rejected before any state is touched.
        if record["banned"]:
            reason = record["ban_reason"] or "No reason given"
            return error_response(
                "ACCOUNT_BANNED",
                f"Your account has been banned: {reason}",
                403,
            )

        used_at = datetime.utcnow().isoformat()
        g.db.execute(
            "UPDATE api_tokens SET last_used_at = ? WHERE id = ?",
            [used_at, record["id"]],
        )
        g.current_publisher = {
            "id": record["publisher_id"],
            "slug": record["slug"],
            "display_name": record["display_name"],
            "role": record["role"] or "user",
        }
        g.current_token = {"id": record["id"], "hash": token_hash}
        g.db.commit()
        return f(*args, **kwargs)

    return decorated
def require_role(*roles):
    """Build a decorator that authenticates and then requires one of *roles*.

    The wrapped view runs behind ``require_token``; callers whose role is not
    listed get a 403 ``FORBIDDEN`` response.
    """
    def decorator(f):
        @wraps(f)
        @require_token
        def decorated(*args, **kwargs):
            current_role = g.current_publisher.get("role", "user")
            if current_role in roles:
                return f(*args, **kwargs)
            role_list = ", ".join(roles)
            return error_response(
                "FORBIDDEN",
                f"This action requires one of these roles: {role_list}",
                403,
            )

        return decorated

    return decorator
def require_admin(f):
    """Decorate *f* so that only publishers with the admin role may call it."""
    admin_only = require_role("admin")
    return admin_only(f)
def require_moderator(f):
    """Decorate *f* so that only moderators or admins may call it."""
    staff_only = require_role("moderator", "admin")
    return staff_only(f)
def log_audit(action: str, target_type: str, target_id: str, details: dict = None) -> None:
    """Insert one row into ``audit_log`` and commit.

    The actor is the current publisher's id when one is set on ``g``,
    otherwise the literal ``"system"``. *details* is stored as JSON, or NULL
    when empty.
    """
    publisher = getattr(g, "current_publisher", None)
    actor_id = publisher["id"] if publisher else None
    actor = str(actor_id) if actor_id else "system"
    details_json = json.dumps(details) if details else None
    g.db.execute(
        """
INSERT INTO audit_log (action, target_type, target_id, actor_id, details)
VALUES (?, ?, ?, ?, ?)
""",
        [action, target_type, str(target_id), actor, details_json],
    )
    g.db.commit()
def get_current_user_context() -> Tuple[Optional[str], Optional[str]]:
    """Resolve the caller's ``(slug, role)`` from an optional Bearer token.

    Authentication is attempted but never required: a missing, malformed,
    unknown or revoked token — or a banned publisher — yields ``(None, None)``.
    A publisher without an explicit role is reported as ``"user"``.
    """
    anonymous = (None, None)
    header = request.headers.get("Authorization")
    if not (header and header.startswith("Bearer ")):
        return anonymous

    digest = hashlib.sha256(header[len("Bearer "):].encode()).hexdigest()
    publisher = query_one(
        g.db,
        """
SELECT p.slug, p.role, p.banned
FROM api_tokens t
JOIN publishers p ON t.publisher_id = p.id
WHERE t.token_hash = ? AND t.revoked_at IS NULL
""",
        [digest],
    )
    if not publisher or publisher["banned"]:
        return anonymous
    return publisher["slug"], publisher["role"] or "user"
def build_visibility_filter(table_prefix: str = "") -> Tuple[str, List[Any]]:
    """Build the SQL fragment that restricts tools to what the caller may see.

    Returns ``(sql_clause, params)``. The clause is either empty (moderators
    and admins see everything) or starts with ``" AND "``, so it must be
    appended to an existing WHERE clause. Column names are qualified with
    *table_prefix* when one is given.
    """
    column = f"{table_prefix}." if table_prefix else ""
    user_slug, user_role = get_current_user_context()

    if user_role in ("moderator", "admin"):
        return "", []

    approved_public = (
        f"{column}visibility = 'public' AND {column}moderation_status = 'approved'"
    )
    if not user_slug:
        # Anonymous callers: approved public tools only.
        return f" AND {approved_public}", []

    # Authenticated callers additionally see everything they own.
    return f" AND (({approved_public}) OR {column}owner = ?)", [user_slug]
def generate_token() -> Tuple[str, str]:
    """Create a new API token and its SHA-256 hex digest.

    The token is ``"reg_"`` followed by 43 base-62 characters derived from 32
    random bytes (left-padded with ``"0"``). Returns ``(token, token_hash)``.
    """
    alphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
    value = int.from_bytes(secrets.token_bytes(32), "big")

    digits = []
    while value > 0:
        value, index = divmod(value, 62)
        digits.append(alphabet[index])
    digits.reverse()

    body = "".join(digits).rjust(43, "0")[:43]
    token = "reg_" + body
    return token, hashlib.sha256(token.encode()).hexdigest()
def validate_payload_size(field: str, content: str, limit: int) -> Optional[Response]:
    """Check the UTF-8 encoded size of *content* against *limit* bytes.

    Returns ``None`` when the content fits, otherwise a 413
    ``PAYLOAD_TOO_LARGE`` error response describing the field, size and limit.
    """
    size = len(content.encode("utf-8"))
    if size <= limit:
        return None
    return error_response(
        "PAYLOAD_TOO_LARGE",
        f"{field} exceeds {limit} bytes limit",
        413,
        details={"field": field, "size": size, "limit": limit},
    )
def paginate(page: int, per_page: int, total: int) -> Dict[str, int]:
    """Build the pagination ``meta`` dict for a response.

    ``total_pages`` is at least 1, and is 1 when *per_page* is falsy.
    """
    if per_page:
        total_pages = max(1, math.ceil(total / per_page))
    else:
        total_pages = 1
    return dict(page=page, per_page=per_page, total=total, total_pages=total_pages)
def parse_pagination(endpoint_key: str, default_sort: str) -> Tuple[int, int, str, str, Optional[Response]]:
    """Read and validate ``page``, ``per_page``/``limit``, ``sort`` and ``order``.

    Returns ``(page, per_page, sort, order, error)``. ``error`` is ``None`` on
    success; on failure it is an error response and the other four values are
    placeholder defaults. Allowed sort fields come from
    ``ALLOWED_SORT[endpoint_key]``.
    """
    def failure(response: Response) -> Tuple[int, int, str, str, Optional[Response]]:
        return 1, DEFAULT_PAGE_SIZE, "downloads", "desc", response

    try:
        page = int(request.args.get("page", 1))
    except ValueError:
        return failure(error_response("VALIDATION_ERROR", "Invalid page"))

    # "limit" is accepted as an alias when "per_page" is absent.
    raw_size = request.args.get("per_page")
    if raw_size is None:
        raw_size = request.args.get("limit")
    try:
        per_page = DEFAULT_PAGE_SIZE if raw_size is None else int(raw_size)
    except ValueError:
        return failure(error_response("VALIDATION_ERROR", "Invalid per_page"))

    if page < 1:
        return failure(error_response("VALIDATION_ERROR", "Page must be >= 1"))
    if not 1 <= per_page <= MAX_PAGE_SIZE:
        return failure(error_response("VALIDATION_ERROR", "per_page out of range"))

    sort = request.args.get("sort", default_sort)
    order = request.args.get("order", "desc").lower()
    if order not in {"asc", "desc"}:
        return failure(error_response("INVALID_SORT", "Invalid sort order"))

    allowed = ALLOWED_SORT.get(endpoint_key, set())
    if sort not in allowed:
        return failure(error_response(
            "INVALID_SORT",
            f"Unknown sort field '{sort}'. Allowed: {', '.join(sorted(allowed))}",
        ))

    return page, per_page, sort, order, None
def load_tool_row(owner: str, name: str, version: Optional[str] = None) -> Optional[dict]:
    """Fetch the newest row (highest id) for ``owner/name`` as a dict.

    When *version* is given only rows with that exact version are considered.
    Returns ``None`` when nothing matches.
    """
    clauses = ["SELECT * FROM tools WHERE owner = ? AND name = ?"]
    params: List[Any] = [owner, name]
    if version:
        clauses.append(" AND version = ?")
        params.append(version)
    clauses.append(" ORDER BY id DESC LIMIT 1")

    found = query_one(g.db, "".join(clauses), params)
    if not found:
        return None
    return dict(found)
@app.route("/api/v1/tools", methods=["GET"])
def list_tools() -> Response:
    """List one entry per tool, optionally filtered by category.

    For each ``owner/name`` the row with the highest id among versions without
    a ``-`` in the version string is returned, falling back to the highest id
    overall. Results respect the caller's visibility and are paginated.
    """
    page, per_page, sort, order, error = parse_pagination("/tools", "downloads")
    if error:
        return error

    category = request.args.get("category")
    vis_filter, vis_params = build_visibility_filter()

    where_parts = ["WHERE 1=1"]
    params: List[Any] = []
    if category:
        where_parts.append(" AND category = ?")
        params.append(category)
    where_parts.append(vis_filter)
    params.extend(vis_params)
    base_where = "".join(where_parts)

    count_row = query_one(
        g.db,
        f"SELECT COUNT(DISTINCT owner || '/' || name) AS total FROM tools {base_where}",
        params,
    )
    total = int(count_row["total"]) if count_row else 0

    # `sort` and `order` were validated by parse_pagination, so interpolating
    # them into the ORDER BY is safe.
    order_dir = order.upper()
    order_sql = f"{sort} {order_dir}, published_at DESC, id DESC"
    skip = (page - 1) * per_page
    rows = query_all(
        g.db,
        f"""
WITH latest_any AS (
SELECT owner, name, MAX(id) AS max_id
FROM tools
{base_where}
GROUP BY owner, name
),
latest_stable AS (
SELECT owner, name, MAX(id) AS max_id
FROM tools
{base_where} AND version NOT LIKE '%-%'
GROUP BY owner, name
)
SELECT t.* FROM tools t
JOIN (
SELECT a.owner, a.name, COALESCE(s.max_id, a.max_id) AS max_id
FROM latest_any a
LEFT JOIN latest_stable s ON s.owner = a.owner AND s.name = a.name
) latest
ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id
ORDER BY {order_sql}
LIMIT ? OFFSET ?
""",
        # The WHERE parameters are bound once per CTE.
        [*params, *params, per_page, skip],
    )

    data = [
        {
            "owner": tool["owner"],
            "name": tool["name"],
            "version": tool["version"],
            "description": tool["description"],
            "category": tool["category"],
            "tags": json.loads(tool["tags"] or "[]"),
            "downloads": tool["downloads"],
            "published_at": tool["published_at"],
        }
        for tool in rows
    ]
    return jsonify({"data": data, "meta": paginate(page, per_page, total)})
@app.route("/api/v1/tools/search", methods=["GET"])
def search_tools() -> Response:
    """Full-text search over tools with filters, pagination and optional facets.

    Query parameters: ``q`` (required), ``category``/``categories`` (OR),
    ``tags`` (AND), ``owner``, ``min_downloads``, ``max_downloads``,
    ``published_after``, ``published_before``, ``deprecated``,
    ``include_facets`` plus the standard pagination/sort parameters.

    Each tool appears once, using the same "latest stable, else latest" rule
    as the tool listing. ``score`` is a relevance value in ``[0, 1)`` where
    higher means a better match.
    """
    query_text = request.args.get("q", "").strip()
    if not query_text:
        return error_response("VALIDATION_ERROR", "Missing search query")

    # FTS5 only accepts "barewords" (ASCII alphanumerics, underscore and
    # non-ASCII characters) outside of quotes; anything else — '.', '+', '/',
    # quotes, parentheses, ... — is a syntax error unless the text is quoted.
    # A query that starts or ends with a boolean operator is also malformed.
    # In all of those cases search for the literal text as a phrase instead.
    operators = ("AND", "OR", "NOT")
    terms = query_text.split()
    has_unsafe_char = any(
        ord(ch) < 128 and not (ch.isalnum() or ch == "_" or ch in " \t\r\n")
        for ch in query_text
    )
    is_bare_keyword = query_text.upper() in ("NOT", "AND", "OR", "NEAR")
    has_dangling_operator = terms[0] in operators or terms[-1] in operators
    if has_unsafe_char or is_bare_keyword or has_dangling_operator:
        query_text = '"' + query_text.replace('"', '""') + '"'

    page, per_page, sort, order, error = parse_pagination("/tools/search", "downloads")
    if error:
        return error
    offset = (page - 1) * per_page

    # Filter parameters
    category = request.args.get("category")  # single category (backward compat)
    categories_param = request.args.get("categories", "")  # multi-category (OR)
    tags_param = request.args.get("tags", "")  # tags (AND)
    owner_filter = request.args.get("owner")
    min_downloads = request.args.get("min_downloads", type=int)
    max_downloads = request.args.get("max_downloads", type=int)
    published_after = request.args.get("published_after")
    published_before = request.args.get("published_before")
    include_deprecated = request.args.get("deprecated", "false").lower() == "true"
    include_facets = request.args.get("include_facets", "false").lower() == "true"

    categories = [c.strip() for c in categories_param.split(",") if c.strip()]
    if category and category not in categories:
        categories.append(category)
    # De-duplicate while keeping order: the tag CTE compares
    # COUNT(DISTINCT tag) with len(tags), so "tags=a,a" could never match.
    tags = list(dict.fromkeys(t.strip() for t in tags_param.split(",") if t.strip()))

    # WHERE clause
    where_clauses = ["tools_fts MATCH ?"]
    params: List[Any] = [query_text]
    if categories:
        placeholders = ",".join("?" for _ in categories)
        where_clauses.append(f"tools.category IN ({placeholders})")
        params.extend(categories)
    if owner_filter:
        where_clauses.append("tools.owner = ?")
        params.append(owner_filter)
    if min_downloads is not None:
        where_clauses.append("tools.downloads >= ?")
        params.append(min_downloads)
    if max_downloads is not None:
        where_clauses.append("tools.downloads <= ?")
        params.append(max_downloads)
    if published_after:
        where_clauses.append("tools.published_at >= ?")
        params.append(published_after)
    if published_before:
        where_clauses.append("tools.published_at <= ?")
        params.append(published_before)
    if not include_deprecated:
        where_clauses.append("tools.deprecated = 0")

    vis_filter, vis_params = build_visibility_filter("tools")
    if vis_filter:
        # The helper returns " AND <condition>"; drop that exact prefix
        # (str.lstrip("AND") would strip a character set, not the word).
        condition = vis_filter.strip()
        if condition.startswith("AND "):
            condition = condition[len("AND "):]
        where_clauses.append(condition.strip())
        params.extend(vis_params)
    where_clause = "WHERE " + " AND ".join(where_clauses)

    # Tag filtering CTE (AND logic — a tool must carry ALL requested tags).
    tag_cte = ""
    tag_join = ""
    if tags:
        tag_placeholders = ",".join("?" for _ in tags)
        tag_cte = f"""
            tag_matches AS (
                SELECT tools.id
                FROM tools, json_each(tools.tags) AS tag
                WHERE tag.value IN ({tag_placeholders})
                GROUP BY tools.id
                HAVING COUNT(DISTINCT tag.value) = ?
            ),
        """
        tag_join = "JOIN tag_matches tm ON m.id = tm.id"
        # The tag CTE comes first in the SQL text, so its parameters go first.
        params = tags + [len(tags)] + params

    def filtered_cte(with_rank: bool) -> str:
        """Return the shared ``WITH ... filtered AS (...)`` prefix."""
        rank_column = ", bm25(tools_fts) AS rank" if with_rank else ""
        return f"""
            WITH {tag_cte}
            matches AS (
                SELECT tools.*{rank_column}
                FROM tools_fts
                JOIN tools ON tools_fts.rowid = tools.id
                {where_clause}
            ),
            filtered AS (
                SELECT m.* FROM matches m
                {tag_join}
            )
        """

    order_dir = "DESC" if order == "desc" else "ASC"
    if sort == "relevance":
        # bm25() returns smaller (more negative) values for better matches,
        # so "most relevant first" (order=desc) means ascending rank.
        rank_dir = "ASC" if order == "desc" else "DESC"
        order_sql = f"rank {rank_dir}, downloads DESC, published_at DESC, id DESC"
    else:
        order_sql = f"{sort} {order_dir}, published_at DESC, id DESC"

    rows = query_all(
        g.db,
        f"""
            {filtered_cte(True)},
            latest_any AS (
                SELECT owner, name, MAX(id) AS max_id
                FROM filtered
                GROUP BY owner, name
            ),
            latest_stable AS (
                SELECT owner, name, MAX(id) AS max_id
                FROM filtered
                WHERE version NOT LIKE '%-%'
                GROUP BY owner, name
            )
            SELECT f.* FROM filtered f
            JOIN (
                SELECT a.owner, a.name, COALESCE(s.max_id, a.max_id) AS max_id
                FROM latest_any a
                LEFT JOIN latest_stable s ON s.owner = a.owner AND s.name = a.name
            ) latest
            ON f.owner = latest.owner AND f.name = latest.name AND f.id = latest.max_id
            ORDER BY {order_sql}
            LIMIT ? OFFSET ?
        """,
        params + [per_page, offset],
    )

    count_row = query_one(
        g.db,
        f"""
            {filtered_cte(False)}
            SELECT COUNT(DISTINCT owner || '/' || name) AS total FROM filtered
        """,
        params,
    )
    total = int(count_row["total"]) if count_row else 0

    data = []
    for row in rows:
        rank = row["rank"]
        if rank is None:
            score = None
        else:
            # Map bm25 (<= 0, smaller is better) onto [0, 1) with higher = better.
            # The previous 1 / (1 + rank) divided by zero at rank == -1 and
            # went negative below it.
            relevance = max(0.0, -float(rank))
            score = relevance / (1.0 + relevance)
        data.append({
            "owner": row["owner"],
            "name": row["name"],
            "version": row["version"],
            "description": row["description"],
            "category": row["category"],
            "tags": json.loads(row["tags"] or "[]"),
            "downloads": row["downloads"],
            "published_at": row["published_at"],
            "score": score,
        })

    result: dict = {"data": data, "meta": paginate(page, per_page, total)}

    if include_facets:
        cat_rows = query_all(
            g.db,
            f"""
                {filtered_cte(False)}
                SELECT category, COUNT(DISTINCT owner || '/' || name) AS count
                FROM filtered
                WHERE category IS NOT NULL
                GROUP BY category
                ORDER BY count DESC
                LIMIT 20
            """,
            params,
        )
        tag_rows = query_all(
            g.db,
            f"""
                {filtered_cte(False)}
                SELECT tag.value AS name,
                       COUNT(DISTINCT filtered.owner || '/' || filtered.name) AS count
                FROM filtered, json_each(filtered.tags) AS tag
                GROUP BY tag.value
                ORDER BY count DESC
                LIMIT 30
            """,
            params,
        )
        owner_rows = query_all(
            g.db,
            f"""
                {filtered_cte(False)}
                SELECT owner, COUNT(DISTINCT owner || '/' || name) AS count
                FROM filtered
                GROUP BY owner
                ORDER BY count DESC
                LIMIT 20
            """,
            params,
        )
        result["facets"] = {
            "categories": [{"name": r["category"], "count": r["count"]} for r in cat_rows],
            "tags": [{"name": r["name"], "count": r["count"]} for r in tag_rows],
            "owners": [{"name": r["owner"], "count": r["count"]} for r in owner_rows],
        }

    return jsonify(result)
@app.route("/api/v1/tools/<owner>/<name>", methods=["GET"])
def get_tool(owner: str, name: str) -> Response:
    """Return the details of one tool version.

    With ``?version=`` that exact version is loaded; otherwise the version is
    resolved with the ``"*"`` constraint. Tools the caller may not see are
    reported as 404, exactly like tools that do not exist.
    """
    if not (OWNER_RE.match(owner) and TOOL_NAME_RE.match(name)):
        return error_response("VALIDATION_ERROR", "Invalid owner or tool name")

    requested = request.args.get("version")
    row = load_tool_row(owner, name, requested) if requested else resolve_tool(owner, name, "*")
    not_found_message = f"Tool '{owner}/{name}' does not exist"
    if not row:
        return error_response("TOOL_NOT_FOUND", not_found_message, 404)

    # Visibility: staff and the owner always have access; everyone else only
    # sees approved tools that are public or unlisted (reachable by direct link).
    user_slug, user_role = get_current_user_context()
    visibility = row.get("visibility", "public")
    moderation_status = row.get("moderation_status", "approved")
    approved_and_reachable = (
        moderation_status == "approved" and visibility in ("public", "unlisted")
    )
    may_view = (
        user_role in ("moderator", "admin")
        or user_slug == row["owner"]
        or approved_and_reachable
    )
    if not may_view:
        return error_response("TOOL_NOT_FOUND", not_found_message, 404)

    # Source attribution: prefer the structured JSON, else the legacy columns.
    source_obj = None
    raw_source = row["source_json"]
    if raw_source:
        try:
            source_obj = json.loads(raw_source)
        except (json.JSONDecodeError, TypeError):
            pass
    if not source_obj and (row["source"] or row["source_url"]):
        source_obj = {
            "type": "imported",
            "original_tool": row["source"],
            "url": row["source_url"],
        }

    response = jsonify({
        "data": {
            "owner": row["owner"],
            "name": row["name"],
            "version": row["version"],
            "description": row["description"],
            "category": row["category"],
            "tags": json.loads(row["tags"] or "[]"),
            "downloads": row["downloads"],
            "published_at": row["published_at"],
            "deprecated": bool(row["deprecated"]),
            "deprecated_message": row["deprecated_message"],
            "replacement": row["replacement"],
            "config": row["config_yaml"],
            "readme": row["readme"],
            "source": source_obj,
        }
    })
    response.headers["Cache-Control"] = "max-age=60"
    return response
def resolve_tool(owner: str, name: str, constraint: Optional[str]) -> Optional[dict]:
    """Return the row of ``owner/name`` whose version best satisfies *constraint*.

    Version selection is delegated to ``select_version``. Returns ``None``
    when the tool has no rows or no version satisfies the constraint.
    """
    candidates = query_all(g.db, "SELECT * FROM tools WHERE owner = ? AND name = ?", [owner, name])
    if not candidates:
        return None

    chosen = select_version([candidate["version"] for candidate in candidates], constraint)
    if not chosen:
        return None

    return next(
        (dict(candidate) for candidate in candidates if candidate["version"] == chosen),
        None,
    )
@app.route("/api/v1/tools/<owner>/<name>/versions", methods=["GET"])
def list_tool_versions(owner: str, name: str) -> Response:
    """List the version strings of ``owner/name``, newest row first.

    Only versions the caller may access are listed, using the same rule as
    the tool detail and download endpoints: moderators, admins and the owner
    see every version; everyone else sees approved public/unlisted versions.
    A tool with no accessible versions is reported as 404 so that private or
    unapproved tools are not revealed.
    """
    if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name):
        return error_response("VALIDATION_ERROR", "Invalid owner or tool name")

    user_slug, user_role = get_current_user_context()
    sql = "SELECT version FROM tools WHERE owner = ? AND name = ?"
    if user_role not in ("moderator", "admin") and user_slug != owner:
        sql += " AND visibility IN ('public', 'unlisted') AND moderation_status = 'approved'"
    sql += " ORDER BY id DESC"

    rows = query_all(g.db, sql, [owner, name])
    if not rows:
        return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404)
    versions = [row["version"] for row in rows]
    return jsonify({"data": {"versions": versions}})
@app.route("/api/v1/tools/<owner>/<name>/download", methods=["GET"])
def download_tool(owner: str, name: str) -> Response:
    """Resolve a version constraint and return the tool's config and README.

    ``?version=`` carries the constraint (default: latest). With
    ``?install=true`` the download is recorded in ``download_stats`` and the
    tool's counter is incremented on a best-effort basis. Tools the caller
    may not see are reported as 404.
    """
    if not OWNER_RE.match(owner) or not TOOL_NAME_RE.match(name):
        return error_response("VALIDATION_ERROR", "Invalid owner or tool name")

    constraint = request.args.get("version")
    install_flag = request.args.get("install", "false").lower() == "true"

    user_slug, user_role = get_current_user_context()
    is_elevated = user_role in ("moderator", "admin")
    is_owner = user_slug == owner

    row = resolve_tool(owner, name, constraint)
    if not row:
        # Only advertise versions the caller is allowed to see; otherwise the
        # error details would leak versions of private or unapproved tools.
        available_sql = "SELECT version FROM tools WHERE owner = ? AND name = ?"
        if not (is_elevated or is_owner):
            available_sql += (
                " AND visibility IN ('public', 'unlisted') AND moderation_status = 'approved'"
            )
        available = [r["version"] for r in query_all(g.db, available_sql, [owner, name])]
        return error_response(
            "VERSION_NOT_FOUND",
            f"No version of '{owner}/{name}' satisfies constraint '{constraint or '*'}'",
            404,
            details={
                "tool": f"{owner}/{name}",
                "constraint": constraint or "*",
                "available_versions": available,
                "latest_stable": select_version(available, "*") if available else None,
            },
        )

    # Public tools require approval, unlisted tools are reachable by direct
    # link once approved, everything else is owner/staff only.
    visibility = row.get("visibility", "public")
    moderation_status = row.get("moderation_status", "approved")
    is_approved = moderation_status == "approved"
    is_public_approved = visibility == "public" and is_approved
    is_unlisted_approved = visibility == "unlisted" and is_approved
    is_accessible = is_elevated or is_owner or is_public_approved or is_unlisted_approved
    if not is_accessible:
        return error_response("TOOL_NOT_FOUND", f"Tool '{owner}/{name}' does not exist", 404)

    if install_flag:
        client_id = request.headers.get("X-Client-ID")
        if not client_id:
            # The builtin hash() of a str is salted per process, so it yields a
            # different id in every worker and after every restart. Use a
            # stable digest of the address instead.
            address = request.remote_addr or "unknown"
            digest = hashlib.sha256(address.encode("utf-8")).hexdigest()[:16]
            client_id = f"anon_{digest}"
        today = date.today().isoformat()
        try:
            g.db.execute(
                "INSERT INTO download_stats (tool_id, client_id, downloaded_at) VALUES (?, ?, ?)",
                [row["id"], client_id, today],
            )
            g.db.execute("UPDATE tools SET downloads = downloads + 1 WHERE id = ?", [row["id"]])
            g.db.commit()
        except Exception:
            # Deliberately best-effort: a failed stats write must not block the
            # download itself. NOTE(review): presumably this mainly absorbs
            # duplicate-row errors — confirm against the download_stats schema.
            g.db.rollback()

    response = jsonify({
        "data": {
            "owner": row["owner"],
            "name": row["name"],
            "resolved_version": row["version"],
            "config": row["config_yaml"],
            "readme": row["readme"] or "",
        }
    })
    response.headers["Cache-Control"] = "max-age=3600, immutable"
    return response
@app.route("/api/v1/categories", methods=["GET"])
def list_categories() -> Response:
    """List categories with per-category tool counts, sorted and paginated.

    Predefined categories come from the JSON cache file when it exists,
    otherwise from ``categories/categories.yaml`` in the repo directory.
    Categories that only occur in the database are appended with generated
    descriptions. ``meta.total_tools`` is the sum of all per-category counts.
    """
    page, per_page, sort, order, error = parse_pagination("/categories", "name")
    if error:
        return error

    categories_payload = None
    cache_path = get_categories_cache_path()
    if cache_path.exists():
        categories_payload = json.loads(cache_path.read_text(encoding="utf-8"))
    else:
        categories_yaml = get_repo_dir() / "categories" / "categories.yaml"
        if categories_yaml.exists():
            categories_payload = yaml.safe_load(categories_yaml.read_text(encoding="utf-8")) or {}
    predefined = (categories_payload or {}).get("categories", [])

    counts = query_all(
        g.db,
        "SELECT category, COUNT(DISTINCT owner || '/' || name) AS total FROM tools GROUP BY category",
    )
    count_map = {entry["category"]: entry["total"] for entry in counts}
    total_tools = sum(entry["total"] for entry in counts)

    data = []
    seen_names = set()
    for definition in predefined:
        name = definition.get("name")
        if not name:
            continue
        seen_names.add(name)
        data.append({
            "name": name,
            "description": definition.get("description"),
            "icon": definition.get("icon"),
            "tool_count": count_map.get(name, 0),
        })

    # Categories present in the database but missing from the predefined list
    # get auto-generated display information.
    for category_name, tool_count in count_map.items():
        if not category_name or category_name in seen_names:
            continue
        display_name = category_name.replace("-", " ").title()
        data.append({
            "name": category_name,
            "description": f"Tools in the {display_name} category",
            "icon": None,
            "tool_count": tool_count,
        })

    sort_field = "tool_count" if sort == "tool_count" else "name"
    data.sort(key=lambda item: item[sort_field], reverse=(order == "desc"))

    first = (page - 1) * per_page
    page_items = data[first:first + per_page]
    meta = paginate(page, per_page, len(data))
    meta["total_tools"] = total_tools

    response = jsonify({"data": page_items, "meta": meta})
    response.headers["Cache-Control"] = "max-age=3600"
    return response
@app.route("/api/v1/tags", methods=["GET"])
def list_tags() -> Response:
    """List tags with usage counts, most used first.

    Query parameters:
        category: optional; only count tools in this category.
        limit: maximum number of tags (default 100, capped at 500). A
            non-integer or negative value yields a 400 ``VALIDATION_ERROR``.
    """
    category = request.args.get("category")

    raw_limit = request.args.get("limit")
    if raw_limit is None:
        limit = 100
    else:
        try:
            limit = int(raw_limit)
        except ValueError:
            return error_response("VALIDATION_ERROR", "Invalid limit")
        if limit < 0:
            # SQLite treats a negative LIMIT as "no limit", which would
            # bypass the cap below.
            return error_response("VALIDATION_ERROR", "limit must be >= 0")
    limit = min(limit, 500)

    # Extract tags from the JSON array column and count distinct tools per tag.
    where_sql = "WHERE tools.category = ?" if category else ""
    params: List[Any] = [category, limit] if category else [limit]
    rows = query_all(
        g.db,
        f"""
            SELECT tag.value AS name, COUNT(DISTINCT tools.owner || '/' || tools.name) AS count
            FROM tools, json_each(tools.tags) AS tag
            {where_sql}
            GROUP BY tag.value
            ORDER BY count DESC
            LIMIT ?
        """,
        tuple(params),
    )

    data = [{"name": row["name"], "count": row["count"]} for row in rows]
    response = jsonify({"data": data, "meta": {"total": len(data)}})
    response.headers["Cache-Control"] = "max-age=3600"
    return response
# ─── Collections ─────────────────────────────────────────────────────────────
@app.route("/api/v1/collections", methods=["GET"])
def list_collections() -> Response:
    """Return a summary of every collection, ordered by name.

    Each entry carries the collection's display fields, its decoded tag list
    and the number of tool references it holds (not the tools themselves).
    The response is cacheable for one hour.
    """
    collections = query_all(g.db, "SELECT * FROM collections ORDER BY name")

    def summarize(record) -> Dict[str, Any]:
        # ``tools`` and ``tags`` are stored as JSON text; empty/NULL means none.
        members = json.loads(record["tools"]) if record["tools"] else []
        labels = json.loads(record["tags"]) if record["tags"] else []
        return {
            "name": record["name"],
            "display_name": record["display_name"],
            "description": record["description"],
            "icon": record["icon"],
            "maintainer": record["maintainer"],
            "tool_count": len(members),
            "tags": labels,
        }

    response = jsonify({"data": [summarize(record) for record in collections]})
    response.headers["Cache-Control"] = "max-age=3600"
    return response
@app.route("/api/v1/collections/<name>", methods=["GET"])
def get_collection(name: str) -> Response:
    """Return one collection together with details of the tools it lists.

    Responds with ``COLLECTION_NOT_FOUND`` (404) when no collection has the
    given name.  Every ``owner/name`` reference in the collection is resolved
    to the row with the highest id whose version contains no ``-``;
    references that resolve to nothing are still listed, flagged with
    ``"missing": True``.  References that are not exactly ``owner/name`` are
    skipped.
    """
    collection = query_one(g.db, "SELECT * FROM collections WHERE name = ?", [name])
    if not collection:
        return error_response("COLLECTION_NOT_FOUND", f"Collection '{name}' not found", 404)

    # The three columns below hold JSON text; empty/NULL decodes to "nothing".
    references = json.loads(collection["tools"]) if collection["tools"] else []
    pins = json.loads(collection["pinned"]) if collection["pinned"] else {}
    labels = json.loads(collection["tags"]) if collection["tags"] else []

    members = []
    for reference in references:
        pieces = reference.split("/")
        if len(pieces) != 2:
            continue
        owner, tool_name = pieces

        latest = query_one(
            g.db,
            """
            SELECT * FROM tools
            WHERE owner = ? AND name = ? AND version NOT LIKE '%-%'
            ORDER BY id DESC LIMIT 1
            """,
            [owner, tool_name],
        )

        if not latest:
            # Tool not found in registry: keep a placeholder entry.
            members.append({
                "owner": owner,
                "name": tool_name,
                "version": None,
                "description": None,
                "category": None,
                "downloads": 0,
                "pinned_version": pins.get(reference),
                "missing": True,
            })
            continue

        members.append({
            "owner": latest["owner"],
            "name": latest["name"],
            "version": latest["version"],
            "description": latest["description"],
            "category": latest["category"],
            "downloads": latest["downloads"],
            "pinned_version": pins.get(reference),
        })

    body = {
        "name": collection["name"],
        "display_name": collection["display_name"],
        "description": collection["description"],
        "icon": collection["icon"],
        "maintainer": collection["maintainer"],
        "tools": members,
        "tags": labels,
    }
    response = jsonify({"data": body})
    response.headers["Cache-Control"] = "max-age=3600"
    return response
@app.route("/api/v1/stats/popular", methods=["GET"])
def popular_tools() -> Response:
    """Return the most-downloaded tools, one row per ``owner/name``.

    For each tool the row with the highest id whose version contains no
    ``-`` is used.  Results are ordered by downloads, then publication time,
    both descending.

    Query parameters:
        limit: number of tools to return (default 10, clamped to 1..50).
            A non-numeric value falls back to the default.
    """
    # ``type=int`` avoids an unhandled ValueError on malformed input, and the
    # lower bound stops a negative value from becoming SQLite's "no limit".
    limit = max(1, min(request.args.get("limit", 10, type=int), 50))
    rows = query_all(
        g.db,
        """
        WITH latest AS (
            SELECT owner, name, MAX(id) AS max_id
            FROM tools
            WHERE version NOT LIKE '%-%'
            GROUP BY owner, name
        )
        SELECT t.* FROM tools t
        JOIN latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id
        ORDER BY t.downloads DESC, t.published_at DESC
        LIMIT ?
        """,
        [limit],
    )
    data = [
        {
            "owner": row["owner"],
            "name": row["name"],
            "version": row["version"],
            "description": row["description"],
            "category": row["category"],
            "tags": json.loads(row["tags"] or "[]"),
            "downloads": row["downloads"],
            "published_at": row["published_at"],
        }
        for row in rows
    ]
    return jsonify({"data": data})
@app.route("/api/v1/index.json", methods=["GET"])
def get_index() -> Response:
    """Serve the complete tool index with an integrity checksum.

    One entry is emitted per ``owner/name`` (the row with the highest id
    whose version contains no ``-``), ordered by downloads descending.  The
    checksum is the SHA-256 of the key-sorted JSON dump of the tool list and
    is also exposed as the response's ETag.
    """
    latest_rows = query_all(
        g.db,
        """
        WITH latest AS (
            SELECT owner, name, MAX(id) AS max_id
            FROM tools
            WHERE version NOT LIKE '%-%'
            GROUP BY owner, name
        )
        SELECT t.owner, t.name, t.version, t.description, t.category, t.tags, t.downloads
        FROM tools t
        JOIN latest ON t.owner = latest.owner AND t.name = latest.name AND t.id = latest.max_id
        ORDER BY t.downloads DESC
        """,
    )

    tools = [
        {
            "owner": entry["owner"],
            "name": entry["name"],
            "version": entry["version"],
            "description": entry["description"],
            "category": entry["category"],
            "tags": json.loads(entry["tags"] or "[]"),
            "downloads": entry["downloads"],
        }
        for entry in latest_rows
    ]

    # Checksum covers only the tool list, so clients can verify the payload.
    serialized = json.dumps(tools, sort_keys=True).encode()
    checksum = f"sha256:{hashlib.sha256(serialized).hexdigest()}"

    response = jsonify({
        "version": "1.0",
        "generated_at": f"{datetime.utcnow().isoformat()}Z",
        "checksum": checksum,
        "tool_count": len(tools),
        "tools": tools,
    })
    response.headers["Cache-Control"] = "max-age=300, stale-while-revalidate=60"
    response.headers["ETag"] = f'"{checksum}"'
    return response
@app.route("/api/v1/register", methods=["POST"])
def register() -> Response:
    """Create a new, unverified publisher account.

    Expects a JSON object with ``email``, ``password``, ``slug`` and
    ``display_name``.  Returns 201 with the new publisher's id, slug,
    display name and email, or an error response:

    * ``PAYLOAD_TOO_LARGE`` (413) when the declared body exceeds the limit,
    * ``VALIDATION_ERROR`` for a malformed/missing field or an email that is
      already registered,
    * ``SLUG_TAKEN`` (409) for a reserved or already used slug.

    A body that is not a JSON object, or fields that are not strings, are
    treated as missing so they yield a validation error rather than an
    unhandled exception.
    """
    if request.content_length and request.content_length > MAX_BODY_BYTES:
        return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413)

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    def text_field(key: str) -> str:
        """Return the stripped string stored under *key*, or '' if absent/non-string."""
        value = payload.get(key)
        return value.strip() if isinstance(value, str) else ""

    email = text_field("email")
    slug = text_field("slug")
    display_name = text_field("display_name")
    # The password is deliberately not stripped: whitespace may be significant.
    password = payload.get("password")
    if not isinstance(password, str):
        password = ""

    if not email or not EMAIL_RE.match(email):
        return error_response("VALIDATION_ERROR", "Invalid email format")
    if len(password) < 8:
        return error_response("VALIDATION_ERROR", "Password must be at least 8 characters")
    # OWNER_RE already restricts the slug to 2..39 characters.
    if not OWNER_RE.match(slug):
        return error_response("VALIDATION_ERROR", "Invalid slug format")
    if slug in RESERVED_SLUGS:
        return error_response("SLUG_TAKEN", f"Slug '{slug}' is reserved", 409)
    if not display_name:
        return error_response("VALIDATION_ERROR", "Display name is required")

    existing_email = query_one(g.db, "SELECT id FROM publishers WHERE email = ?", [email])
    if existing_email:
        return error_response("VALIDATION_ERROR", "Email already registered")
    existing_slug = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [slug])
    if existing_slug:
        return error_response("SLUG_TAKEN", f"Slug '{slug}' is already taken", 409)

    password_hash = password_hasher.hash(password)
    g.db.execute(
        """
        INSERT INTO publishers (email, password_hash, slug, display_name, verified)
        VALUES (?, ?, ?, ?, ?)
        """,
        [email, password_hash, slug, display_name, False],
    )
    g.db.commit()

    publisher_id = query_one(g.db, "SELECT id FROM publishers WHERE slug = ?", [slug])["id"]
    response = jsonify({
        "data": {
            "id": publisher_id,
            "slug": slug,
            "display_name": display_name,
            "email": email,
        }
    })
    response.status_code = 201
    return response
@app.route("/api/v1/login", methods=["POST"])
def login() -> Response:
    """Authenticate a publisher and issue a fresh "Web Session" token.

    Expects a JSON object with ``email`` and ``password``.  On success the
    failed-attempt counter and lock are cleared, previous "Web Session"
    tokens of the publisher are deleted and a new one is returned.

    On a wrong password the per-account counter is incremented; from the
    5th failure the account is locked for 15 minutes, from the 10th for one
    hour.  Independently, failures are rate limited per IP/email pair.

    Error responses: ``PAYLOAD_TOO_LARGE`` (413), ``VALIDATION_ERROR``,
    ``UNAUTHORIZED`` (401), ``ACCOUNT_LOCKED`` (403), ``RATE_LIMITED`` (429).
    A body that is not a JSON object, or non-string credentials, are treated
    as missing instead of raising.
    """
    if request.content_length and request.content_length > MAX_BODY_BYTES:
        return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413)

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_email = payload.get("email")
    raw_password = payload.get("password")
    email = raw_email.strip() if isinstance(raw_email, str) else ""
    password = raw_password if isinstance(raw_password, str) else ""
    if not email or not password:
        return error_response("VALIDATION_ERROR", "Email and password are required")

    publisher = query_one(
        g.db,
        "SELECT * FROM publishers WHERE email = ?",
        [email],
    )
    if not publisher:
        return error_response("UNAUTHORIZED", "Invalid credentials", 401)

    locked_until = publisher["locked_until"]
    if locked_until:
        try:
            locked_dt = datetime.fromisoformat(locked_until)
            if datetime.utcnow() < locked_dt:
                return error_response("ACCOUNT_LOCKED", "Account is locked", 403)
        except ValueError:
            # An unparseable lock timestamp is treated as "not locked".
            pass

    try:
        password_hasher.verify(publisher["password_hash"], password)
    except VerifyMismatchError:
        # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
        # proxy overwrites it -- confirm the deployment does so.
        ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
        rate_key = f"{ip}:{email}:login_failed"
        limit_config = RATE_LIMITS["login_failed"]
        allowed, _ = rate_limiter.check(rate_key, limit_config["limit"], limit_config["window"])

        attempts = int(publisher["failed_login_attempts"] or 0) + 1
        locked = None
        if attempts >= 10:
            locked = datetime.utcnow() + timedelta(hours=1)
        elif attempts >= 5:
            locked = datetime.utcnow() + timedelta(minutes=15)
        g.db.execute(
            "UPDATE publishers SET failed_login_attempts = ?, locked_until = ? WHERE id = ?",
            [attempts, locked.isoformat() if locked else None, publisher["id"]],
        )
        g.db.commit()

        if not allowed:
            return error_response("RATE_LIMITED", "Too many failed logins. Try again later.", 429)
        return error_response("UNAUTHORIZED", "Invalid credentials", 401)

    g.db.execute(
        "UPDATE publishers SET failed_login_attempts = 0, locked_until = NULL WHERE id = ?",
        [publisher["id"]],
    )
    # Delete any existing session tokens for this user (cleanup).
    g.db.execute(
        "DELETE FROM api_tokens WHERE publisher_id = ? AND name = 'Web Session'",
        [publisher["id"]],
    )
    token, token_hash = generate_token()
    g.db.execute(
        """
        INSERT INTO api_tokens (publisher_id, token_hash, name, created_at)
        VALUES (?, ?, ?, ?)
        """,
        [publisher["id"], token_hash, "Web Session", datetime.utcnow().isoformat()],
    )
    g.db.commit()

    return jsonify({
        "data": {
            "token": token,
            "publisher": {
                "slug": publisher["slug"],
                "display_name": publisher["display_name"],
            },
        }
    })
@app.route("/api/v1/tokens", methods=["POST"])
@require_token
def create_token() -> Response:
    """Create a new API token for the authenticated publisher.

    The optional JSON field ``name`` labels the token; a missing, blank or
    non-string name falls back to ``"CLI token"``.  Returns 201 with the
    plain token (only its hash is stored), its name and creation time.

    Error responses: ``PAYLOAD_TOO_LARGE`` (413) and whatever
    ``enforce_token_rate_limit`` returns for the ``tokens`` bucket.
    """
    if request.content_length and request.content_length > MAX_BODY_BYTES:
        return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413)
    rate_resp = enforce_token_rate_limit("tokens", g.current_token["hash"])
    if rate_resp:
        return rate_resp

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    raw_name = payload.get("name")
    # Strip before applying the default so a whitespace-only name does not
    # end up stored as an empty string.
    name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not name:
        name = "CLI token"

    token, token_hash = generate_token()
    now = datetime.utcnow().isoformat()
    g.db.execute(
        "INSERT INTO api_tokens (publisher_id, token_hash, name, created_at) VALUES (?, ?, ?, ?)",
        [g.current_publisher["id"], token_hash, name, now],
    )
    g.db.commit()

    response = jsonify({
        "data": {
            "token": token,
            "name": name,
            "created_at": now,
        }
    })
    response.status_code = 201
    return response
@app.route("/api/v1/tokens", methods=["GET"])
@require_token
def list_tokens() -> Response:
    """List the authenticated publisher's non-revoked tokens, newest first.

    Only metadata (id, name, creation and last-use timestamps) is returned.
    """
    exposed_columns = ("id", "name", "created_at", "last_used_at")
    active_tokens = query_all(
        g.db,
        """
        SELECT id, name, created_at, last_used_at
        FROM api_tokens
        WHERE publisher_id = ? AND revoked_at IS NULL
        ORDER BY created_at DESC
        """,
        [g.current_publisher["id"]],
    )
    listing = [
        {column: record[column] for column in exposed_columns}
        for record in active_tokens
    ]
    return jsonify({"data": listing})
@app.route("/api/v1/tokens/<int:token_id>", methods=["DELETE"])
@require_token
def revoke_token(token_id: int) -> Response:
    """Revoke one of the authenticated publisher's tokens.

    Responds with ``FORBIDDEN`` (403) when the token id does not belong to
    the caller; otherwise stamps ``revoked_at`` with the current UTC time.
    """
    publisher_id = g.current_publisher["id"]
    owned = query_one(
        g.db,
        "SELECT id FROM api_tokens WHERE id = ? AND publisher_id = ?",
        [token_id, publisher_id],
    )
    if owned:
        revoked_at = datetime.utcnow().isoformat()
        g.db.execute(
            "UPDATE api_tokens SET revoked_at = ? WHERE id = ?",
            [revoked_at, token_id],
        )
        g.db.commit()
        return jsonify({"data": {"revoked": True}})
    return error_response("FORBIDDEN", "Cannot revoke this token", 403)
# ─── App Pairing (Connect Flow) ──────────────────────────────────────────────
@app.route("/api/v1/pairing/initiate", methods=["POST"])
@require_token
def initiate_pairing() -> Response:
    """Start a pairing request from the website for the current publisher.

    Expired and already claimed requests are purged, any pending request of
    the caller is cancelled, and a new pending request valid for five
    minutes is stored together with a freshly generated token.
    """
    publisher = g.current_publisher

    # Housekeeping: drop requests that expired or were already claimed.
    g.db.execute(
        "DELETE FROM pairing_requests WHERE expires_at < ? OR status = 'claimed'",
        [datetime.utcnow().isoformat()],
    )
    g.db.commit()

    # At most one pending request per publisher.
    g.db.execute(
        "UPDATE pairing_requests SET status = 'cancelled' WHERE publisher_id = ? AND status = 'pending'",
        [publisher["id"]],
    )

    code = secrets.token_urlsafe(16)
    plain_token, hashed_token = generate_token()
    deadline = datetime.utcnow() + timedelta(minutes=5)
    g.db.execute(
        """
        INSERT INTO pairing_requests (publisher_id, pairing_code, token_hash, token_plain, expires_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        [publisher["id"], code, hashed_token, plain_token, deadline.isoformat()],
    )
    g.db.commit()

    summary = {
        "status": "pending",
        "username": publisher["slug"],
        "expires_in": 300,
    }
    return jsonify({"data": summary})
@app.route("/api/v1/pairing/check/<username>", methods=["GET"])
def check_pairing(username: str) -> Response:
    """Let the CLI poll for a pending pairing and claim it.

    Looks up the newest unexpired pending pairing request of the publisher
    whose slug is ``username``.  If one exists it is marked as claimed, an
    API token named ``App: <hostname>`` is created from the stored hash and
    the plain token is returned with status ``connected``.  Otherwise the
    status is ``not_found``.

    Query parameters:
        hostname: label for the connecting device (trimmed, capped at 255
            characters, defaults to ``"Unknown Device"``).

    NOTE(review): the stored ``pairing_code`` is never compared here, so any
    caller that knows the username can claim a pending pairing within its
    lifetime -- confirm this is the intended trust model.
    """
    # The hostname is untrusted input: normalise blanks and bound its length.
    hostname = (request.args.get("hostname") or "").strip()[:255] or "Unknown Device"
    now = datetime.utcnow().isoformat()

    # Find the pending pairing for this username.
    row = query_one(
        g.db,
        """
        SELECT pr.*, p.slug
        FROM pairing_requests pr
        JOIN publishers p ON pr.publisher_id = p.id
        WHERE p.slug = ? AND pr.status = 'pending' AND pr.expires_at > ?
        ORDER BY pr.created_at DESC
        LIMIT 1
        """,
        [username, now],
    )
    if not row:
        return jsonify({"data": {"status": "not_found"}})

    # Claim atomically: the status guard makes sure that, of several
    # concurrent polls, only one wins and creates the token.
    claim = g.db.execute(
        """
        UPDATE pairing_requests
        SET status = 'claimed', hostname = ?, claimed_at = ?
        WHERE id = ? AND status = 'pending'
        """,
        [hostname, now, row["id"]],
    )
    if claim.rowcount == 0:
        # Another request claimed (or cancelled) it in the meantime.
        g.db.commit()
        return jsonify({"data": {"status": "not_found"}})

    # Create the actual API token, labelled with the hostname.
    g.db.execute(
        """
        INSERT INTO api_tokens (publisher_id, token_hash, name, hostname, created_at)
        VALUES (?, ?, ?, ?, ?)
        """,
        [row["publisher_id"], row["token_hash"], f"App: {hostname}", hostname, now],
    )
    g.db.commit()

    return jsonify({
        "data": {
            "status": "connected",
            "token": row["token_plain"],
            "username": username,
        }
    })
@app.route("/api/v1/pairing/status", methods=["GET"])
@require_token
def pairing_status() -> Response:
    """Report whether the current publisher has an unexpired pending pairing.

    Returns status ``pending`` with the expiry timestamp of the newest such
    request, or status ``none`` when there is no such request.
    """
    pending = query_one(
        g.db,
        """
        SELECT status, expires_at, created_at
        FROM pairing_requests
        WHERE publisher_id = ? AND status = 'pending' AND expires_at > ?
        ORDER BY created_at DESC
        LIMIT 1
        """,
        [g.current_publisher["id"], datetime.utcnow().isoformat()],
    )
    if pending:
        result = {"status": "pending", "expires_at": pending["expires_at"]}
    else:
        result = {"status": "none"}
    return jsonify({"data": result})
@app.route("/api/v1/connected-apps", methods=["GET"])
@require_token
def list_connected_apps() -> Response:
    """List the current publisher's non-revoked tokens as connected apps.

    All non-revoked tokens are included; a token without a hostname is
    reported with hostname ``"Unknown"``.  Most recently used tokens come
    first, never-used ones last.
    """
    tokens = query_all(
        g.db,
        """
        SELECT id, name, hostname, created_at, last_used_at
        FROM api_tokens
        WHERE publisher_id = ? AND revoked_at IS NULL
        ORDER BY last_used_at DESC NULLS LAST, created_at DESC
        """,
        [g.current_publisher["id"]],
    )
    apps = [
        {
            "id": record["id"],
            "name": record["name"],
            "hostname": record["hostname"] or "Unknown",
            "created_at": record["created_at"],
            "last_used_at": record["last_used_at"],
        }
        for record in tokens
    ]
    return jsonify({"data": apps})
@app.route("/api/v1/connected-apps/<int:app_id>", methods=["DELETE"])
@require_token
def disconnect_app(app_id: int) -> Response:
    """Disconnect an app by revoking the token behind it.

    Responds with ``NOT_FOUND`` (404) when the token id does not belong to
    the current publisher.
    """
    publisher_id = g.current_publisher["id"]
    owned = query_one(
        g.db,
        "SELECT id FROM api_tokens WHERE id = ? AND publisher_id = ?",
        [app_id, publisher_id],
    )
    if owned:
        revoked_at = datetime.utcnow().isoformat()
        g.db.execute(
            "UPDATE api_tokens SET revoked_at = ? WHERE id = ?",
            [revoked_at, app_id],
        )
        g.db.commit()
        return jsonify({"data": {"disconnected": True}})
    return error_response("NOT_FOUND", "App not found", 404)
@app.route("/api/v1/tools", methods=["POST"])
@require_token
def publish_tool() -> Response:
    """Publish a new tool version for the authenticated publisher.

    The JSON body carries ``config`` (the tool's YAML), and optionally
    ``readme``, ``visibility`` and ``dry_run``.  Name, version, description,
    category, tags, source attribution and deprecation fields are read from
    the YAML.

    Steps: size and rate-limit checks, YAML parsing and field validation,
    duplicate-version check, best-effort category/similarity suggestions,
    best-effort automated scrutiny (a ``reject`` decision aborts), then
    either a dry-run summary or the insert.  Public tools start with
    moderation status ``pending``; private and unlisted tools are stored as
    ``approved``.

    Returns 201 with owner, name, version, moderation status, visibility and
    suggestions (200 with status ``validated`` for a dry run).  Error codes:
    ``PAYLOAD_TOO_LARGE`` (413), ``VALIDATION_ERROR``, ``INVALID_VERSION``,
    ``VERSION_EXISTS`` (409), ``SCRUTINY_FAILED`` (400), plus whatever the
    rate limiter and ``validate_payload_size`` return.

    Malformed input (non-object JSON, non-string config/readme, YAML that is
    not a mapping, non-string YAML scalars such as ``version: 1.0``) produces
    a validation error instead of an unhandled exception.
    """
    if request.content_length and request.content_length > MAX_BODY_BYTES:
        return error_response("PAYLOAD_TOO_LARGE", "Request body exceeds 512KB limit", 413)
    rate_resp = enforce_token_rate_limit("publish", g.current_token["hash"])
    if rate_resp:
        return rate_resp

    def text_of(value: Any) -> str:
        """Return a scalar as stripped text; anything else becomes ''."""
        if isinstance(value, str):
            return value.strip()
        # YAML turns unquoted values like 1.0 or 42 into numbers.
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return str(value)
        return ""

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}
    config_text = payload.get("config") or ""
    readme = payload.get("readme") or ""
    dry_run = bool(payload.get("dry_run"))
    if not isinstance(config_text, str) or not isinstance(readme, str):
        return error_response("VALIDATION_ERROR", "config and readme must be strings")

    size_resp = validate_payload_size("config", config_text, MAX_CONFIG_BYTES)
    if size_resp:
        return size_resp
    if readme:
        size_resp = validate_payload_size("readme", readme, MAX_README_BYTES)
        if size_resp:
            return size_resp

    try:
        data = yaml.safe_load(config_text) or {}
    except yaml.YAMLError:
        return error_response("VALIDATION_ERROR", "Invalid YAML in config")
    if not isinstance(data, dict):
        return error_response("VALIDATION_ERROR", "Config must be a YAML mapping")

    name = text_of(data.get("name"))
    version = text_of(data.get("version"))
    description = text_of(data.get("description"))
    category = text_of(data.get("category")) or None
    tags = data.get("tags") or []

    # Visibility comes from the payload first, then from the config YAML.
    visibility = text_of(
        payload.get("visibility") or data.get("visibility") or "public"
    ).lower()
    if visibility not in ("public", "private", "unlisted"):
        return error_response("VALIDATION_ERROR", "Invalid visibility. Must be: public, private, unlisted")

    # Source attribution: either a dict (full ToolSource) or a legacy string.
    source_data = data.get("source")
    source_json = None
    source = None
    source_url = text_of(data.get("source_url")) or None
    if isinstance(source_data, dict):
        source_json = json.dumps(source_data)
        # Keep the legacy columns populated for backward compatibility.
        source = source_data.get("original_tool") or source_data.get("author")
        source_url = source_data.get("url") or source_url
    elif isinstance(source_data, str) and source_data.strip():
        source = source_data.strip()
        # Create a minimal source_json for consistency.
        source_json = json.dumps({"type": "imported", "original_tool": source, "url": source_url})

    if not name or not TOOL_NAME_RE.match(name) or len(name) > MAX_TOOL_NAME_LEN:
        return error_response("VALIDATION_ERROR", "Invalid tool name")
    if not version or Semver.parse(version) is None:
        return error_response("INVALID_VERSION", "Version string is not valid semver")
    if description and len(description) > MAX_DESC_LEN:
        return error_response("VALIDATION_ERROR", "Description exceeds 500 characters")
    if tags:
        if not isinstance(tags, list):
            return error_response("VALIDATION_ERROR", "Tags must be a list")
        if len(tags) > MAX_TAGS:
            return error_response("VALIDATION_ERROR", "Too many tags")
        for tag in tags:
            if len(str(tag)) > MAX_TAG_LEN:
                return error_response("VALIDATION_ERROR", "Tag exceeds 32 characters")

    owner = g.current_publisher["slug"]
    existing = query_one(
        g.db,
        "SELECT published_at FROM tools WHERE owner = ? AND name = ? AND version = ?",
        [owner, name, version],
    )
    if existing:
        return error_response(
            "VERSION_EXISTS",
            f"Version {version} already exists",
            409,
            details={"published_at": existing["published_at"]},
        )

    # Suggestions are best effort: a failure is logged, never fatal.
    suggestions = {"category": None, "similar_tools": []}
    try:
        from .categorize import suggest_categories
        from .similarity import find_similar_tools

        categories_path = get_repo_dir() / "categories" / "categories.yaml"
        if not category and categories_path.exists():
            ranked = suggest_categories(name, description, tags, categories_path)
            if ranked:
                suggestions["category"] = {
                    "suggested": ranked[0][0],
                    "confidence": ranked[0][1],
                }

        rows = query_all(
            g.db,
            "SELECT owner, name, description, category, tags FROM tools",
        )
        catalog = []
        for row in rows:
            try:
                row_tags = json.loads(row["tags"] or "[]")
            except (TypeError, ValueError):
                # Skip rows whose stored tags are not valid JSON.
                continue
            catalog.append({
                "owner": row["owner"],
                "name": row["name"],
                "description": row["description"] or "",
                "category": row["category"],
                "tags": row_tags,
            })
        similar = find_similar_tools(catalog, name, description, tags, category)
        suggestions["similar_tools"] = [
            {"name": f"{tool['owner']}/{tool['name']}", "similarity": score}
            for tool, score in similar[:5]
        ]
    except Exception:
        app.logger.warning("Could not compute publish suggestions", exc_info=True)

    # Automated scrutiny is best effort as well.
    scrutiny_report = None
    try:
        from .scrutiny import scrutinize_tool

        scrutiny_report = scrutinize_tool(config_text, description or "", readme)
    except Exception:
        app.logger.warning("Automated scrutiny failed", exc_info=True)

    if scrutiny_report:
        suggestions["scrutiny"] = scrutiny_report
        if scrutiny_report.get("decision") == "reject":
            # Use the first failing check for the error message.
            fail_findings = [f for f in scrutiny_report.get("findings", []) if f.get("result") == "fail"]
            fail_msg = (
                fail_findings[0].get("message") if fail_findings else None
            ) or "Tool failed automated review"
            return error_response(
                "SCRUTINY_FAILED",
                f"Tool rejected: {fail_msg}",
                400,
                details={"scrutiny": scrutiny_report},
            )

    if dry_run:
        return jsonify({
            "data": {
                "owner": owner,
                "name": name,
                "version": version,
                "status": "validated",
                "suggestions": suggestions,
            }
        })

    tags_json = json.dumps(tags)

    # Scrutiny status mirrors the automated decision.
    decision = scrutiny_report.get("decision") if scrutiny_report else None
    if decision == "approve":
        scrutiny_status = "approved"
    elif decision == "review":
        scrutiny_status = "pending_review"
    else:
        scrutiny_status = "pending"
    scrutiny_json = json.dumps(scrutiny_report) if scrutiny_report else None

    # Private and unlisted tools are auto-approved; public ones need moderation.
    moderation_status = "approved" if visibility in ("private", "unlisted") else "pending"

    g.db.execute(
        """
        INSERT INTO tools (
            owner, name, version, description, category, tags, config_yaml, readme,
            publisher_id, deprecated, deprecated_message, replacement, downloads,
            scrutiny_status, scrutiny_report, source, source_url, source_json,
            visibility, moderation_status, published_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        [
            owner,
            name,
            version,
            description or None,
            category,
            tags_json,
            config_text,
            readme,
            g.current_publisher["id"],
            int(bool(data.get("deprecated"))),
            data.get("deprecated_message"),
            data.get("replacement"),
            0,
            scrutiny_status,
            scrutiny_json,
            source,
            source_url,
            source_json,
            visibility,
            moderation_status,
            datetime.utcnow().isoformat(),
        ],
    )
    g.db.commit()

    response = jsonify({
        "data": {
            "owner": owner,
            "name": name,
            "version": version,
            "pr_url": "",
            "status": moderation_status,
            "visibility": visibility,
            "suggestions": suggestions,
        }
    })
    response.status_code = 201
    return response
@app.route("/api/v1/me/tools", methods=["GET"])
@require_token
def my_tools() -> Response:
    """List every tool version owned by the current publisher, newest first."""
    published = query_all(
        g.db,
        """
        SELECT owner, name, version, description, downloads, deprecated, deprecated_message, replacement, published_at
        FROM tools
        WHERE owner = ?
        ORDER BY published_at DESC
        """,
        [g.current_publisher["slug"]],
    )

    def describe(record) -> Dict[str, Any]:
        return {
            "owner": record["owner"],
            "name": record["name"],
            "version": record["version"],
            "description": record["description"],
            "downloads": record["downloads"],
            # Stored as an integer flag; expose a real boolean.
            "deprecated": bool(record["deprecated"]),
            "deprecated_message": record["deprecated_message"],
            "replacement": record["replacement"],
            "published_at": record["published_at"],
        }

    return jsonify({"data": [describe(record) for record in published]})
@app.route("/api/v1/tools/<owner>/<name>/deprecate", methods=["POST"])
@require_token
def deprecate_tool(owner: str, name: str) -> Response:
    """Mark every version of one of the caller's tools as deprecated.

    The JSON body is optional.  ``deprecated_message`` (or ``message``) sets
    the deprecation note (max 500 characters) and ``replacement`` names a
    successor; non-string values are ignored.

    Error responses: ``FORBIDDEN`` (403) when ``owner`` is not the caller's
    slug, ``VALIDATION_ERROR`` (400) for an over-long message and
    ``TOOL_NOT_FOUND`` (404) when no version of the tool exists.
    """
    if g.current_publisher["slug"] != owner:
        return error_response("FORBIDDEN", "You can only deprecate your own tools", 403)

    # silent=True: every field is optional, so a missing or non-JSON body
    # must not be rejected; a non-object body is treated as empty.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text_field(key: str) -> str:
        """Return the stripped string stored under *key*, or '' if absent/non-string."""
        value = data.get(key)
        return value.strip() if isinstance(value, str) else ""

    message = text_field("deprecated_message") or text_field("message")
    replacement = text_field("replacement") or None
    if message and len(message) > 500:
        return error_response("VALIDATION_ERROR", "Message too long (max 500)", 400)

    # Update all versions of the tool.
    result = g.db.execute(
        """
        UPDATE tools SET deprecated = 1, deprecated_message = ?, replacement = ?
        WHERE owner = ? AND name = ?
        """,
        [message or None, replacement, owner, name],
    )
    if result.rowcount == 0:
        return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404)
    g.db.commit()
    return jsonify({"data": {"status": "deprecated", "owner": owner, "name": name}})
@app.route("/api/v1/tools/<owner>/<name>/undeprecate", methods=["POST"])
@require_token
def undeprecate_tool(owner: str, name: str) -> Response:
    """Clear the deprecation flag, message and replacement on all versions.

    Responds with ``FORBIDDEN`` (403) when ``owner`` is not the caller's
    slug and ``TOOL_NOT_FOUND`` (404) when no row was updated.
    """
    caller_slug = g.current_publisher["slug"]
    if caller_slug != owner:
        return error_response("FORBIDDEN", "You can only undeprecate your own tools", 403)

    outcome = g.db.execute(
        """
        UPDATE tools SET deprecated = 0, deprecated_message = NULL, replacement = NULL
        WHERE owner = ? AND name = ?
        """,
        [owner, name],
    )
    if outcome.rowcount != 0:
        g.db.commit()
        return jsonify({"data": {"status": "active", "owner": owner, "name": name}})
    return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404)
@app.route("/api/v1/me/settings", methods=["PUT"])
@require_token
def update_settings() -> Response:
    """Update the current publisher's profile (display name, bio, website).

    Only fields that are present as non-empty strings are written;
    ``null`` or non-string values are ignored rather than raising.  Limits:
    display name 100, bio 500, website 200 characters.  Responds with
    ``VALIDATION_ERROR`` (400) when a limit is exceeded or nothing is left
    to update.

    NOTE(review): an empty string leaves ``bio``/``website`` unchanged, so a
    client cannot clear them with "" -- confirm that this is intended.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def optional_text(key: str) -> Optional[str]:
        """Return the stripped value for a non-empty string field, else None."""
        value = data.get(key)
        return value.strip() if isinstance(value, str) and value else None

    raw_display_name = data.get("display_name")
    display_name = raw_display_name.strip() if isinstance(raw_display_name, str) else ""
    bio = optional_text("bio")
    website = optional_text("website")

    if display_name and len(display_name) > 100:
        return error_response("VALIDATION_ERROR", "Display name too long (max 100)", 400)
    if bio and len(bio) > 500:
        return error_response("VALIDATION_ERROR", "Bio too long (max 500)", 400)
    if website and len(website) > 200:
        return error_response("VALIDATION_ERROR", "Website URL too long (max 200)", 400)

    # Build the SET clause from fixed column fragments; values stay bound.
    updates = []
    params = []
    if display_name:
        updates.append("display_name = ?")
        params.append(display_name)
    if bio is not None:
        updates.append("bio = ?")
        params.append(bio)
    if website is not None:
        updates.append("website = ?")
        params.append(website)
    if not updates:
        return error_response("VALIDATION_ERROR", "No valid fields to update", 400)

    updates.append("updated_at = CURRENT_TIMESTAMP")
    params.append(g.current_publisher["id"])
    g.db.execute(
        f"UPDATE publishers SET {', '.join(updates)} WHERE id = ?",
        params,
    )
    g.db.commit()
    return jsonify({"data": {"status": "updated"}})
@app.route("/api/v1/featured/tools", methods=["GET"])
def featured_tools() -> Response:
    """Return featured tools for a placement, falling back to popular tools.

    Featured entries must be active and inside their optional start/end
    window; they are ordered by priority, then downloads.  When no entry
    matches, the most-downloaded non-deprecated tool rows are returned
    instead.

    Query parameters:
        placement: placement key (default ``"homepage"``).
        limit: number of tools (default 6, clamped to 1..20).  A
            non-numeric value falls back to the default.
    """
    placement = request.args.get("placement", "homepage")
    # ``type=int`` avoids an unhandled ValueError on malformed input, and the
    # lower bound stops a negative value from becoming SQLite's "no limit".
    limit = max(1, min(request.args.get("limit", 6, type=int), 20))

    rows = query_all(
        g.db,
        """
        SELECT t.owner, t.name, t.version, t.description, t.category, t.downloads,
               ft.priority
        FROM featured_tools ft
        JOIN tools t ON ft.tool_id = t.id
        WHERE ft.placement = ?
          AND ft.status = 'active'
          AND (ft.start_at IS NULL OR ft.start_at <= CURRENT_TIMESTAMP)
          AND (ft.end_at IS NULL OR ft.end_at > CURRENT_TIMESTAMP)
        ORDER BY ft.priority DESC, t.downloads DESC
        LIMIT ?
        """,
        [placement, limit],
    )

    # If no featured tools, fall back to popular.
    # NOTE(review): this fallback selects per-version rows, so one tool may
    # appear more than once -- confirm whether that is acceptable.
    if not rows:
        rows = query_all(
            g.db,
            """
            SELECT owner, name, version, description, category, downloads
            FROM tools
            WHERE deprecated = 0
            ORDER BY downloads DESC
            LIMIT ?
            """,
            [limit],
        )

    data = [
        {
            "owner": row["owner"],
            "name": row["name"],
            "version": row["version"],
            "description": row["description"],
            "category": row["category"],
            "downloads": row["downloads"],
        }
        for row in rows
    ]
    return jsonify({"data": data})
@app.route("/api/v1/featured/contributors", methods=["GET"])
def featured_contributors() -> Response:
    """Return the featured contributor for a placement, or ``null``.

    The newest active entry inside its optional start/end window is used.
    A ``bio_override`` on the featured entry takes precedence over the
    publisher's own bio.
    """
    slot = request.args.get("placement", "homepage")
    featured = query_one(
        g.db,
        """
        SELECT p.slug, p.display_name, p.bio, p.website,
               fc.bio_override
        FROM featured_contributors fc
        JOIN publishers p ON fc.publisher_id = p.id
        WHERE fc.placement = ?
          AND fc.status = 'active'
          AND (fc.start_at IS NULL OR fc.start_at <= CURRENT_TIMESTAMP)
          AND (fc.end_at IS NULL OR fc.end_at > CURRENT_TIMESTAMP)
        ORDER BY fc.created_at DESC
        LIMIT 1
        """,
        [slot],
    )

    profile = None
    if featured:
        profile = {
            "slug": featured["slug"],
            "display_name": featured["display_name"],
            "bio": featured["bio_override"] or featured["bio"],
            "website": featured["website"],
        }
    return jsonify({"data": profile})
@app.route("/api/v1/content/announcements", methods=["GET"])
def announcements() -> Response:
    """Return published announcements, newest first.

    Query parameters:
        limit: number of announcements (default 5, clamped to 1..20).  A
            non-numeric value falls back to the default.
    """
    # ``type=int`` avoids an unhandled ValueError on malformed input, and the
    # lower bound stops a negative value from becoming SQLite's "no limit".
    limit = max(1, min(request.args.get("limit", 5, type=int), 20))
    rows = query_all(
        g.db,
        """
        SELECT id, title, body, published_at
        FROM announcements
        WHERE published = 1
        ORDER BY published_at DESC
        LIMIT ?
        """,
        [limit],
    )
    data = [
        {
            "id": row["id"],
            "title": row["title"],
            "body": row["body"],
            "published_at": row["published_at"],
        }
        for row in rows
    ]
    return jsonify({"data": data})
@app.route("/api/v1/reports", methods=["POST"])
def submit_report() -> Response:
    """Submit an abuse report for a tool.

    Expects a JSON object with ``owner``, ``name`` and ``reason`` (max 100
    characters) plus optional ``details`` (max 2000 characters).  The report
    is attached to the most recently published version of the tool and
    records the reporter's IP and, when ``g.current_publisher`` is set, the
    reporter's publisher id.

    Error responses: ``VALIDATION_ERROR`` (400), ``TOOL_NOT_FOUND`` (404) and
    ``RATE_LIMITED`` (429) after five reports from the same IP within an
    hour.  A non-object body or non-string fields are treated as missing
    instead of raising.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    def text_field(key: str) -> str:
        """Return the stripped string stored under *key*, or '' if absent/non-string."""
        value = data.get(key)
        return value.strip() if isinstance(value, str) else ""

    owner = text_field("owner")
    name = text_field("name")
    reason = text_field("reason")
    details = text_field("details") or None

    if not owner or not name:
        return error_response("VALIDATION_ERROR", "owner and name required", 400)
    if not reason:
        return error_response("VALIDATION_ERROR", "reason required", 400)
    if len(reason) > 100:
        return error_response("VALIDATION_ERROR", "reason too long (max 100)", 400)
    if details and len(details) > 2000:
        return error_response("VALIDATION_ERROR", "details too long (max 2000)", 400)

    # Find the tool (its most recently published version).
    tool = query_one(
        g.db,
        "SELECT id FROM tools WHERE owner = ? AND name = ? ORDER BY published_at DESC LIMIT 1",
        [owner, name],
    )
    if not tool:
        return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404)

    # Reporter info: the publisher is only known when something set it on ``g``.
    reporter = getattr(g, "current_publisher", None)
    reporter_id = reporter["id"] if reporter else None
    reporter_ip = request.remote_addr

    # Rate limit: max 5 reports per IP per hour.
    recent = query_one(
        g.db,
        """
        SELECT COUNT(*) as cnt FROM reports
        WHERE reporter_ip = ?
          AND created_at > datetime('now', '-1 hour')
        """,
        [reporter_ip],
    )
    if recent and recent["cnt"] >= 5:
        return error_response("RATE_LIMITED", "Too many reports. Try again later.", 429)

    g.db.execute(
        """
        INSERT INTO reports (tool_id, reporter_id, reporter_ip, reason, details)
        VALUES (?, ?, ?, ?, ?)
        """,
        [tool["id"], reporter_id, reporter_ip, reason, details],
    )
    g.db.commit()
    return jsonify({"data": {"status": "submitted"}})
@app.route("/api/v1/consent", methods=["POST"])
def save_consent() -> Response:
    """Save the visitor's analytics/ads consent preferences in the session.

    The body is parsed as JSON regardless of its content type.  A body that
    cannot be parsed, or that is not a JSON object, counts as "no consent"
    for both flags.  The stored values are echoed back in the response.
    """
    # force=True ignores the content type; silent=True yields None on a
    # parse failure.  A JSON array/scalar must not reach ``.get`` below.
    data = request.get_json(force=True, silent=True)
    if not isinstance(data, dict):
        data = {}
    analytics = bool(data.get("analytics", False))
    ads = bool(data.get("ads", False))

    # Store consent in the session.
    from flask import session

    session["consent_analytics"] = analytics
    session["consent_ads"] = ads
    session["consent_given"] = True

    return jsonify({
        "data": {
            "analytics": analytics,
            "ads": ads,
            "saved": True
        }
    })
@app.route("/api/v1/analytics/pageview", methods=["POST"])
def track_pageview() -> Response:
    """Record a page view without cookies.

    Expects a JSON object with ``path`` (1..500 characters).  Stores the
    path, the truncated Referer and User-Agent headers and a shortened hash
    of the client IP.  Always answers 200 with ``{"tracked": <bool>}``;
    invalid input or a storage failure yields ``tracked: False``.
    """
    # silent=True keeps the "never fail the page" contract for bodies that
    # are missing, not JSON, or sent with a non-JSON content type.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    raw_path = data.get("path")
    path = raw_path.strip() if isinstance(raw_path, str) else ""
    if not path or len(path) > 500:
        return jsonify({"data": {"tracked": False}})

    # Hash the IP so the raw address is not stored.
    # NOTE(review): the hash is unsalted, so it can likely be reversed by
    # hashing candidate addresses -- consider a secret salt.
    ip_hash = hashlib.sha256(
        (request.remote_addr or "unknown").encode()
    ).hexdigest()[:16]

    raw_referrer = request.headers.get("Referer")
    raw_user_agent = request.headers.get("User-Agent")
    referrer = raw_referrer[:500] if raw_referrer else None
    user_agent = raw_user_agent[:500] if raw_user_agent else None

    try:
        g.db.execute(
            """
            INSERT INTO pageviews (path, referrer, user_agent, ip_hash)
            VALUES (?, ?, ?, ?)
            """,
            [path, referrer, user_agent, ip_hash],
        )
        g.db.commit()
    except Exception:
        # Tracking is best effort: never surface a storage error to the page.
        return jsonify({"data": {"tracked": False}})
    return jsonify({"data": {"tracked": True}})
@app.route("/api/v1/webhook/gitea", methods=["POST"])
def webhook_gitea() -> Response:
    """Receive a Gitea webhook and hand it to ``process_webhook``.

    Rejects bodies whose declared length exceeds ``MAX_BODY_BYTES`` (413)
    and refuses to run without ``CMDFORGE_REGISTRY_WEBHOOK_SECRET`` (401).
    Otherwise the status code and JSON payload returned by
    ``process_webhook`` are relayed to the caller.
    """
    declared_size = request.content_length
    if declared_size and declared_size > MAX_BODY_BYTES:
        return error_response(
            "PAYLOAD_TOO_LARGE",
            "Request body exceeds 512KB limit",
            status=413,
            details={"limit": MAX_BODY_BYTES},
        )

    webhook_secret = os.environ.get("CMDFORGE_REGISTRY_WEBHOOK_SECRET", "")
    if not webhook_secret:
        return error_response("UNAUTHORIZED", "Webhook secret not configured", 401)

    status_code, result = process_webhook(
        request.data,
        dict(request.headers),
        webhook_secret,
    )
    response = jsonify(result)
    response.status_code = status_code
    return response
@app.route("/api/v1/webhook/deploy", methods=["POST"])
def webhook_deploy() -> Response:
    """Auto-deploy webhook triggered by Gitea on push to the main branch.

    The request must carry an ``X-Gitea-Signature`` header equal to the
    hex HMAC-SHA256 of the raw body under ``CMDFORGE_DEPLOY_WEBHOOK_SECRET``;
    otherwise ``UNAUTHORIZED`` (401) is returned.  When the body is a JSON
    object whose ``ref`` is not ``refs/heads/main`` or ``refs/heads/master``
    nothing is deployed.  A signed body that cannot be parsed as a JSON
    object still triggers a deploy.

    The deploy (git pull + service restart) runs detached in the background
    so the response can be sent before the restart.  The checkout directory
    can be overridden with ``CMDFORGE_DEPLOY_DIR``.
    """
    import hmac
    import subprocess

    secret = os.environ.get("CMDFORGE_DEPLOY_WEBHOOK_SECRET", "")
    if not secret:
        return error_response("UNAUTHORIZED", "Deploy webhook secret not configured", 401)

    # Verify the Gitea signature (X-Gitea-Signature is HMAC-SHA256).
    signature = request.headers.get("X-Gitea-Signature", "")
    if not signature:
        return error_response("UNAUTHORIZED", "Missing signature", 401)
    expected = hmac.new(
        secret.encode(),
        request.data,
        hashlib.sha256
    ).hexdigest()
    # Compare as bytes: compare_digest() raises TypeError for str arguments
    # containing non-ASCII characters, which a crafted header could trigger.
    if not hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8")):
        return error_response("UNAUTHORIZED", "Invalid signature", 401)

    # Parse the payload to check the branch.  ValueError covers both invalid
    # JSON and undecodable bytes; in that case we proceed anyway.
    try:
        payload = json.loads(request.data)
    except ValueError:
        payload = None
    if isinstance(payload, dict):
        ref = payload.get("ref", "")
        # Only deploy on push to the main branch.
        if ref not in ("refs/heads/main", "refs/heads/master"):
            return jsonify({"data": {"deployed": False, "reason": f"Ignoring ref {ref}"}})

    deploy_dir = os.environ.get(
        "CMDFORGE_DEPLOY_DIR",
        "/srv/mergerfs/data_pool/home/rob/cmdforge-registry",
    )
    # The directory is passed as a positional argument ($1) so it never
    # needs shell quoting.
    deploy_script = (
        'cd "$1" && '
        "git pull origin main && "
        "sleep 1 && "
        "systemctl --user restart cmdforge-web"
    )
    # Run the deploy in the background (so we can respond before the restart
    # kills this process).
    subprocess.Popen(
        ["bash", "-c", deploy_script, "deploy", deploy_dir],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )
    return jsonify({"data": {"deployed": True, "message": "Deploy triggered"}})
# ─── Admin API Endpoints ─────────────────────────────────────────────────────
@app.route("/api/v1/admin/tools/pending", methods=["GET"])
@require_moderator
def admin_pending_tools() -> Response:
    """List tools pending moderation, oldest publication first.

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: page size (default 20; clamped to the range 1-100).

    Returns a JSON object with ``data`` (one entry per pending tool) and
    ``meta`` (whatever ``paginate`` builds from page, per_page and total).
    """
    # Clamp from below as well as above: SQLite treats a negative LIMIT as
    # "no limit", so per_page=-1 would dump the whole table, and page < 1
    # would yield a negative OFFSET.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = min(max(request.args.get("per_page", 20, type=int), 1), 100)
    offset = (page - 1) * per_page

    rows = query_all(
        g.db,
        """
        SELECT t.*, p.display_name as publisher_name
        FROM tools t
        JOIN publishers p ON t.publisher_id = p.id
        WHERE t.moderation_status = 'pending'
        ORDER BY t.published_at ASC
        LIMIT ? OFFSET ?
        """,
        [per_page, offset],
    )
    count_row = query_one(
        g.db,
        "SELECT COUNT(*) as total FROM tools WHERE moderation_status = 'pending'",
    )
    total = count_row["total"] if count_row else 0

    data = [
        {
            "id": row["id"],
            "owner": row["owner"],
            "name": row["name"],
            "version": row["version"],
            "description": row["description"],
            "category": row["category"],
            "published_at": row["published_at"],
            "publisher_name": row["publisher_name"],
            "visibility": row["visibility"],
        }
        for row in rows
    ]
    return jsonify({
        "data": data,
        "meta": paginate(page, per_page, total),
    })
@app.route("/api/v1/admin/tools/<int:tool_id>/approve", methods=["POST"])
@require_moderator
def admin_approve_tool(tool_id: int) -> Response:
    """Set a tool's moderation status to ``approved``.

    Responds 404 (``TOOL_NOT_FOUND``) when no tool has the given id. On
    success the acting publisher's slug and the current UTC time are stored
    as moderator/timestamp and an ``approve_tool`` audit entry is written.
    The tool's current status is not checked before the update.
    """
    record = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id])
    if not record:
        return error_response("TOOL_NOT_FOUND", "Tool not found", 404)

    moderator_slug = g.current_publisher["slug"]
    stamped_at = datetime.utcnow().isoformat()
    g.db.execute(
        """
        UPDATE tools
        SET moderation_status = 'approved',
        moderated_by = ?,
        moderated_at = ?
        WHERE id = ?
        """,
        [moderator_slug, stamped_at, tool_id],
    )
    g.db.commit()

    audit_details = {
        "tool": f"{record['owner']}/{record['name']}",
        "version": record["version"],
    }
    log_audit("approve_tool", "tool", str(tool_id), audit_details)

    result = {"status": "approved", "tool_id": tool_id}
    return jsonify({"data": result})
@app.route("/api/v1/admin/tools/<int:tool_id>/reject", methods=["POST"])
@require_moderator
def admin_reject_tool(tool_id: int) -> Response:
    """Set a tool's moderation status to ``rejected`` with a mandatory reason.

    The JSON body must carry a non-blank ``reason``; otherwise the response
    is 400 (``VALIDATION_ERROR``). An unknown tool id yields 404
    (``TOOL_NOT_FOUND``). The reason is stored as the moderation note and
    repeated in the ``reject_tool`` audit entry.
    """
    body = request.get_json() or {}
    reason = (body.get("reason") or "").strip()
    if not reason:
        return error_response("VALIDATION_ERROR", "Rejection reason is required", 400)

    record = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id])
    if not record:
        return error_response("TOOL_NOT_FOUND", "Tool not found", 404)

    moderator_slug = g.current_publisher["slug"]
    stamped_at = datetime.utcnow().isoformat()
    g.db.execute(
        """
        UPDATE tools
        SET moderation_status = 'rejected',
        moderation_note = ?,
        moderated_by = ?,
        moderated_at = ?
        WHERE id = ?
        """,
        [reason, moderator_slug, stamped_at, tool_id],
    )
    g.db.commit()

    audit_details = {
        "tool": f"{record['owner']}/{record['name']}",
        "version": record["version"],
        "reason": reason,
    }
    log_audit("reject_tool", "tool", str(tool_id), audit_details)

    result = {"status": "rejected", "tool_id": tool_id}
    return jsonify({"data": result})
@app.route("/api/v1/admin/tools/<int:tool_id>/remove", methods=["POST"])
@require_moderator
def admin_remove_tool(tool_id: int) -> Response:
    """Soft-delete a tool by setting its moderation status to ``removed``.

    The JSON body may carry a ``reason``; a blank or missing reason is stored
    as None in the moderation note, while the ``remove_tool`` audit entry
    records it as an empty string. An unknown tool id yields 404
    (``TOOL_NOT_FOUND``).
    """
    body = request.get_json() or {}
    reason = (body.get("reason") or "").strip()

    record = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id])
    if not record:
        return error_response("TOOL_NOT_FOUND", "Tool not found", 404)

    stored_note = reason if reason else None
    moderator_slug = g.current_publisher["slug"]
    stamped_at = datetime.utcnow().isoformat()
    g.db.execute(
        """
        UPDATE tools
        SET moderation_status = 'removed',
        moderation_note = ?,
        moderated_by = ?,
        moderated_at = ?
        WHERE id = ?
        """,
        [stored_note, moderator_slug, stamped_at, tool_id],
    )
    g.db.commit()

    audit_details = {
        "tool": f"{record['owner']}/{record['name']}",
        "version": record["version"],
        "reason": reason,
    }
    log_audit("remove_tool", "tool", str(tool_id), audit_details)

    result = {"status": "removed", "tool_id": tool_id}
    return jsonify({"data": result})
@app.route("/api/v1/admin/tools/<int:tool_id>", methods=["DELETE"])
@require_admin
def admin_delete_tool(tool_id: int) -> Response:
    """Permanently delete a tool and the rows that reference it (admin only).

    Responds 404 (``TOOL_NOT_FOUND``) for an unknown id. Otherwise the tool's
    ``download_stats``, ``reports`` and ``featured_tools`` rows are deleted,
    then the tool itself, in a single commit, followed by a ``delete_tool``
    audit entry.
    """
    record = query_one(g.db, "SELECT * FROM tools WHERE id = ?", [tool_id])
    if not record:
        return error_response("TOOL_NOT_FOUND", "Tool not found", 404)

    # Dependent rows go first, the tool row last.
    for dependent_table in ("download_stats", "reports", "featured_tools"):
        g.db.execute(f"DELETE FROM {dependent_table} WHERE tool_id = ?", [tool_id])
    g.db.execute("DELETE FROM tools WHERE id = ?", [tool_id])
    g.db.commit()

    audit_details = {
        "tool": f"{record['owner']}/{record['name']}",
        "version": record["version"],
    }
    log_audit("delete_tool", "tool", str(tool_id), audit_details)

    result = {"status": "deleted", "tool_id": tool_id}
    return jsonify({"data": result})
@app.route("/api/v1/admin/publishers", methods=["GET"])
@require_moderator
def admin_list_publishers() -> Response:
    """List all publishers with stats, newest account first.

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: page size (default 20; clamped to the range 1-100).

    Each entry carries the publisher's identity, role, ban/verification flags
    and two aggregates computed per row: the number of tools and the sum of
    their download counters (0 when the publisher has no tools).
    """
    # Clamp from below as well as above: SQLite treats a negative LIMIT as
    # "no limit", so per_page=-1 would return every publisher, and page < 1
    # would yield a negative OFFSET.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = min(max(request.args.get("per_page", 20, type=int), 1), 100)
    offset = (page - 1) * per_page

    rows = query_all(
        g.db,
        """
        SELECT p.*,
        (SELECT COUNT(*) FROM tools WHERE publisher_id = p.id) as tool_count,
        (SELECT SUM(downloads) FROM tools WHERE publisher_id = p.id) as total_downloads
        FROM publishers p
        ORDER BY p.created_at DESC
        LIMIT ? OFFSET ?
        """,
        [per_page, offset],
    )
    count_row = query_one(g.db, "SELECT COUNT(*) as total FROM publishers")
    total = count_row["total"] if count_row else 0

    data = [
        {
            "id": row["id"],
            "slug": row["slug"],
            "display_name": row["display_name"],
            "email": row["email"],
            "role": row["role"] or "user",
            "banned": bool(row["banned"]),
            "ban_reason": row["ban_reason"],
            "verified": bool(row["verified"]),
            "tool_count": row["tool_count"] or 0,
            # SUM() over zero rows is NULL, hence the fallback.
            "total_downloads": row["total_downloads"] or 0,
            "created_at": row["created_at"],
        }
        for row in rows
    ]
    return jsonify({
        "data": data,
        "meta": paginate(page, per_page, total),
    })
@app.route("/api/v1/admin/publishers/<int:publisher_id>", methods=["GET"])
@require_moderator
def admin_get_publisher(publisher_id: int) -> Response:
    """Return one publisher's profile, ban state, tools and active-token count.

    Responds 404 (``PUBLISHER_NOT_FOUND``) for an unknown id. Tools are
    listed newest publication first; tokens are only counted (those without a
    ``revoked_at`` value), never returned.
    """
    account = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id])
    if not account:
        return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404)

    tool_rows = query_all(
        g.db,
        """
        SELECT id, owner, name, version, moderation_status, visibility, downloads, published_at
        FROM tools
        WHERE publisher_id = ?
        ORDER BY published_at DESC
        """,
        [publisher_id],
    )
    # Only the number of live tokens is exposed, not the hashes.
    token_row = query_one(
        g.db,
        "SELECT COUNT(*) as cnt FROM api_tokens WHERE publisher_id = ? AND revoked_at IS NULL",
        [publisher_id],
    )

    tool_fields = (
        "id",
        "owner",
        "name",
        "version",
        "moderation_status",
        "visibility",
        "downloads",
        "published_at",
    )
    tool_entries = [
        {field: tool_row[field] for field in tool_fields}
        for tool_row in tool_rows
    ]

    profile = {
        "id": account["id"],
        "slug": account["slug"],
        "display_name": account["display_name"],
        "email": account["email"],
        "bio": account["bio"],
        "website": account["website"],
        "role": account["role"] or "user",
        "banned": bool(account["banned"]),
        "banned_at": account["banned_at"],
        "banned_by": account["banned_by"],
        "ban_reason": account["ban_reason"],
        "verified": bool(account["verified"]),
        "created_at": account["created_at"],
        "active_tokens": token_row["cnt"] if token_row else 0,
        "tools": tool_entries,
    }
    return jsonify({"data": profile})
@app.route("/api/v1/admin/publishers/<int:publisher_id>/ban", methods=["POST"])
@require_admin
def admin_ban_publisher(publisher_id: int) -> Response:
    """Ban a publisher, revoke their tokens and pull their tools.

    The JSON body must carry a non-blank ``reason`` (400 otherwise). An
    unknown id yields 404; a target whose role is ``admin`` yields 403.
    In one commit the publisher is flagged as banned, every not-yet-revoked
    API token is revoked and every tool not already ``removed`` is set to
    ``removed`` with the note 'Publisher banned'. A ``ban_publisher`` audit
    entry follows.
    """
    body = request.get_json() or {}
    reason = (body.get("reason") or "").strip()
    if not reason:
        return error_response("VALIDATION_ERROR", "Ban reason is required", 400)

    target = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id])
    if not target:
        return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404)
    if target["role"] == "admin":
        return error_response("FORBIDDEN", "Cannot ban an admin", 403)

    banned_at = datetime.utcnow().isoformat()
    acting_admin = g.current_publisher["slug"]

    # Executed in order: flag the account, kill its tokens, hide its tools.
    statements = [
        (
            """
            UPDATE publishers
            SET banned = 1,
            banned_at = ?,
            banned_by = ?,
            ban_reason = ?
            WHERE id = ?
            """,
            [banned_at, acting_admin, reason, publisher_id],
        ),
        (
            "UPDATE api_tokens SET revoked_at = ? WHERE publisher_id = ? AND revoked_at IS NULL",
            [banned_at, publisher_id],
        ),
        (
            """
            UPDATE tools
            SET moderation_status = 'removed',
            moderation_note = 'Publisher banned',
            moderated_by = ?,
            moderated_at = ?
            WHERE publisher_id = ? AND moderation_status != 'removed'
            """,
            [acting_admin, banned_at, publisher_id],
        ),
    ]
    for sql, params in statements:
        g.db.execute(sql, params)
    g.db.commit()

    audit_details = {
        "slug": target["slug"],
        "reason": reason,
    }
    log_audit("ban_publisher", "publisher", str(publisher_id), audit_details)

    result = {"status": "banned", "publisher_id": publisher_id}
    return jsonify({"data": result})
@app.route("/api/v1/admin/publishers/<int:publisher_id>/unban", methods=["POST"])
@require_admin
def admin_unban_publisher(publisher_id: int) -> Response:
    """Lift a publisher's ban.

    Responds 404 for an unknown id and 400 (``VALIDATION_ERROR``) when the
    publisher is not currently banned. Only the ban columns on the publisher
    row are cleared; tokens and tools are not touched here. An
    ``unban_publisher`` audit entry is written afterwards.
    """
    target = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id])
    if not target:
        return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404)

    currently_banned = target["banned"]
    if not currently_banned:
        return error_response("VALIDATION_ERROR", "Publisher is not banned", 400)

    clear_ban_sql = """
        UPDATE publishers
        SET banned = 0,
        banned_at = NULL,
        banned_by = NULL,
        ban_reason = NULL
        WHERE id = ?
        """
    g.db.execute(clear_ban_sql, [publisher_id])
    g.db.commit()

    audit_details = {"slug": target["slug"]}
    log_audit("unban_publisher", "publisher", str(publisher_id), audit_details)

    result = {"status": "unbanned", "publisher_id": publisher_id}
    return jsonify({"data": result})
@app.route("/api/v1/admin/publishers/<int:publisher_id>/role", methods=["POST"])
@require_admin
def admin_change_role(publisher_id: int) -> Response:
    """Assign a new role to a publisher.

    The JSON body must carry ``role`` set to ``user``, ``moderator`` or
    ``admin`` (400 otherwise); an unknown publisher id yields 404. The
    ``change_role`` audit entry records both the previous role (``user``
    when none was stored) and the new one.
    """
    body = request.get_json() or {}
    requested_role = (body.get("role") or "").strip()
    if requested_role not in ("user", "moderator", "admin"):
        return error_response("VALIDATION_ERROR", "Invalid role. Must be: user, moderator, admin", 400)

    target = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id])
    if not target:
        return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404)

    previous_role = target["role"] or "user"

    update_sql = "UPDATE publishers SET role = ? WHERE id = ?"
    g.db.execute(update_sql, [requested_role, publisher_id])
    g.db.commit()

    audit_details = {
        "slug": target["slug"],
        "old_role": previous_role,
        "new_role": requested_role,
    }
    log_audit("change_role", "publisher", str(publisher_id), audit_details)

    result = {"status": "updated", "publisher_id": publisher_id, "role": requested_role}
    return jsonify({"data": result})
@app.route("/api/v1/admin/publishers/<int:publisher_id>", methods=["DELETE"])
@require_admin
def admin_delete_publisher(publisher_id: int) -> Response:
    """Delete a publisher and optionally their tools.

    Query parameters:
        delete_tools: ``true`` (case-insensitive) to also delete every tool
            owned by the publisher, together with the rows that reference
            those tools. Any other value leaves the tools in place.

    Responds 404 for an unknown id and 400 when the target is the caller
    (``CANNOT_DELETE_SELF``) or has the ``admin`` role
    (``CANNOT_DELETE_ADMIN``). All changes are committed together and a
    ``delete_publisher`` audit entry is written afterwards.
    """
    delete_tools = request.args.get("delete_tools", "false").lower() == "true"

    publisher = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id])
    if not publisher:
        return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404)

    # Don't allow deleting yourself
    if publisher_id == g.current_publisher["id"]:
        return error_response("CANNOT_DELETE_SELF", "Cannot delete your own account", 400)

    # Don't allow deleting other admins
    if publisher["role"] == "admin":
        return error_response("CANNOT_DELETE_ADMIN", "Cannot delete admin accounts", 400)

    slug = publisher["slug"]

    if delete_tools:
        # Delete the rows that reference the publisher's tools first -- the
        # same dependents admin_delete_tool clears for a single tool --
        # so no download stats, reports or featured entries are orphaned.
        for dependent_table in ("download_stats", "reports", "featured_tools"):
            g.db.execute(
                f"DELETE FROM {dependent_table} "
                "WHERE tool_id IN (SELECT id FROM tools WHERE publisher_id = ?)",
                [publisher_id],
            )
        # Delete all tools owned by this publisher
        g.db.execute("DELETE FROM tools WHERE publisher_id = ?", [publisher_id])

    # Revoke all tokens that are still active; tokens revoked earlier keep
    # their original revocation time.
    g.db.execute(
        "UPDATE api_tokens SET revoked_at = CURRENT_TIMESTAMP "
        "WHERE publisher_id = ? AND revoked_at IS NULL",
        [publisher_id],
    )

    # Delete the publisher
    g.db.execute("DELETE FROM publishers WHERE id = ?", [publisher_id])
    g.db.commit()

    log_audit("delete_publisher", "publisher", str(publisher_id), {
        "slug": slug,
        "delete_tools": delete_tools,
    })
    return jsonify({"data": {"status": "deleted", "publisher_id": publisher_id, "slug": slug}})
@app.route("/api/v1/admin/publishers/<int:publisher_id>/reset-password", methods=["POST"])
@require_admin
def admin_reset_password(publisher_id: int) -> Response:
    """Replace a publisher's password with a freshly generated temporary one.

    Responds 404 for an unknown id. Otherwise a random URL-safe password is
    generated, its hash (from ``password_hasher``) is stored, a
    ``reset_password`` audit entry is written, and the plaintext temporary
    password is returned in the response body.
    """
    import secrets

    target = query_one(g.db, "SELECT * FROM publishers WHERE id = ?", [publisher_id])
    if not target:
        return error_response("PUBLISHER_NOT_FOUND", "Publisher not found", 404)

    # 12 random bytes, URL-safe encoded; only the hash is persisted.
    temporary = secrets.token_urlsafe(12)
    hashed = password_hasher.hash(temporary)

    update_sql = "UPDATE publishers SET password_hash = ? WHERE id = ?"
    g.db.execute(update_sql, [hashed, publisher_id])
    g.db.commit()

    slug = target["slug"]
    log_audit("reset_password", "publisher", str(publisher_id), {"slug": slug})

    result = {
        "status": "reset",
        "publisher_id": publisher_id,
        "slug": slug,
        "temporary_password": temporary,
    }
    return jsonify({"data": result})
@app.route("/api/v1/admin/reports", methods=["GET"])
@require_moderator
def admin_list_reports() -> Response:
    """List reports, newest first, filtered by status.

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: page size (default 20; clamped to the range 1-100).
        status: report status to match (default ``pending``). Passing an
            empty value disables the filter and lists every report.

    Only reports whose tool still exists are listed (inner join on tools);
    the reporter name is None when the reporter row cannot be joined.
    """
    # Clamp from below as well as above: SQLite treats a negative LIMIT as
    # "no limit", and page < 1 would yield a negative OFFSET.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = min(max(request.args.get("per_page", 20, type=int), 1), 100)
    status_filter = request.args.get("status", "pending")
    offset = (page - 1) * per_page

    # where_clause is one of two fixed strings; the user-supplied status only
    # ever travels as a bound parameter.
    where_clause = "WHERE r.status = ?" if status_filter else "WHERE 1=1"
    params = [status_filter] if status_filter else []

    rows = query_all(
        g.db,
        f"""
        SELECT r.*, t.owner, t.name as tool_name, t.version,
        p.display_name as reporter_name
        FROM reports r
        JOIN tools t ON r.tool_id = t.id
        LEFT JOIN publishers p ON r.reporter_id = p.id
        {where_clause}
        ORDER BY r.created_at DESC
        LIMIT ? OFFSET ?
        """,
        params + [per_page, offset],
    )
    # Count over the same inner join as the listing so that "total" never
    # includes reports whose tool row is gone and that can't be paged to.
    count_row = query_one(
        g.db,
        f"SELECT COUNT(*) as total FROM reports r JOIN tools t ON r.tool_id = t.id {where_clause}",
        params,
    )
    total = count_row["total"] if count_row else 0

    data = [
        {
            "id": row["id"],
            "tool": f"{row['owner']}/{row['tool_name']}",
            "tool_id": row["tool_id"],
            "version": row["version"],
            "reason": row["reason"],
            "details": row["details"],
            "reporter_name": row["reporter_name"],
            "status": row["status"],
            "created_at": row["created_at"],
            "resolved_at": row["resolved_at"],
            "resolution_note": row["resolution_note"],
        }
        for row in rows
    ]
    return jsonify({
        "data": data,
        "meta": paginate(page, per_page, total),
    })
@app.route("/api/v1/admin/reports/<int:report_id>/resolve", methods=["POST"])
@require_moderator
def admin_resolve_report(report_id: int) -> Response:
    """Resolve a pending report and carry out the chosen action.

    JSON body:
        action: one of ``dismiss``, ``warn``, ``remove_tool``,
            ``ban_publisher`` (400 otherwise).
        note: optional free text, appended to the resolution/moderation notes.

    Responds 404 for an unknown report, 400 when the report is no longer
    pending, and 403 when ``ban_publisher`` targets a publisher whose role is
    ``admin``. ``dismiss`` and ``warn`` only mark the report resolved.
    ``remove_tool`` additionally sets the reported tool to ``removed``.
    ``ban_publisher`` bans the tool's publisher, revokes their active tokens
    and sets their tools to ``removed``; if the reported tool no longer
    exists, the report is resolved without banning anyone.
    """
    data = request.get_json() or {}
    action = (data.get("action") or "").strip()
    note = (data.get("note") or "").strip()

    if action not in ("dismiss", "warn", "remove_tool", "ban_publisher"):
        return error_response(
            "VALIDATION_ERROR",
            "Invalid action. Must be: dismiss, warn, remove_tool, ban_publisher",
            400,
        )

    report = query_one(g.db, "SELECT * FROM reports WHERE id = ?", [report_id])
    if not report:
        return error_response("REPORT_NOT_FOUND", "Report not found", 404)
    if report["status"] != "pending":
        return error_response("VALIDATION_ERROR", "Report already resolved", 400)

    # Work out who would be banned before writing anything, so that a refused
    # ban leaves no half-applied changes on the connection.
    ban_target_id = None
    if action == "ban_publisher":
        # Get tool's publisher
        tool = query_one(g.db, "SELECT publisher_id FROM tools WHERE id = ?", [report["tool_id"]])
        if tool:
            ban_target_id = tool["publisher_id"]
            target = query_one(g.db, "SELECT role FROM publishers WHERE id = ?", [ban_target_id])
            # Same guard as admin_ban_publisher: this moderator-level
            # endpoint must not be a way to ban an admin account.
            if target and target["role"] == "admin":
                return error_response("FORBIDDEN", "Cannot ban an admin", 403)

    now = datetime.utcnow().isoformat()
    moderator_slug = g.current_publisher["slug"]

    # Mark report as resolved
    g.db.execute(
        """
        UPDATE reports
        SET status = 'resolved',
        resolved_by = ?,
        resolved_at = ?,
        resolution_note = ?
        WHERE id = ?
        """,
        [g.current_publisher["id"], now, f"{action}: {note}" if note else action, report_id],
    )

    # Take action
    if action == "remove_tool":
        g.db.execute(
            """
            UPDATE tools
            SET moderation_status = 'removed',
            moderation_note = ?,
            moderated_by = ?,
            moderated_at = ?
            WHERE id = ?
            """,
            [f"Removed due to report: {note}" if note else "Removed due to report",
             moderator_slug, now, report["tool_id"]],
        )
    elif ban_target_id is not None:
        g.db.execute(
            """
            UPDATE publishers
            SET banned = 1, banned_at = ?, banned_by = ?, ban_reason = ?
            WHERE id = ?
            """,
            [now, moderator_slug,
             f"Banned due to report: {note}" if note else "Banned due to report",
             ban_target_id],
        )
        # Revoke tokens that are still active; earlier revocations keep
        # their original timestamp.
        g.db.execute(
            "UPDATE api_tokens SET revoked_at = ? WHERE publisher_id = ? AND revoked_at IS NULL",
            [now, ban_target_id],
        )
        # Remove the banned publisher's tools from public view, matching what
        # admin_ban_publisher does for a direct ban.
        g.db.execute(
            """
            UPDATE tools
            SET moderation_status = 'removed',
            moderation_note = 'Publisher banned',
            moderated_by = ?,
            moderated_at = ?
            WHERE publisher_id = ? AND moderation_status != 'removed'
            """,
            [moderator_slug, now, ban_target_id],
        )

    g.db.commit()

    log_audit("resolve_report", "report", str(report_id), {
        "action": action,
        "tool_id": report["tool_id"],
        "note": note,
    })
    return jsonify({"data": {"status": "resolved", "report_id": report_id, "action": action}})
@app.route("/api/v1/admin/audit-log", methods=["GET"])
@require_admin
def admin_audit_log() -> Response:
    """View audit log entries, newest first.

    Query parameters:
        page: 1-based page number (default 1; values below 1 are treated as 1).
        per_page: page size (default 50; clamped to the range 1-200).
        target_type, target_id, actor_id: optional exact-match filters.
        since: optional lower bound, compared against ``created_at`` with
            ``>=``.

    All supplied filters are combined with AND. The stored ``details`` column
    is decoded from JSON for each entry (None when empty).
    """
    # Clamp from below as well as above: SQLite treats a negative LIMIT as
    # "no limit", so per_page=-1 would return the entire log, and page < 1
    # would yield a negative OFFSET.
    page = max(request.args.get("page", 1, type=int), 1)
    per_page = min(max(request.args.get("per_page", 50, type=int), 1), 200)
    target_type = request.args.get("target_type")
    target_id = request.args.get("target_id")
    actor_id = request.args.get("actor_id")
    since = request.args.get("since")
    offset = (page - 1) * per_page

    # Only fixed SQL fragments are concatenated; every user-supplied value is
    # passed as a bound parameter.
    where_clauses = []
    params: List[Any] = []
    if target_type:
        where_clauses.append("target_type = ?")
        params.append(target_type)
    if target_id:
        where_clauses.append("target_id = ?")
        params.append(target_id)
    if actor_id:
        where_clauses.append("actor_id = ?")
        params.append(actor_id)
    if since:
        where_clauses.append("created_at >= ?")
        params.append(since)
    where_sql = "WHERE " + " AND ".join(where_clauses) if where_clauses else ""

    rows = query_all(
        g.db,
        f"""
        SELECT * FROM audit_log
        {where_sql}
        ORDER BY created_at DESC
        LIMIT ? OFFSET ?
        """,
        params + [per_page, offset],
    )
    count_row = query_one(
        g.db,
        f"SELECT COUNT(*) as total FROM audit_log {where_sql}",
        params,
    )
    total = count_row["total"] if count_row else 0

    data = [
        {
            "id": row["id"],
            "action": row["action"],
            "target_type": row["target_type"],
            "target_id": row["target_id"],
            "actor_id": row["actor_id"],
            "details": json.loads(row["details"]) if row["details"] else None,
            "created_at": row["created_at"],
        }
        for row in rows
    ]
    return jsonify({
        "data": data,
        "meta": paginate(page, per_page, total),
    })
return app
def main() -> None:
    """Build the registry app and serve it with Flask's built-in server.

    Listens on all interfaces; the port comes from the ``PORT`` environment
    variable and falls back to 5000.
    """
    application = create_app()
    listen_port = int(os.environ.get("PORT", 5000))
    application.run(host="0.0.0.0", port=listen_port)


# Start the server only when this module is executed as a script.
if __name__ == "__main__":
    main()