"""Registry commands.""" import json import re import sys from pathlib import Path import yaml from ..config import load_config, set_registry_token from ..resolver import install_from_registry, uninstall_tool, ToolSpec def cmd_registry(args): """Handle registry subcommands.""" from ..registry_client import RegistryClient, RegistryError, RateLimitError, get_client if args.registry_cmd == "search": return _cmd_registry_search(args) elif args.registry_cmd == "tags": return _cmd_registry_tags(args) elif args.registry_cmd == "install": return _cmd_registry_install(args) elif args.registry_cmd == "uninstall": return _cmd_registry_uninstall(args) elif args.registry_cmd == "info": return _cmd_registry_info(args) elif args.registry_cmd == "update": return _cmd_registry_update(args) elif args.registry_cmd == "publish": return _cmd_registry_publish(args) elif args.registry_cmd == "my-tools": return _cmd_registry_my_tools(args) elif args.registry_cmd == "status": return _cmd_registry_status(args) elif args.registry_cmd == "browse": return _cmd_registry_browse(args) elif args.registry_cmd == "config": return _cmd_registry_config(args) else: # Default: show registry help print("Registry commands:") print(" search Search for tools") print(" tags List available tags") print(" install Install a tool") print(" uninstall Uninstall a tool") print(" info Show tool information") print(" update Update local index cache") print(" publish [path] Publish a tool") print(" my-tools List your published tools") print(" status Check moderation status of a tool") print(" browse Browse tools (GUI)") print(" config [action] Manage registry settings (admin)") return 0 def _cmd_registry_search(args): """Search for tools in the registry.""" from ..registry_client import RegistryError, get_client try: client = get_client() # Handle shortcut flags min_downloads = getattr(args, 'min_downloads', None) max_downloads = None if getattr(args, 'popular', False): min_downloads = 100 if getattr(args, 'new', False): max_downloads = 10 results = client.search_tools( query=args.query, category=getattr(args, 'category', None), tags=getattr(args, 'tags', None), owner=getattr(args, 'owner', None), min_downloads=min_downloads, max_downloads=max_downloads, published_after=getattr(args, 'since', None), published_before=getattr(args, 'before', None), include_deprecated=getattr(args, 'deprecated', False), include_facets=getattr(args, 'show_facets', False), per_page=args.limit or 20, sort=getattr(args, 'sort', 'relevance') ) # JSON output if getattr(args, 'json', False): output = { "query": args.query, "total": results.total, "results": results.data } if results.facets: output["facets"] = results.facets print(json.dumps(output, indent=2)) return 0 if not results.data: print(f"No tools found matching '{args.query}'") return 0 print(f"Found {results.total} tools matching \"{args.query}\":") # Show facets summary if requested if results.facets: cats = results.facets.get("categories", [])[:5] tags = results.facets.get("tags", [])[:5] if cats: cat_str = ", ".join(f"{c['name']} ({c['count']})" for c in cats) print(f"\nCategories: {cat_str}") if tags: tag_str = ", ".join(f"{t['name']} ({t['count']})" for t in tags) print(f"Top Tags: {tag_str}") print() for tool in results.data: owner = tool.get("owner", "") name = tool.get("name", "") version = tool.get("version", "") desc = tool.get("description", "") downloads = tool.get("downloads", 0) tags = tool.get("tags", []) print(f" {owner}/{name} v{version}") print(f" {desc[:60]}{'...' if len(desc) > 60 else ''}") if tags: print(f" Tags: {', '.join(tags[:5])}") print(f" Downloads: {downloads:,}") print() if results.total_pages > 1: print(f"Showing page {results.page}/{results.total_pages}") except RegistryError as e: if e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) print("Check your internet connection or try again later.", file=sys.stderr) elif e.code == "RATE_LIMITED": print(f"Rate limited. Please wait and try again.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error searching registry: {e}", file=sys.stderr) print("If the problem persists, check: cmdforge config show", file=sys.stderr) return 1 return 0 def _cmd_registry_tags(args): """List available tags.""" from ..registry_client import RegistryError, get_client try: client = get_client() tags = client.get_tags( category=getattr(args, 'category', None), limit=getattr(args, 'limit', 50) ) # JSON output if getattr(args, 'json', False): print(json.dumps({"tags": tags}, indent=2)) return 0 if not tags: print("No tags found") return 0 print(f"Available tags ({len(tags)}):\n") for tag in tags: print(f" {tag['name']:20} ({tag['count']} tools)") except RegistryError as e: if e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error: {e}", file=sys.stderr) return 1 return 0 def _cmd_registry_install(args): """Install a tool from the registry.""" from ..registry_client import RegistryError from ..tool import BIN_DIR from ..system_deps import prompt_install_missing tool_spec = args.tool version = args.version auto_yes = getattr(args, 'yes', False) skip_sys_deps = getattr(args, 'no_system_deps', False) print(f"Installing {tool_spec}...") try: resolved = install_from_registry(tool_spec, version) print(f"Installed: {resolved.full_name}@{resolved.version}") print(f"Location: {resolved.path}") # Show wrapper info wrapper_name = resolved.tool.name if resolved.owner: # Check for collision short_wrapper = BIN_DIR / resolved.tool.name if short_wrapper.exists(): wrapper_name = f"{resolved.owner}-{resolved.tool.name}" print(f"\nRun with: {wrapper_name}") # Check system dependencies unless skipped if not skip_sys_deps and resolved.tool.system_dependencies: print() tool_ref = f"{resolved.owner}/{resolved.tool.name}" if resolved.owner else resolved.tool.name prompt_install_missing(resolved.tool.system_dependencies, tool_ref, auto_yes=auto_yes) except RegistryError as e: if e.code == "TOOL_NOT_FOUND": print(f"Tool '{tool_spec}' not found in the registry.", file=sys.stderr) print(f"Try: cmdforge registry search {tool_spec.split('/')[-1]}", file=sys.stderr) elif e.code == "VERSION_NOT_FOUND" or e.code == "CONSTRAINT_UNSATISFIABLE": print(f"Error: {e.message}", file=sys.stderr) if e.details and "available_versions" in e.details: versions = e.details["available_versions"] print(f"Available versions: {', '.join(versions[:5])}", file=sys.stderr) if e.details.get("latest_stable"): print(f"Latest stable: {e.details['latest_stable']}", file=sys.stderr) elif e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) print("Check your internet connection or try again later.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error installing tool: {e}", file=sys.stderr) return 1 return 0 def _cmd_registry_uninstall(args): """Uninstall a tool.""" tool_spec = args.tool print(f"Uninstalling {tool_spec}...") if uninstall_tool(tool_spec): print(f"Uninstalled: {tool_spec}") else: print(f"Tool '{tool_spec}' not found", file=sys.stderr) return 1 return 0 def _cmd_registry_info(args): """Show tool information.""" from ..registry_client import RegistryError, get_client tool_spec = args.tool try: # Parse the tool spec parsed = ToolSpec.parse(tool_spec) owner = parsed.owner or "official" client = get_client() tool_info = client.get_tool(owner, parsed.name) print(f"{tool_info.owner}/{tool_info.name} v{tool_info.version}") print("=" * 50) print(f"Description: {tool_info.description}") print(f"Category: {tool_info.category}") print(f"Tags: {', '.join(tool_info.tags)}") print(f"Downloads: {tool_info.downloads}") print(f"Published: {tool_info.published_at}") if tool_info.deprecated: print() print(f"DEPRECATED: {tool_info.deprecated_message}") if tool_info.replacement: print(f"Use instead: {tool_info.replacement}") # Show versions versions = client.get_tool_versions(owner, parsed.name) if versions: print(f"\nVersions: {', '.join(versions[:5])}") if len(versions) > 5: print(f" ...and {len(versions) - 5} more") print(f"\nInstall: cmdforge registry install {tool_info.owner}/{tool_info.name}") except RegistryError as e: if e.code == "TOOL_NOT_FOUND": print(f"Tool '{tool_spec}' not found in the registry.", file=sys.stderr) print(f"Try: cmdforge registry search {parsed.name}", file=sys.stderr) elif e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) print("Check your internet connection or try again later.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error fetching tool info: {e}", file=sys.stderr) return 1 return 0 def _cmd_registry_update(args): """Update local index cache.""" from ..registry_client import RegistryError, get_client print("Updating registry index...") try: client = get_client() index = client.get_index(force_refresh=True) tool_count = index.get("tool_count", len(index.get("tools", []))) generated = index.get("generated_at", "unknown") print(f"Index updated: {tool_count} tools") print(f"Generated: {generated}") except RegistryError as e: if e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) print("Check your internet connection or try again later.", file=sys.stderr) elif e.code == "RATE_LIMITED": print("Rate limited. Please wait a moment and try again.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error updating index: {e}", file=sys.stderr) return 1 return 0 def _cmd_registry_publish(args): """Publish a tool to the registry.""" from ..registry_client import RegistryError, get_client # Read tool from current directory or specified path tool_path = Path(args.path) if args.path else Path.cwd() if tool_path.is_dir(): config_path = tool_path / "config.yaml" else: config_path = tool_path tool_path = config_path.parent if not config_path.exists(): print(f"Error: config.yaml not found in {tool_path}", file=sys.stderr) return 1 # Read config config_yaml = config_path.read_text() # Read README if exists readme_path = tool_path / "README.md" readme = readme_path.read_text() if readme_path.exists() else "" # Read defaults if exists defaults_path = tool_path / "defaults.yaml" defaults = "" if defaults_path.exists(): defaults = defaults_path.read_text() # Warn about potential secrets in defaults defaults_lower = defaults.lower() secret_patterns = ['api_key:', 'api_secret:', 'password:', 'token:', 'secret:'] for pattern in secret_patterns: if pattern in defaults_lower: # Check if it has a non-empty value match = re.search(rf'{pattern}\s*["\']?([^"\'\n]+)', defaults_lower) if match and match.group(1).strip() and match.group(1).strip() not in ('""', "''", ''): print(f"Warning: defaults.yaml contains '{pattern[:-1]}' with a value.") print(" Make sure you're not publishing actual credentials!") if not getattr(args, 'force', False): try: confirm = input("Continue anyway? [y/N] ") if confirm.lower() != 'y': return 1 except (EOFError, KeyboardInterrupt): print("\nCancelled.") return 1 break # Validate try: data = yaml.safe_load(config_yaml) name = data.get("name", "") version = data.get("version", "") if not name or not version: print("Error: config.yaml must have 'name' and 'version' fields", file=sys.stderr) return 1 except yaml.YAMLError as e: print(f"Error: Invalid YAML in config.yaml: {e}", file=sys.stderr) return 1 if args.dry_run: print("Dry run - validating only") print() print(f"Would publish:") print(f" Name: {name}") print(f" Version: {version}") print(f" Config: {len(config_yaml)} bytes") print(f" README: {len(readme)} bytes") return 0 # Check for token config = load_config() if not config.registry.token: print("No registry token configured.") print() print("To publish tools, you need an account:") print(" 1. Register at: https://cmdforge.brrd.tech/register") print(" 2. Log in and go to Dashboard > Tokens") print(" 3. Generate an API token") print(" 4. Enter your token below (or run: cmdforge config set-token )") print() try: token = input("Registry token: ").strip() if not token: print("Cancelled.") return 1 set_registry_token(token) print("Token saved.") except (EOFError, KeyboardInterrupt): print("\nCancelled.") return 1 print(f"Publishing {name}@{version}...") try: client = get_client() result = client.publish_tool(config_yaml, readme, defaults) pr_url = result.get("pr_url", "") status = result.get("status", "") if status == "published" or result.get("version"): print(f"Published: {result.get('owner', '')}/{result.get('name', '')}@{result.get('version', version)}") elif pr_url: print(f"PR created: {pr_url}") print("Your tool is pending review.") else: print("Published successfully!") # Show suggestions if provided (from Phase 6 smart features) suggestions = result.get("suggestions", {}) if suggestions: print() # Category suggestion cat_suggestion = suggestions.get("category") if cat_suggestion and cat_suggestion.get("suggested"): confidence = cat_suggestion.get("confidence", 0) print(f"Suggested category: {cat_suggestion['suggested']} ({confidence:.0%} confidence)") # Similar tools warning similar = suggestions.get("similar_tools", []) if similar: print("Similar existing tools:") for tool in similar[:3]: similarity = tool.get("similarity", 0) print(f" - {tool.get('name', 'unknown')} ({similarity:.0%} similar)") # Save registry_hash and status to local config for tracking config_hash = result.get("config_hash") moderation_status = result.get("status", "pending") if config_hash: try: config_data = yaml.safe_load(config_path.read_text()) or {} config_data["registry_hash"] = config_hash config_data["registry_status"] = moderation_status # Clear any old feedback when republishing if "registry_feedback" in config_data: del config_data["registry_feedback"] config_path.write_text(yaml.dump(config_data, default_flow_style=False, sort_keys=False)) except Exception: pass # Non-critical except RegistryError as e: if e.code == "UNAUTHORIZED": print("Authentication failed.", file=sys.stderr) print("Your token may have expired. Generate a new one from the registry.", file=sys.stderr) elif e.code == "INVALID_CONFIG": print(f"Invalid tool config: {e.message}", file=sys.stderr) print("Check your config.yaml for errors.", file=sys.stderr) elif e.code == "VERSION_EXISTS": print(f"Version already exists: {e.message}", file=sys.stderr) print("Bump the version in config.yaml and try again.", file=sys.stderr) elif e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) print("Check your internet connection or try again later.", file=sys.stderr) elif e.code == "RATE_LIMITED": print("Rate limited. Please wait a moment and try again.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error publishing: {e}", file=sys.stderr) return 1 return 0 def _cmd_registry_my_tools(args): """List your published tools.""" from ..registry_client import RegistryError, get_client try: client = get_client() tools = client.get_my_tools() if not tools: print("You haven't published any tools yet.") print("Publish your first tool with: cmdforge registry publish") return 0 print(f"Your published tools ({len(tools)}):\n") for tool in tools: status = "[DEPRECATED]" if tool.deprecated else "" print(f" {tool.owner}/{tool.name} v{tool.version} {status}") print(f" Downloads: {tool.downloads}") print() except RegistryError as e: if e.code == "UNAUTHORIZED": print("Not logged in. Set your registry token first:", file=sys.stderr) print(" cmdforge config set-token ", file=sys.stderr) print() print("Don't have a token? Register at the registry website.", file=sys.stderr) elif e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) print("Check your internet connection or try again later.", file=sys.stderr) elif e.code == "RATE_LIMITED": print("Rate limited. Please wait a moment and try again.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error: {e}", file=sys.stderr) return 1 return 0 def _cmd_registry_status(args): """Check moderation status of a tool.""" from ..registry_client import RegistryError, get_client from ..tool import get_tools_dir tool_name = args.tool do_sync = getattr(args, 'sync', False) # Check if tool exists locally config_path = get_tools_dir() / tool_name / "config.yaml" if not config_path.exists(): print(f"Error: Tool '{tool_name}' not found locally", file=sys.stderr) return 1 try: config_data = yaml.safe_load(config_path.read_text()) or {} except yaml.YAMLError as e: print(f"Error reading tool config: {e}", file=sys.stderr) return 1 local_status = config_data.get("registry_status", "not_published") local_hash = config_data.get("registry_hash") local_feedback = config_data.get("registry_feedback") if not local_hash: print(f"Tool '{tool_name}' has not been published to the registry.") print() print("Publish with: cmdforge registry publish") return 0 # If syncing, fetch from server if do_sync: try: client = get_client() status_data = client.get_my_tool_status(tool_name) new_status = status_data.get("status", "pending") new_hash = status_data.get("config_hash") new_feedback = status_data.get("feedback") changed = False if local_status != new_status: config_data["registry_status"] = new_status local_status = new_status changed = True if new_hash and local_hash != new_hash: config_data["registry_hash"] = new_hash local_hash = new_hash changed = True if new_feedback != local_feedback: if new_feedback: config_data["registry_feedback"] = new_feedback local_feedback = new_feedback elif "registry_feedback" in config_data: del config_data["registry_feedback"] local_feedback = None changed = True if changed: config_path.write_text(yaml.dump(config_data, default_flow_style=False, sort_keys=False)) print("Status synced from server.\n") except RegistryError as e: if e.code == "UNAUTHORIZED": print("Not logged in. Set your registry token to sync.", file=sys.stderr) elif e.code == "TOOL_NOT_FOUND": print(f"Tool '{tool_name}' not found in registry.", file=sys.stderr) else: print(f"Error syncing: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error syncing: {e}", file=sys.stderr) return 1 # Display status print(f"Tool: {tool_name}") print(f"Registry Hash: {local_hash[:16]}...") status_colors = { "approved": "\033[32mApproved\033[0m", # Green "pending": "\033[33mPending Review\033[0m", # Yellow "changes_requested": "\033[93mChanges Requested\033[0m", # Light yellow/orange "rejected": "\033[31mRejected\033[0m", # Red } status_display = status_colors.get(local_status, local_status) print(f"Status: {status_display}") if local_feedback: print() print("Feedback from moderator:") print("-" * 40) print(local_feedback) print("-" * 40) if local_status == "changes_requested": print() print("Action required: Address the feedback above and republish.") print(" cmdforge registry publish") elif local_status == "rejected": print() print("Your tool was rejected. Review the feedback above.") elif local_status == "pending": print() print("Your tool is waiting for moderator review.") print("Use --sync to check for updates.") elif local_status == "approved": print() print("Your tool is live in the registry!") return 0 def _cmd_registry_browse(args): """Browse tools (GUI).""" from ..gui import run_gui # Launch GUI - it will open to Registry page return run_gui() def _cmd_registry_config(args): """Manage registry settings (admin only).""" from ..registry_client import RegistryError, get_client action = getattr(args, 'action', 'list') key = getattr(args, 'key', None) value = getattr(args, 'value', None) as_json = getattr(args, 'json', False) category = getattr(args, 'category', None) try: client = get_client() if action == "list": return _config_list(client, as_json, category) elif action == "get": if not key: print("Error: key is required for 'get' action", file=sys.stderr) print("Usage: cmdforge registry config get ", file=sys.stderr) return 1 return _config_get(client, key, as_json) elif action == "set": if not key or value is None: print("Error: key and value are required for 'set' action", file=sys.stderr) print("Usage: cmdforge registry config set ", file=sys.stderr) return 1 return _config_set(client, key, value) except RegistryError as e: if e.code == "UNAUTHORIZED": print("Authentication failed.", file=sys.stderr) print("This command requires admin privileges.", file=sys.stderr) print("Set your admin token with: cmdforge config set-token ", file=sys.stderr) elif e.code == "FORBIDDEN": print("Access denied. Admin privileges required.", file=sys.stderr) elif e.code == "CONNECTION_ERROR": print("Could not connect to the registry.", file=sys.stderr) else: print(f"Error: {e.message}", file=sys.stderr) return 1 except Exception as e: print(f"Error: {e}", file=sys.stderr) return 1 return 0 def _config_list(client, as_json, category=None): """List all settings.""" # Use the admin settings endpoint response = client._request("GET", "/admin/settings") settings = response.get("settings", []) # Filter by category if specified if category: settings = [s for s in settings if s.get("category") == category] if as_json: print(json.dumps({"settings": settings}, indent=2)) return 0 if not settings: print("No settings found.") return 0 # Group by category by_category = {} for s in settings: cat = s.get("category", "general") if cat not in by_category: by_category[cat] = [] by_category[cat].append(s) print("Registry Settings") print("=" * 60) for cat, cat_settings in sorted(by_category.items()): print(f"\n[{cat.upper()}]") for s in cat_settings: key = s.get("key", "") value = s.get("value") value_type = s.get("value_type", "string") desc = s.get("description", "") is_default = s.get("is_default", True) # Format value display if value_type == "bool": value_str = "true" if value else "false" else: value_str = str(value) status = "" if is_default else " (modified)" print(f" {key}") print(f" Value: {value_str}{status}") if desc: print(f" {desc}") print() print("Use 'cmdforge registry config get ' to see a setting's value") print("Use 'cmdforge registry config set ' to change a setting") return 0 def _config_get(client, key, as_json): """Get a specific setting.""" response = client._request("GET", f"/admin/settings/{key}") if as_json: print(json.dumps(response, indent=2)) return 0 setting = response.get("setting", {}) print(f"Key: {setting.get('key', key)}") print(f"Value: {setting.get('value')}") print(f"Type: {setting.get('value_type', 'string')}") print(f"Category: {setting.get('category', 'general')}") if setting.get('description'): print(f"Description: {setting['description']}") if setting.get('updated_at'): print(f"Last updated: {setting['updated_at'][:19]} by {setting.get('updated_by', 'system')}") return 0 def _config_set(client, key, value): """Set a setting value.""" # Try to parse value as appropriate type parsed_value = value # Try to parse as bool if value.lower() in ("true", "false"): parsed_value = value.lower() == "true" # Try to parse as number else: try: if "." in value: parsed_value = float(value) else: parsed_value = int(value) except ValueError: # Keep as string pass response = client._request("PUT", f"/admin/settings/{key}", json={"value": parsed_value}) if response.get("success"): print(f"Setting '{key}' updated successfully.") print(f"New value: {response.get('setting', {}).get('value', parsed_value)}") else: print(f"Failed to update setting: {response.get('error', 'Unknown error')}", file=sys.stderr) return 1 return 0