CmdForge/src/cmdforge/registry_client.py

1176 lines
35 KiB
Python

"""Registry API client for CmdForge.
Handles all HTTP communication with the registry server.
"""
import hashlib
import json
import time
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Optional, List, Dict, Any
from urllib.parse import urljoin, urlencode
import requests
from .config import (
load_config,
get_registry_url,
get_registry_token,
get_client_id,
CONFIG_DIR
)
# Local cache directory
CACHE_DIR = CONFIG_DIR / "registry"
INDEX_CACHE_FILE = CACHE_DIR / "index.json"
INDEX_CACHE_MAX_AGE = timedelta(hours=24)
@dataclass
class RegistryError(Exception):
"""Base exception for registry errors."""
code: str
message: str
details: Optional[Dict] = None
http_status: int = 0
def __str__(self):
return f"{self.code}: {self.message}"
@dataclass
class RateLimitError(RegistryError):
"""Raised when rate limited by the registry."""
retry_after: int = 60
def __init__(self, retry_after: int = 60):
super().__init__(
code="RATE_LIMITED",
message=f"Rate limited. Retry after {retry_after} seconds.",
http_status=429
)
self.retry_after = retry_after
@dataclass
class PaginatedResponse:
"""Paginated API response."""
data: List[Dict]
page: int = 1
per_page: int = 20
total: int = 0
total_pages: int = 0
facets: Optional[Dict] = None # Category/tag/owner counts when requested
@dataclass
class ToolInfo:
"""Tool information from the registry."""
owner: str
name: str
version: str
description: str = ""
category: str = ""
tags: List[str] = field(default_factory=list)
downloads: int = 0
deprecated: bool = False
deprecated_message: str = ""
replacement: str = ""
published_at: str = ""
readme: str = ""
defaults: str = "" # Default settings YAML content
@property
def full_name(self) -> str:
return f"{self.owner}/{self.name}"
@classmethod
def from_dict(cls, data: dict) -> "ToolInfo":
return cls(
owner=data.get("owner", ""),
name=data.get("name", ""),
version=data.get("version", ""),
description=data.get("description", ""),
category=data.get("category", ""),
tags=data.get("tags", []),
downloads=data.get("downloads", 0),
deprecated=data.get("deprecated", False),
deprecated_message=data.get("deprecated_message", ""),
replacement=data.get("replacement", ""),
published_at=data.get("published_at", ""),
readme=data.get("readme", ""),
defaults=data.get("defaults", "")
)
@dataclass
class DownloadResult:
"""Result of downloading a tool."""
owner: str
name: str
resolved_version: str
config_yaml: str
readme: str = ""
config_hash: str = "" # Registry hash for integrity verification
defaults: str = "" # Default settings YAML content
class RegistryClient:
"""Client for interacting with the CmdForge registry API."""
def __init__(
self,
base_url: Optional[str] = None,
token: Optional[str] = None,
timeout: int = 30,
max_retries: int = 3
):
"""
Initialize the registry client.
Args:
base_url: Registry API base URL (default: from config)
token: Auth token for authenticated requests (default: from config)
timeout: Request timeout in seconds
max_retries: Maximum number of retries for failed requests
"""
self.base_url = base_url or get_registry_url()
self.token = token or get_registry_token()
self.timeout = timeout
self.max_retries = max_retries
self.client_id = get_client_id()
# Session for connection pooling
self._session = requests.Session()
self._session.headers.update({
"User-Agent": "CmdForge-CLI/1.0",
"X-CmdForge-Client": "cli/1.0.0",
"Accept": "application/json"
})
# Add client ID header
if self.client_id:
self._session.headers["X-Client-ID"] = self.client_id
def _url(self, path: str) -> str:
"""Build full URL from path."""
# Ensure base_url ends without /api/v1 duplication
base = self.base_url.rstrip("/")
if not path.startswith("/"):
path = "/" + path
return base + path
def _auth_headers(self) -> Dict[str, str]:
"""Get authentication headers if token is available."""
if self.token:
return {"Authorization": f"Bearer {self.token}"}
return {}
def _request(
self,
method: str,
path: str,
params: Optional[Dict] = None,
json_data: Optional[Dict] = None,
require_auth: bool = False,
etag: Optional[str] = None
) -> requests.Response:
"""
Make an HTTP request with retry logic.
Args:
method: HTTP method
path: API path
params: Query parameters
json_data: JSON body data
require_auth: Whether auth is required
etag: ETag for conditional requests
Returns:
Response object
Raises:
RegistryError: On API errors
RateLimitError: When rate limited
"""
url = self._url(path)
headers = {}
if require_auth:
if not self.token:
raise RegistryError(
code="UNAUTHORIZED",
message="Authentication required. Set registry token with 'cmdforge config set-token'",
http_status=401
)
headers.update(self._auth_headers())
if etag:
headers["If-None-Match"] = etag
last_error = None
for attempt in range(self.max_retries):
try:
response = self._session.request(
method=method,
url=url,
params=params,
json=json_data,
headers=headers,
timeout=self.timeout
)
# Handle rate limiting
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
if attempt < self.max_retries - 1:
time.sleep(min(retry_after, 30)) # Cap wait at 30s per attempt
continue
raise RateLimitError(retry_after=retry_after)
# Handle server errors with retry
if response.status_code >= 500:
if attempt < self.max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
return response
except requests.exceptions.Timeout:
last_error = RegistryError(
code="TIMEOUT",
message="Request timed out"
)
if attempt < self.max_retries - 1:
time.sleep(2 ** attempt)
continue
except requests.exceptions.ConnectionError:
last_error = RegistryError(
code="CONNECTION_ERROR",
message="Could not connect to registry"
)
if attempt < self.max_retries - 1:
time.sleep(2 ** attempt)
continue
raise last_error or RegistryError(
code="REQUEST_FAILED",
message="Request failed after retries"
)
def _handle_error_response(self, response: requests.Response) -> None:
"""Parse and raise appropriate error from error response."""
try:
data = response.json()
error = data.get("error", {})
raise RegistryError(
code=error.get("code", "UNKNOWN_ERROR"),
message=error.get("message", "Unknown error"),
details=error.get("details"),
http_status=response.status_code
)
except (json.JSONDecodeError, KeyError):
raise RegistryError(
code="UNKNOWN_ERROR",
message=f"HTTP {response.status_code}: {response.text[:200]}",
http_status=response.status_code
)
# -------------------------------------------------------------------------
# Public API Methods
# -------------------------------------------------------------------------
def list_tools(
self,
category: Optional[str] = None,
page: int = 1,
per_page: int = 20,
sort: str = "downloads",
order: str = "desc",
prefix: Optional[str] = None
) -> PaginatedResponse:
"""
List tools from the registry.
Args:
category: Filter by category
page: Page number (1-indexed)
per_page: Items per page (max 100)
sort: Sort field (downloads, published_at, name, owner, average_rating)
order: Sort order (asc, desc)
prefix: Letter prefix filter (A-Z or '#' for non-alpha)
Returns:
PaginatedResponse with tool data
"""
params = {
"page": page,
"per_page": min(per_page, 100),
"sort": sort,
"order": order
}
if category:
params["category"] = category
if prefix:
params["prefix"] = prefix
response = self._request("GET", "/tools", params=params)
if response.status_code != 200:
self._handle_error_response(response)
data = response.json()
meta = data.get("meta", {})
return PaginatedResponse(
data=data.get("data", []),
page=meta.get("page", page),
per_page=meta.get("per_page", per_page),
total=meta.get("total", 0),
total_pages=meta.get("total_pages", 0)
)
def search_tools(
self,
query: str,
category: Optional[str] = None,
categories: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
owner: Optional[str] = None,
min_downloads: Optional[int] = None,
max_downloads: Optional[int] = None,
published_after: Optional[str] = None,
published_before: Optional[str] = None,
include_deprecated: bool = False,
include_facets: bool = False,
page: int = 1,
per_page: int = 20,
sort: str = "relevance",
order: str = "desc",
prefix: Optional[str] = None
) -> PaginatedResponse:
"""
Search for tools in the registry.
Args:
query: Search query
category: Filter by single category (backward compat)
categories: Filter by multiple categories (OR logic)
tags: Filter by tags (AND logic - must have all)
owner: Filter by publisher/owner
min_downloads: Minimum download count
max_downloads: Maximum download count
published_after: Published after date (ISO format)
published_before: Published before date (ISO format)
include_deprecated: Include deprecated tools
include_facets: Include category/tag/owner counts in response
page: Page number
per_page: Items per page
sort: Sort field (relevance, downloads, published_at, name, owner, average_rating)
order: Sort order (asc, desc)
prefix: Letter prefix filter (A-Z or '#' for non-alpha)
Returns:
PaginatedResponse with matching tools (and facets if requested)
"""
params: Dict[str, Any] = {
"q": query,
"page": page,
"per_page": min(per_page, 100),
"sort": sort,
"order": order
}
if category:
params["category"] = category
if categories:
params["categories"] = ",".join(categories)
if tags:
params["tags"] = ",".join(tags)
if owner:
params["owner"] = owner
if min_downloads is not None:
params["min_downloads"] = min_downloads
if max_downloads is not None:
params["max_downloads"] = max_downloads
if published_after:
params["published_after"] = published_after
if published_before:
params["published_before"] = published_before
if include_deprecated:
params["deprecated"] = "true"
if include_facets:
params["include_facets"] = "true"
if prefix:
params["prefix"] = prefix
response = self._request("GET", "/tools/search", params=params)
if response.status_code != 200:
self._handle_error_response(response)
data = response.json()
meta = data.get("meta", {})
return PaginatedResponse(
data=data.get("data", []),
page=meta.get("page", page),
per_page=meta.get("per_page", per_page),
total=meta.get("total", 0),
total_pages=meta.get("total_pages", 0),
facets=data.get("facets")
)
def get_tags(
self,
category: Optional[str] = None,
limit: int = 100
) -> List[Dict]:
"""
Get all tags with usage counts.
Args:
category: Filter tags by category
limit: Maximum tags to return
Returns:
List of tag objects with name and count
"""
params: Dict[str, Any] = {"limit": min(limit, 500)}
if category:
params["category"] = category
response = self._request("GET", "/tags", params=params)
if response.status_code != 200:
self._handle_error_response(response)
data = response.json()
return data.get("data", [])
def get_tool(self, owner: str, name: str) -> ToolInfo:
"""
Get detailed information about a tool.
Args:
owner: Tool owner (namespace)
name: Tool name
Returns:
ToolInfo object
"""
response = self._request("GET", f"/tools/{owner}/{name}")
if response.status_code == 404:
raise RegistryError(
code="TOOL_NOT_FOUND",
message=f"Tool '{owner}/{name}' not found",
http_status=404
)
if response.status_code != 200:
self._handle_error_response(response)
data = response.json().get("data", {})
return ToolInfo.from_dict(data)
def get_tool_versions(self, owner: str, name: str) -> List[str]:
"""
Get all versions of a tool.
Args:
owner: Tool owner
name: Tool name
Returns:
List of version strings (sorted newest first)
"""
response = self._request("GET", f"/tools/{owner}/{name}/versions")
if response.status_code == 404:
raise RegistryError(
code="TOOL_NOT_FOUND",
message=f"Tool '{owner}/{name}' not found",
http_status=404
)
if response.status_code != 200:
self._handle_error_response(response)
data = response.json()
return data.get("data", {}).get("versions", [])
def download_tool(
self,
owner: str,
name: str,
version: Optional[str] = None,
install: bool = True
) -> DownloadResult:
"""
Download a tool's configuration.
Args:
owner: Tool owner
name: Tool name
version: Version or constraint (default: latest)
install: Whether to count as install for stats
Returns:
DownloadResult with config YAML
"""
params = {"install": str(install).lower()}
if version:
params["version"] = version
response = self._request(
"GET",
f"/tools/{owner}/{name}/download",
params=params
)
if response.status_code == 404:
error_data = {}
try:
error_data = response.json().get("error", {})
except json.JSONDecodeError:
pass
code = error_data.get("code", "TOOL_NOT_FOUND")
message = error_data.get("message", f"Tool '{owner}/{name}' not found")
raise RegistryError(
code=code,
message=message,
details=error_data.get("details"),
http_status=404
)
if response.status_code != 200:
self._handle_error_response(response)
data = response.json().get("data", {})
return DownloadResult(
owner=data.get("owner", owner),
name=data.get("name", name),
resolved_version=data.get("resolved_version", ""),
config_yaml=data.get("config", ""),
readme=data.get("readme", ""),
config_hash=data.get("config_hash", ""),
defaults=data.get("defaults", "")
)
def get_categories(self) -> List[Dict[str, Any]]:
"""
Get list of tool categories.
Returns:
List of category dicts with name, description, icon
"""
response = self._request("GET", "/categories")
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", [])
def get_collections(self) -> List[Dict[str, Any]]:
"""
Get list of tool collections.
Returns:
List of collection dicts with name, display_name, description, tools
"""
response = self._request("GET", "/collections")
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", [])
def get_collection(self, name: str) -> Dict[str, Any]:
"""
Get detailed information about a collection.
Args:
name: Collection name
Returns:
Collection dict with tools and pinned versions
"""
response = self._request("GET", f"/collections/{name}")
if response.status_code == 404:
raise RegistryError(
code="COLLECTION_NOT_FOUND",
message=f"Collection '{name}' not found",
http_status=404
)
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
def publish_tool(
self,
config_yaml: str,
readme: str = "",
defaults: str = "",
dry_run: bool = False,
visibility: str = "public",
owner: str = ""
) -> Dict[str, Any]:
"""
Publish a tool to the registry.
Args:
config_yaml: Tool configuration YAML content
readme: README.md content
defaults: Default settings YAML content (defaults.yaml)
dry_run: If True, validate without publishing
visibility: Tool visibility - "public", "private", or "unlisted"
owner: Optional owner override (admin only, use "official" for official tools)
Returns:
Dict with PR URL or validation results
"""
payload = {
"config": config_yaml,
"readme": readme,
"dry_run": dry_run,
"visibility": visibility
}
if defaults:
payload["defaults"] = defaults
if owner:
payload["owner"] = owner
response = self._request(
"POST",
"/tools",
json_data=payload,
require_auth=True
)
if response.status_code == 409:
# Version already exists
self._handle_error_response(response)
if response.status_code not in (200, 201):
self._handle_error_response(response)
return response.json().get("data", {})
def validate_token(self) -> tuple[bool, str]:
"""
Validate that the current token is valid.
Returns:
Tuple of (is_valid, error_message)
- (True, "") if token is valid
- (False, "reason") if token is invalid or missing
"""
if not self.token:
return False, "No token configured"
try:
response = self._request("GET", "/me/tools", require_auth=True)
if response.status_code == 200:
return True, ""
elif response.status_code == 401:
return False, "Token is invalid or revoked"
else:
return False, f"Unexpected response: {response.status_code}"
except RegistryError as e:
if e.http_status == 401 or e.code == "UNAUTHORIZED":
return False, "Token is invalid or revoked"
return False, str(e.message)
except Exception as e:
return False, f"Connection error: {e}"
def get_my_tools(self) -> List[ToolInfo]:
"""
Get tools published by the authenticated user.
Returns:
List of ToolInfo objects
"""
response = self._request("GET", "/me/tools", require_auth=True)
if response.status_code != 200:
self._handle_error_response(response)
tools = response.json().get("data", [])
return [ToolInfo.from_dict(t) for t in tools]
def get_my_tool_status(self, name: str) -> Dict[str, Any]:
"""
Get the moderation status of a specific tool owned by the authenticated user.
Args:
name: Tool name
Returns:
Dict with name, version, status (pending/approved/rejected), config_hash, published_at
"""
response = self._request("GET", f"/me/tools/{name}/status", require_auth=True)
if response.status_code == 404:
raise RegistryError(
code="TOOL_NOT_FOUND",
message=f"Tool '{name}' not found",
http_status=404
)
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
def get_tool_status_by_hashes(self, hashes: List[str]) -> Dict[str, Dict[str, Any]]:
"""
Look up tool statuses by config hash (batch).
Args:
hashes: List of config hash strings
Returns:
Dict mapping hash -> status info (owner, name, version, status, config_hash, feedback)
"""
response = self._request(
"POST", "/tools/status-by-hash",
json_data={"hashes": hashes},
require_auth=True
)
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
def get_me(self) -> Dict[str, Any]:
"""
Get current user info.
Returns:
Dict with user info including id, slug, email, role, etc.
"""
response = self._request("GET", "/me", require_auth=True)
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
def has_approved_public_tool(self, owner: str, name: str) -> bool:
"""
Check if any approved public version of a tool exists.
Args:
owner: Tool owner
name: Tool name
Returns:
True if at least one approved public version exists
"""
response = self._request("GET", f"/tools/{owner}/{name}/approved")
if response.status_code == 404:
raise RegistryError(
code="TOOL_NOT_FOUND",
message=f"Tool '{owner}/{name}' not found",
http_status=404
)
if response.status_code != 200:
self._handle_error_response(response)
return bool(response.json().get("data", {}).get("has_approved_public_version"))
def publish_collection(self, data: dict) -> dict:
"""
Publish a collection to the registry.
Args:
data: Collection data dict with name, display_name, tools, etc.
Returns:
Published collection data
"""
response = self._request(
"POST",
"/collections",
json_data=data,
require_auth=True
)
if response.status_code == 409:
raise RegistryError(
code="COLLECTION_EXISTS",
message="Collection already exists with different owner",
http_status=409
)
if response.status_code not in (200, 201):
self._handle_error_response(response)
return response.json().get("data", {})
# -------------------------------------------------------------------------
# Reviews & Ratings
# -------------------------------------------------------------------------
def get_tool_rating(self, owner: str, name: str) -> Dict[str, Any]:
"""
Get rating summary for a tool.
Args:
owner: Tool owner
name: Tool name
Returns:
Dict with average_rating, rating_count, distribution, etc.
"""
response = self._request("GET", f"/tools/{owner}/{name}/rating")
if response.status_code == 404:
raise RegistryError(
code="TOOL_NOT_FOUND",
message=f"Tool '{owner}/{name}' not found",
http_status=404
)
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
def get_my_review(self, owner: str, name: str) -> Optional[Dict[str, Any]]:
"""
Get the current user's review for a tool.
Args:
owner: Tool owner
name: Tool name
Returns:
Review dict or None if no review exists
"""
response = self._request("GET", f"/tools/{owner}/{name}/my-review", require_auth=True)
if response.status_code == 404:
# No review found - not an error
return None
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
def submit_review(
self, owner: str, name: str, rating: int, title: str = "", content: str = ""
) -> Dict[str, Any]:
"""
Submit a review for a tool.
Args:
owner: Tool owner
name: Tool name
rating: Rating 1-5
title: Optional review title
content: Optional review content
Returns:
Dict with review id and data
"""
payload: Dict[str, Any] = {"rating": rating}
if title:
payload["title"] = title
if content:
payload["content"] = content
response = self._request(
"POST", f"/tools/{owner}/{name}/reviews",
json_data=payload,
require_auth=True
)
if response.status_code not in (200, 201):
self._handle_error_response(response)
return response.json().get("data", {})
def update_review(
self, review_id: int, rating: int, title: str = "", content: str = ""
) -> Dict[str, Any]:
"""
Update an existing review.
Args:
review_id: Review ID
rating: Rating 1-5
title: Review title
content: Review content
Returns:
Dict with status
"""
payload: Dict[str, Any] = {"rating": rating, "title": title, "content": content}
response = self._request(
"PUT", f"/reviews/{review_id}",
json_data=payload,
require_auth=True
)
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
def delete_review(self, review_id: int) -> Dict[str, Any]:
"""
Delete a review.
Args:
review_id: Review ID
Returns:
Dict with status
"""
response = self._request(
"DELETE", f"/reviews/{review_id}",
require_auth=True
)
if response.status_code != 200:
self._handle_error_response(response)
return response.json().get("data", {})
# -------------------------------------------------------------------------
# Issues
# -------------------------------------------------------------------------
def submit_issue(
self, owner: str, name: str, issue_type: str, severity: str,
title: str, description: str = ""
) -> Dict[str, Any]:
"""
Submit an issue for a tool.
Args:
owner: Tool owner
name: Tool name
issue_type: One of 'bug', 'security', 'compatibility'
severity: One of 'low', 'medium', 'high', 'critical'
title: Issue title (max 200 chars)
description: Issue description (max 5000 chars)
Returns:
Dict with issue id and data
"""
payload: Dict[str, Any] = {
"issue_type": issue_type,
"severity": severity,
"title": title,
}
if description:
payload["description"] = description
response = self._request(
"POST", f"/tools/{owner}/{name}/issues",
json_data=payload,
)
if response.status_code not in (200, 201):
self._handle_error_response(response)
return response.json().get("data", {})
def get_popular_tools(self, limit: int = 10) -> List[ToolInfo]:
"""
Get most popular tools.
Args:
limit: Maximum number of tools to return
Returns:
List of ToolInfo objects
"""
response = self._request(
"GET",
"/stats/popular",
params={"limit": limit}
)
if response.status_code != 200:
self._handle_error_response(response)
tools = response.json().get("data", [])
return [ToolInfo.from_dict(t) for t in tools]
# -------------------------------------------------------------------------
# Index Caching
# -------------------------------------------------------------------------
def get_index(self, force_refresh: bool = False) -> Dict[str, Any]:
"""
Get the full tool index, using cache when possible.
Args:
force_refresh: Force refresh from server
Returns:
Index dict with tools list
"""
# Check cache first
if not force_refresh:
cached = self._load_cached_index()
if cached:
return cached
# Fetch from server
etag = self._get_cached_etag()
response = self._request("GET", "/index.json", etag=etag)
if response.status_code == 304:
# Not modified, use cache
cached = self._load_cached_index()
if cached:
return cached
if response.status_code != 200:
# Try to use stale cache on error
cached = self._load_cached_index(ignore_age=True)
if cached:
return cached
self._handle_error_response(response)
data = response.json()
# Cache the response
new_etag = response.headers.get("ETag")
self._save_cached_index(data, new_etag)
return data
def _load_cached_index(self, ignore_age: bool = False) -> Optional[Dict]:
"""Load cached index if valid."""
if not INDEX_CACHE_FILE.exists():
return None
try:
cache_data = json.loads(INDEX_CACHE_FILE.read_text())
# Check age
if not ignore_age:
cached_at = datetime.fromisoformat(cache_data.get("_cached_at", ""))
if datetime.now() - cached_at > INDEX_CACHE_MAX_AGE:
return None
# Verify checksum
if not self._verify_index_checksum(cache_data):
return None
return cache_data
except (json.JSONDecodeError, KeyError, ValueError):
return None
def _save_cached_index(self, data: Dict, etag: Optional[str] = None) -> None:
"""Save index to cache."""
CACHE_DIR.mkdir(parents=True, exist_ok=True)
data["_cached_at"] = datetime.now().isoformat()
if etag:
data["_etag"] = etag
INDEX_CACHE_FILE.write_text(json.dumps(data, indent=2))
def _get_cached_etag(self) -> Optional[str]:
"""Get ETag from cached index."""
if not INDEX_CACHE_FILE.exists():
return None
try:
cache_data = json.loads(INDEX_CACHE_FILE.read_text())
return cache_data.get("_etag")
except (json.JSONDecodeError, KeyError):
return None
def _verify_index_checksum(self, data: Dict) -> bool:
"""Verify cached index integrity."""
checksum = data.get("checksum", "")
if not checksum:
return True # No checksum to verify
# Compute checksum of tools list
tools = data.get("tools", [])
content = json.dumps(tools, sort_keys=True)
computed = "sha256:" + hashlib.sha256(content.encode()).hexdigest()
return computed == checksum
def clear_cache(self) -> None:
"""Clear the local index cache."""
if INDEX_CACHE_FILE.exists():
INDEX_CACHE_FILE.unlink()
# -------------------------------------------------------------------------
# Convenience functions
# -------------------------------------------------------------------------
def get_client() -> RegistryClient:
"""Get a configured registry client instance."""
return RegistryClient()
def search(query: str, **kwargs) -> PaginatedResponse:
"""Search the registry for tools."""
return get_client().search_tools(query, **kwargs)
def install_tool(tool_spec: str, version: Optional[str] = None) -> DownloadResult:
"""
Download a tool for installation.
Args:
tool_spec: Tool specification (owner/name or just name)
version: Version constraint
Returns:
DownloadResult with config YAML
"""
client = get_client()
# Parse tool spec
if "/" in tool_spec:
owner, name = tool_spec.split("/", 1)
else:
# Shorthand - try official namespace first
owner = "official"
name = tool_spec
try:
return client.download_tool(owner, name, version=version, install=True)
except RegistryError as e:
if e.code == "TOOL_NOT_FOUND" and owner == "official":
# Fall back to searching for most popular tool with this name
results = client.search_tools(name, per_page=1)
if results.data:
first = results.data[0]
return client.download_tool(
first["owner"],
first["name"],
version=version,
install=True
)
raise