"""Tool resolution with proper search order. Implements the tool resolution order: 1. Local project: ./.smarttools///config.yaml 2. Global user: ~/.smarttools///config.yaml 3. Registry: Fetch from API, install to global, then run 4. Error if not found """ import logging import re import sys from dataclasses import dataclass from pathlib import Path from typing import Optional, Tuple import yaml from .tool import Tool, TOOLS_DIR, get_bin_dir, BIN_DIR from .config import is_auto_fetch_enabled, load_config from .manifest import load_manifest logger = logging.getLogger(__name__) # Local project tools directories (support both legacy and documented paths) LOCAL_TOOLS_DIRS = [Path(".smarttools"), Path("smarttools")] @dataclass class ToolSpec: """Parsed tool specification.""" owner: Optional[str] # None for unqualified names like "summarize" name: str version: Optional[str] = None # Version constraint @property def full_name(self) -> str: """Get full owner/name format.""" if self.owner: return f"{self.owner}/{self.name}" return self.name @property def is_qualified(self) -> bool: """Check if this is a fully qualified name (owner/name).""" return self.owner is not None @classmethod def parse(cls, spec: str) -> "ToolSpec": """ Parse a tool specification string. Formats: - "summarize" -> owner=None, name="summarize" - "rob/summarize" -> owner="rob", name="summarize" - "summarize@1.0.0" -> name="summarize", version="1.0.0" - "rob/summarize@^1.0.0" -> owner="rob", name="summarize", version="^1.0.0" """ version = None # Extract version if present if "@" in spec: spec, version = spec.rsplit("@", 1) # Extract owner if present if "/" in spec: owner, name = spec.split("/", 1) else: owner = None name = spec return cls(owner=owner, name=name, version=version) @dataclass class ResolvedTool: """Result of tool resolution.""" tool: Tool source: str # "local", "global", "registry" path: Path owner: Optional[str] = None version: Optional[str] = None @property def full_name(self) -> str: if self.owner: return f"{self.owner}/{self.tool.name}" return self.tool.name class ToolNotFoundError(Exception): """Raised when a tool cannot be found.""" def __init__(self, spec: ToolSpec, searched_paths: list): self.spec = spec self.searched_paths = searched_paths super().__init__(f"Tool '{spec.full_name}' not found") class ToolResolver: """Resolves tool specifications to actual tool configs.""" def __init__( self, project_dir: Optional[Path] = None, auto_fetch: Optional[bool] = None, verbose: bool = False ): """ Initialize the resolver. Args: project_dir: Project root directory (default: cwd) auto_fetch: Override auto-fetch setting verbose: Print debug info """ self.project_dir = project_dir or Path.cwd() self.verbose = verbose # Determine auto-fetch setting if auto_fetch is not None: self.auto_fetch = auto_fetch else: self.auto_fetch = is_auto_fetch_enabled() # Load project manifest if present self.manifest = load_manifest() def resolve(self, spec: str | ToolSpec) -> ResolvedTool: """ Resolve a tool specification to an actual tool. Args: spec: Tool specification (string or ToolSpec) Returns: ResolvedTool with loaded tool and metadata Raises: ToolNotFoundError: If tool cannot be found """ if isinstance(spec, str): spec = ToolSpec.parse(spec) searched_paths = [] # Get version constraint from manifest if available version = spec.version if not version and self.manifest: for dep in self.manifest.dependencies: if dep.tool_name == spec.name: version = dep.version if not spec.owner and dep.owner: spec = ToolSpec(owner=dep.owner, name=spec.name, version=version) break # 1. Check local project directory result = self._find_in_local(spec, searched_paths) if result: return result # 2. Check global user directory result = self._find_in_global(spec, searched_paths) if result: return result # 3. Try fetching from registry if self.auto_fetch: result = self._fetch_from_registry(spec, version) if result: return result # Not found raise ToolNotFoundError(spec, searched_paths) def _find_in_local( self, spec: ToolSpec, searched_paths: list ) -> Optional[ResolvedTool]: """Search for tool in local project directory.""" local_dirs = [self.project_dir / path for path in LOCAL_TOOLS_DIRS] local_dirs = [path for path in local_dirs if path.exists()] if not local_dirs: return None for local_dir in local_dirs: # Try qualified path first if owner is specified if spec.owner: path = local_dir / spec.owner / spec.name / "config.yaml" searched_paths.append(str(path)) if path.exists(): tool = self._load_tool_from_path(path) if tool: return ResolvedTool( tool=tool, source="local", path=path.parent, owner=spec.owner ) # Try unqualified path path = local_dir / spec.name / "config.yaml" searched_paths.append(str(path)) if path.exists(): tool = self._load_tool_from_path(path) if tool: return ResolvedTool( tool=tool, source="local", path=path.parent ) # Search all owner directories for this tool name # Priority: official first, then alphabetical for deterministic resolution owner_dirs = [d for d in local_dir.iterdir() if d.is_dir() and not d.name.startswith(".")] def owner_priority(d: Path) -> tuple: if d.name == "official": return (0, d.name) return (1, d.name) owner_dirs.sort(key=owner_priority) for owner_dir in owner_dirs: tool_dir = owner_dir / spec.name config_path = tool_dir / "config.yaml" if config_path.exists(): tool = self._load_tool_from_path(config_path) if tool: return ResolvedTool( tool=tool, source="local", path=tool_dir, owner=owner_dir.name ) return None def _find_in_global( self, spec: ToolSpec, searched_paths: list ) -> Optional[ResolvedTool]: """Search for tool in global user directory.""" global_dir = TOOLS_DIR if not global_dir.exists(): return None # Try qualified path first if owner is specified if spec.owner: path = global_dir / spec.owner / spec.name / "config.yaml" searched_paths.append(str(path)) if path.exists(): tool = self._load_tool_from_path(path) if tool: return ResolvedTool( tool=tool, source="global", path=path.parent, owner=spec.owner ) # Try unqualified path (old-style tools without owner) path = global_dir / spec.name / "config.yaml" searched_paths.append(str(path)) if path.exists(): tool = self._load_tool_from_path(path) if tool: return ResolvedTool( tool=tool, source="global", path=path.parent ) # Search all owner directories for this tool name # Priority: official first, then alphabetical for deterministic resolution owner_dirs = [ d for d in global_dir.iterdir() if d.is_dir() and not d.name.startswith(".") and d.name not in ("registry",) ] # Sort with official first, then alphabetical def owner_priority(d: Path) -> tuple: if d.name == "official": return (0, d.name) return (1, d.name) owner_dirs.sort(key=owner_priority) for owner_dir in owner_dirs: tool_dir = owner_dir / spec.name config_path = tool_dir / "config.yaml" if config_path.exists(): tool = self._load_tool_from_path(config_path) if tool: return ResolvedTool( tool=tool, source="global", path=tool_dir, owner=owner_dir.name ) return None def _fetch_from_registry( self, spec: ToolSpec, version: Optional[str] = None ) -> Optional[ResolvedTool]: """Fetch and install tool from registry.""" try: # Import here to avoid circular imports from .registry_client import get_client, RegistryError if self.verbose: print(f"Fetching '{spec.full_name}' from registry...", file=sys.stderr) client = get_client() # Determine owner for registry lookup owner = spec.owner or "official" try: result = client.download_tool( owner=owner, name=spec.name, version=version, install=True ) except RegistryError as e: if e.code == "TOOL_NOT_FOUND" and not spec.owner: # Try searching for most popular tool with this name results = client.search_tools(spec.name, per_page=1) if results.data: first = results.data[0] result = client.download_tool( owner=first["owner"], name=first["name"], version=version, install=True ) else: return None else: raise # Install the tool locally resolved = self._install_from_registry( owner=result.owner, name=result.name, version=result.resolved_version, config_yaml=result.config_yaml, readme=result.readme ) if self.verbose: print( f"Installed: {result.owner}/{result.name}@{result.resolved_version}", file=sys.stderr ) return resolved except ImportError: # Registry client not available logger.debug("Registry client not available") return None except Exception as e: logger.warning(f"Registry fetch failed: {e}") if self.verbose: print(f"Registry fetch failed: {e}", file=sys.stderr) return None def _install_from_registry( self, owner: str, name: str, version: str, config_yaml: str, readme: str = "" ) -> ResolvedTool: """Install a tool fetched from registry to global directory.""" # Create directory structure tool_dir = TOOLS_DIR / owner / name tool_dir.mkdir(parents=True, exist_ok=True) # Write config config_path = tool_dir / "config.yaml" config_path.write_text(config_yaml) # Write README if present if readme: readme_path = tool_dir / "README.md" readme_path.write_text(readme) # Load the tool tool = self._load_tool_from_path(config_path) # Create wrapper script (handling collisions) self._create_wrapper_script(owner, name) return ResolvedTool( tool=tool, source="registry", path=tool_dir, owner=owner, version=version ) def _create_wrapper_script(self, owner: str, name: str) -> Path: """Create wrapper script with collision handling.""" import stat bin_dir = get_bin_dir() # Check if short name wrapper exists short_wrapper = bin_dir / name if short_wrapper.exists(): # Check if it belongs to the same owner existing_owner = self._get_wrapper_owner(short_wrapper) if existing_owner and existing_owner != owner: # Collision - use owner-name format wrapper_name = f"{owner}-{name}" else: wrapper_name = name else: wrapper_name = name wrapper_path = bin_dir / wrapper_name # Generate wrapper script import sys python_path = sys.executable script = f"""#!/bin/bash # SmartTools wrapper for '{owner}/{name}' # Auto-generated - do not edit exec {python_path} -m smarttools.runner {owner}/{name} "$@" """ wrapper_path.write_text(script) wrapper_path.chmod(wrapper_path.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH) return wrapper_path def _get_wrapper_owner(self, wrapper_path: Path) -> Optional[str]: """Extract owner from existing wrapper script.""" try: content = wrapper_path.read_text() # Look for pattern: smarttools.runner owner/name # Owner slugs can contain lowercase alphanumeric and hyphens match = re.search(r'smarttools\.runner\s+([a-z0-9][a-z0-9-]*)/([a-zA-Z0-9_-]+)', content) if match: return match.group(1) return None except Exception: return None def _load_tool_from_path(self, config_path: Path) -> Optional[Tool]: """Load a tool from a specific config path.""" try: data = yaml.safe_load(config_path.read_text()) # Handle legacy format if "prompt" in data and "steps" not in data: data = self._convert_legacy_format(data) return Tool.from_dict(data) except Exception as e: logger.warning(f"Error loading tool from {config_path}: {e}") if self.verbose: print(f"Error loading tool from {config_path}: {e}", file=sys.stderr) return None def _convert_legacy_format(self, data: dict) -> dict: """Convert legacy tool format to new format.""" steps = [] if data.get("prompt"): steps.append({ "type": "prompt", "prompt": data["prompt"], "provider": data.get("provider", "mock"), "output_var": "response" }) arguments = [] for inp in data.get("inputs", []): arguments.append({ "flag": inp.get("flag", f"--{inp['name']}"), "variable": inp["name"], "default": inp.get("default"), "description": inp.get("description", "") }) return { "name": data["name"], "description": data.get("description", ""), "arguments": arguments, "steps": steps, "output": "{response}" if steps else "{input}" } # ------------------------------------------------------------------------- # Convenience functions # ------------------------------------------------------------------------- def resolve_tool(spec: str, auto_fetch: Optional[bool] = None) -> ResolvedTool: """ Resolve a tool specification to an actual tool. Args: spec: Tool specification (e.g., "summarize", "rob/summarize@1.0.0") auto_fetch: Override auto-fetch setting Returns: ResolvedTool with loaded tool and metadata """ resolver = ToolResolver(auto_fetch=auto_fetch) return resolver.resolve(spec) def find_tool(name: str) -> Optional[ResolvedTool]: """ Find a tool by name without auto-fetching. Args: name: Tool name or owner/name Returns: ResolvedTool if found, None otherwise """ try: resolver = ToolResolver(auto_fetch=False) return resolver.resolve(name) except ToolNotFoundError: return None def install_from_registry(spec: str, version: Optional[str] = None) -> ResolvedTool: """ Install a tool from the registry. Args: spec: Tool specification version: Version constraint Returns: ResolvedTool for installed tool """ from .registry_client import get_client parsed = ToolSpec.parse(spec) if version: parsed.version = version client = get_client() owner = parsed.owner or "official" result = client.download_tool( owner=owner, name=parsed.name, version=parsed.version, install=True ) resolver = ToolResolver(auto_fetch=False) return resolver._install_from_registry( owner=result.owner, name=result.name, version=result.resolved_version, config_yaml=result.config_yaml, readme=result.readme ) def uninstall_tool(spec: str) -> bool: """ Uninstall a tool. Args: spec: Tool specification Returns: True if tool was uninstalled """ import shutil parsed = ToolSpec.parse(spec) # Find the tool first resolved = find_tool(spec) if not resolved: return False # Remove tool directory if resolved.path.exists(): shutil.rmtree(resolved.path) # Remove wrapper script(s) bin_dir = get_bin_dir() # Remove short name wrapper if it belongs to this tool short_wrapper = bin_dir / parsed.name if short_wrapper.exists(): resolver = ToolResolver(auto_fetch=False) wrapper_owner = resolver._get_wrapper_owner(short_wrapper) if wrapper_owner == resolved.owner or wrapper_owner is None: short_wrapper.unlink() # Remove owner-name wrapper if resolved.owner: long_wrapper = bin_dir / f"{resolved.owner}-{parsed.name}" if long_wrapper.exists(): long_wrapper.unlink() return True def list_installed_tools() -> list[ResolvedTool]: """ List all installed tools (global only). Returns: List of ResolvedTool objects """ tools = [] if not TOOLS_DIR.exists(): return tools # Check owner directories for item in TOOLS_DIR.iterdir(): if item.is_dir() and not item.name.startswith("."): # Skip non-owner directories if item.name in ("registry",): continue # Check if this is an owner directory (contains tool subdirectories) has_subtools = False for subitem in item.iterdir(): if subitem.is_dir(): config = subitem / "config.yaml" if config.exists(): has_subtools = True try: tool = Tool.from_dict(yaml.safe_load(config.read_text())) tools.append(ResolvedTool( tool=tool, source="global", path=subitem, owner=item.name )) except Exception: pass # If no subtools, this might be an old-style tool directory if not has_subtools: config = item / "config.yaml" if config.exists(): try: tool = Tool.from_dict(yaml.safe_load(config.read_text())) tools.append(ResolvedTool( tool=tool, source="global", path=item )) except Exception: pass return sorted(tools, key=lambda t: t.full_name)