CmdForge/src/cmdforge/resolver.py

695 lines
22 KiB
Python

"""Tool resolution with proper search order.
Implements the tool resolution order:
1. Local project: ./.cmdforge/<owner>/<name>/config.yaml
2. Global user: ~/.cmdforge/<owner>/<name>/config.yaml
3. Registry: Fetch from API, install to global, then run
4. Error if not found
"""
import logging
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Tuple
import yaml
from .tool import Tool, TOOLS_DIR, get_bin_dir, BIN_DIR
from .config import is_auto_fetch_enabled, load_config
from .manifest import load_manifest
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Directory names, relative to the project root, that may hold project-local
# tools. Both the documented and the legacy layout are supported; the entries
# are searched in list order, so ".cmdforge" is consulted before "cmdforge".
LOCAL_TOOLS_DIRS = [Path(".cmdforge"), Path("cmdforge")]
@dataclass
class ToolSpec:
    """A tool reference split into owner, name and version constraint."""

    # Owner slug; None for unqualified references such as "summarize".
    owner: Optional[str]
    # Tool name without owner prefix or version suffix.
    name: str
    # Version constraint exactly as written after "@"; None when absent.
    version: Optional[str] = None

    @property
    def full_name(self) -> str:
        """Return ``owner/name`` when an owner is set, otherwise ``name``."""
        return f"{self.owner}/{self.name}" if self.owner else self.name

    @property
    def is_qualified(self) -> bool:
        """Return True when the spec carries an owner (``owner/name``)."""
        return self.owner is not None

    @classmethod
    def parse(cls, spec: str) -> "ToolSpec":
        """
        Build a ToolSpec from its textual form.

        Accepted forms:
        - "summarize"            -> owner=None, name="summarize"
        - "rob/summarize"        -> owner="rob", name="summarize"
        - "summarize@1.0.0"      -> name="summarize", version="1.0.0"
        - "rob/summarize@^1.0.0" -> owner="rob", name="summarize",
                                    version="^1.0.0"

        The version is whatever follows the LAST "@"; the owner is whatever
        precedes the FIRST "/" of the remaining text.
        """
        # Peel the version constraint off the right-hand side.
        body, at_sign, constraint = spec.rpartition("@")
        if not at_sign:
            body, constraint = spec, None

        # Split the optional owner off the left-hand side.
        owner, slash, name = body.partition("/")
        if not slash:
            owner, name = None, body

        return cls(owner=owner, name=name, version=constraint)
@dataclass
class ResolvedTool:
    """Outcome of a successful tool lookup."""

    # The loaded tool definition.
    tool: Tool
    # Where the tool came from: "local", "global" or "registry".
    source: str
    # Directory that holds the tool's config.yaml.
    path: Path
    # Owner the tool was found under; None for tools stored without an owner.
    owner: Optional[str] = None
    # Resolved version, when known (set for registry installs).
    version: Optional[str] = None

    @property
    def full_name(self) -> str:
        """Return ``owner/<tool name>`` when an owner is known, else the tool name."""
        tool_name = self.tool.name
        return f"{self.owner}/{tool_name}" if self.owner else tool_name
class ToolNotFoundError(Exception):
    """Signals that no lookup location produced the requested tool.

    Attributes:
        spec: The specification that could not be resolved.
        searched_paths: Paths that were checked during the lookup.
    """

    def __init__(self, spec: ToolSpec, searched_paths: list):
        message = f"Tool '{spec.full_name}' not found"
        self.spec, self.searched_paths = spec, searched_paths
        super().__init__(message)
class ToolResolver:
"""Resolves tool specifications to actual tool configs."""
def __init__(
    self,
    project_dir: Optional[Path] = None,
    auto_fetch: Optional[bool] = None,
    verbose: bool = False
):
    """
    Set up a resolver.

    Args:
        project_dir: Project root used for local lookups; the current
            working directory is used when omitted.
        auto_fetch: Explicit auto-fetch switch. When None, the value
            reported by is_auto_fetch_enabled() is used.
        verbose: When True, progress and error details are printed to
            stderr.
    """
    self.project_dir = project_dir if project_dir else Path.cwd()
    self.verbose = verbose

    # An explicit argument wins over the configured setting.
    self.auto_fetch = is_auto_fetch_enabled() if auto_fetch is None else auto_fetch

    # NOTE(review): load_manifest() is called without project_dir; confirm it
    # locates the manifest for the intended project.
    self.manifest = load_manifest()
def resolve(self, spec: str | ToolSpec) -> ResolvedTool:
    """
    Turn a tool specification into a loaded tool.

    Lookup order: local project directories, then the global tools
    directory, then (only when auto-fetch is on) the registry.

    Args:
        spec: Specification string (parsed with ToolSpec.parse) or a
            ready-made ToolSpec.

    Returns:
        ResolvedTool describing the first match.

    Raises:
        ToolNotFoundError: If none of the locations yields the tool.
    """
    parsed = ToolSpec.parse(spec) if isinstance(spec, str) else spec
    searched_paths = []

    # When the caller gave no version, borrow the constraint (and, for
    # unqualified specs, the owner) from the first manifest dependency
    # with the same tool name.
    version = parsed.version
    if not version and self.manifest:
        dependency = next(
            (dep for dep in self.manifest.dependencies if dep.tool_name == parsed.name),
            None,
        )
        if dependency is not None:
            version = dependency.version
            if not parsed.owner and dependency.owner:
                parsed = ToolSpec(owner=dependency.owner, name=parsed.name, version=version)

    # 1 + 2: on-disk locations, local before global.
    for finder in (self._find_in_local, self._find_in_global):
        found = finder(parsed, searched_paths)
        if found:
            return found

    # 3: registry, only when auto-fetch is enabled.
    if self.auto_fetch:
        fetched = self._fetch_from_registry(parsed, version)
        if fetched:
            return fetched

    raise ToolNotFoundError(parsed, searched_paths)
def _find_in_local(
    self,
    spec: ToolSpec,
    searched_paths: list
) -> Optional[ResolvedTool]:
    """
    Search the project-local tool directories for ``spec``.

    For every directory in LOCAL_TOOLS_DIRS that exists under
    ``self.project_dir`` the following locations are tried in order:

    1. ``<dir>/<owner>/<name>/config.yaml`` (only when the spec has an owner)
    2. ``<dir>/<name>/config.yaml`` (tool stored without an owner)
    3. ``<dir>/<any owner>/<name>/config.yaml`` - ONLY for unqualified
       specs; "official" is tried first, the rest alphabetically.

    A qualified spec never falls back to step 3: returning another
    owner's tool for an explicit ``owner/name`` request would silently run
    (or let callers delete) the wrong tool.

    Args:
        spec: Parsed tool specification.
        searched_paths: List that receives the paths probed in steps 1-2
            (mutated in place; used for error reporting).

    Returns:
        ResolvedTool with source "local", or None when nothing matched.
    """
    for relative_dir in LOCAL_TOOLS_DIRS:
        local_dir = self.project_dir / relative_dir
        # is_dir() rather than exists(): a plain file named like the tools
        # directory must not reach iterdir() below.
        if not local_dir.is_dir():
            continue

        # (config path, owner to report) pairs for the direct locations.
        candidates = []
        if spec.owner:
            candidates.append(
                (local_dir / spec.owner / spec.name / "config.yaml", spec.owner)
            )
        candidates.append((local_dir / spec.name / "config.yaml", None))

        for config_path, owner in candidates:
            searched_paths.append(str(config_path))
            if config_path.exists():
                tool = self._load_tool_from_path(config_path)
                if tool:
                    return ResolvedTool(
                        tool=tool,
                        source="local",
                        path=config_path.parent,
                        owner=owner
                    )

        # An explicit owner was requested and not found here: do not
        # substitute a same-named tool from a different owner.
        if spec.owner:
            continue

        # Unqualified name: scan every owner directory, "official" first,
        # then alphabetical so resolution is deterministic.
        owner_dirs = sorted(
            (
                d for d in local_dir.iterdir()
                if d.is_dir() and not d.name.startswith(".")
            ),
            key=lambda d: (d.name != "official", d.name),
        )
        for owner_dir in owner_dirs:
            tool_dir = owner_dir / spec.name
            config_path = tool_dir / "config.yaml"
            if config_path.exists():
                tool = self._load_tool_from_path(config_path)
                if tool:
                    return ResolvedTool(
                        tool=tool,
                        source="local",
                        path=tool_dir,
                        owner=owner_dir.name
                    )

    return None
def _find_in_global(
    self,
    spec: ToolSpec,
    searched_paths: list
) -> Optional[ResolvedTool]:
    """
    Search the global tools directory (TOOLS_DIR) for ``spec``.

    Locations are tried in order:

    1. ``TOOLS_DIR/<owner>/<name>/config.yaml`` (only when the spec has an owner)
    2. ``TOOLS_DIR/<name>/config.yaml`` (old-style tool stored without owner)
    3. ``TOOLS_DIR/<any owner>/<name>/config.yaml`` - ONLY for unqualified
       specs; "official" is tried first, the rest alphabetically. Hidden
       directories and the "registry" directory are not treated as owners.

    A qualified spec never falls back to step 3: returning another
    owner's tool for an explicit ``owner/name`` request would silently run
    (or let callers delete) the wrong tool.

    Args:
        spec: Parsed tool specification.
        searched_paths: List that receives the paths probed in steps 1-2
            (mutated in place; used for error reporting).

    Returns:
        ResolvedTool with source "global", or None when nothing matched.
    """
    global_dir = TOOLS_DIR
    # is_dir() rather than exists(): a stray file at that path must not
    # reach iterdir() below.
    if not global_dir.is_dir():
        return None

    # (config path, owner to report) pairs for the direct locations.
    candidates = []
    if spec.owner:
        candidates.append(
            (global_dir / spec.owner / spec.name / "config.yaml", spec.owner)
        )
    candidates.append((global_dir / spec.name / "config.yaml", None))

    for config_path, owner in candidates:
        searched_paths.append(str(config_path))
        if config_path.exists():
            tool = self._load_tool_from_path(config_path)
            if tool:
                return ResolvedTool(
                    tool=tool,
                    source="global",
                    path=config_path.parent,
                    owner=owner
                )

    # An explicit owner was requested and not found: do not substitute a
    # same-named tool from a different owner.
    if spec.owner:
        return None

    # Unqualified name: scan every owner directory, "official" first, then
    # alphabetical so resolution is deterministic.
    owner_dirs = sorted(
        (
            d for d in global_dir.iterdir()
            if d.is_dir() and not d.name.startswith(".") and d.name not in ("registry",)
        ),
        key=lambda d: (d.name != "official", d.name),
    )
    for owner_dir in owner_dirs:
        tool_dir = owner_dir / spec.name
        config_path = tool_dir / "config.yaml"
        if config_path.exists():
            tool = self._load_tool_from_path(config_path)
            if tool:
                return ResolvedTool(
                    tool=tool,
                    source="global",
                    path=tool_dir,
                    owner=owner_dir.name
                )

    return None
def _fetch_from_registry(
    self,
    spec: ToolSpec,
    version: Optional[str] = None
) -> Optional[ResolvedTool]:
    """
    Download ``spec`` from the registry and install it globally.

    The tool is requested under the spec's owner, or under "official" when
    the spec is unqualified. If that request fails with TOOL_NOT_FOUND and
    the spec is unqualified, the first hit of a name search is downloaded
    instead.

    Args:
        spec: Parsed tool specification.
        version: Version constraint forwarded to the registry client.

    Returns:
        ResolvedTool for the installed tool, or None when the registry
        client cannot be imported, the name search has no hits, or any
        other error occurs (the error is logged, never raised).
    """
    try:
        # Deferred import: avoids a circular import at module load time.
        from .registry_client import get_client, RegistryError

        if self.verbose:
            print(f"Fetching '{spec.full_name}' from registry...", file=sys.stderr)

        client = get_client()
        lookup_owner = spec.owner if spec.owner else "official"

        try:
            download = client.download_tool(
                owner=lookup_owner,
                name=spec.name,
                version=version,
                install=True
            )
        except RegistryError as err:
            # Only an unqualified name that was not found gets a second
            # chance via search; everything else goes to the outer handler.
            if err.code != "TOOL_NOT_FOUND" or spec.owner:
                raise
            hits = client.search_tools(spec.name, per_page=1)
            if not hits.data:
                return None
            top_hit = hits.data[0]
            download = client.download_tool(
                owner=top_hit["owner"],
                name=top_hit["name"],
                version=version,
                install=True
            )

        # Write the downloaded files into the global tools directory.
        installed = self._install_from_registry(
            owner=download.owner,
            name=download.name,
            version=download.resolved_version,
            config_yaml=download.config_yaml,
            readme=download.readme
        )

        if self.verbose:
            print(
                f"Installed: {download.owner}/{download.name}@{download.resolved_version}",
                file=sys.stderr
            )
        return installed

    except ImportError:
        # Registry client not available
        logger.debug("Registry client not available")
        return None
    except Exception as exc:
        logger.warning(f"Registry fetch failed: {exc}")
        if self.verbose:
            print(f"Registry fetch failed: {exc}", file=sys.stderr)
        return None
def _install_from_registry(
self,
owner: str,
name: str,
version: str,
config_yaml: str,
readme: str = ""
) -> ResolvedTool:
"""Install a tool fetched from registry to global directory."""
# Create directory structure
tool_dir = TOOLS_DIR / owner / name
tool_dir.mkdir(parents=True, exist_ok=True)
# Write config
config_path = tool_dir / "config.yaml"
config_path.write_text(config_yaml)
# Write README if present
if readme:
readme_path = tool_dir / "README.md"
readme_path.write_text(readme)
# Load the tool
tool = self._load_tool_from_path(config_path)
# Create wrapper script (handling collisions)
self._create_wrapper_script(owner, name)
return ResolvedTool(
tool=tool,
source="registry",
path=tool_dir,
owner=owner,
version=version
)
def _create_wrapper_script(self, owner: str, name: str) -> Path:
    """
    Create the executable bash wrapper that launches ``owner/name``.

    The wrapper is normally named ``<name>``. If a wrapper with that name
    already exists and is recognised as belonging to a different owner,
    ``<owner>-<name>`` is used instead so the existing one is preserved.

    Args:
        owner: Owner slug of the tool.
        name: Tool name.

    Returns:
        Path of the wrapper script that was written.
    """
    import shlex
    import stat
    import sys

    bin_dir = get_bin_dir()

    # Default to the short name; switch to "<owner>-<name>" only on a
    # confirmed collision with another owner's wrapper.
    wrapper_name = name
    short_wrapper = bin_dir / name
    if short_wrapper.exists():
        existing_owner = self._get_wrapper_owner(short_wrapper)
        if existing_owner and existing_owner != owner:
            wrapper_name = f"{owner}-{name}"

    wrapper_path = bin_dir / wrapper_name

    # Shell-quote everything interpolated into the exec line: an interpreter
    # path containing spaces (or other shell metacharacters) would otherwise
    # be split into several words. For plain paths and slugs shlex.quote
    # returns the text unchanged, so typical wrappers look as before.
    interpreter = shlex.quote(sys.executable or "python3")
    target = shlex.quote(f"{owner}/{name}")

    script = f"""#!/bin/bash
# CmdForge wrapper for '{owner}/{name}'
# Auto-generated - do not edit
exec {interpreter} -m cmdforge.runner {target} "$@"
"""
    wrapper_path.write_text(script)
    # Add execute permission for user, group and others.
    wrapper_path.chmod(
        wrapper_path.stat().st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    )
    return wrapper_path
def _get_wrapper_owner(self, wrapper_path: Path) -> Optional[str]:
"""Extract owner from existing wrapper script."""
try:
content = wrapper_path.read_text()
# Look for pattern: cmdforge.runner owner/name
# Owner slugs can contain lowercase alphanumeric and hyphens
match = re.search(r'cmdforge\.runner\s+([a-z0-9][a-z0-9-]*)/([a-zA-Z0-9_-]+)', content)
if match:
return match.group(1)
return None
except Exception:
return None
def _load_tool_from_path(self, config_path: Path) -> Optional[Tool]:
    """
    Load a tool definition from ``config_path``.

    The YAML is parsed with ``yaml.safe_load``. A config that has a
    "prompt" key but no "steps" key is treated as legacy format and
    converted before being handed to ``Tool.from_dict``.

    Args:
        config_path: Path of the tool's config.yaml.

    Returns:
        The loaded Tool, or None when the file cannot be read, is not valid
        YAML, is not a mapping, or is rejected by ``Tool.from_dict``. Errors
        are logged (and printed to stderr in verbose mode), never raised.
    """
    text = None
    try:
        # Read once; the text is reused for error context below.
        text = config_path.read_text()
        data = yaml.safe_load(text)

        # safe_load yields None for an empty document and may yield a list
        # or scalar; report that clearly instead of failing on the
        # membership test below.
        if not isinstance(data, dict):
            raise ValueError(
                f"config must be a YAML mapping, got {type(data).__name__}"
            )

        # Handle legacy format
        if "prompt" in data and "steps" not in data:
            data = self._convert_legacy_format(data)
        return Tool.from_dict(data)

    except yaml.YAMLError as e:
        logger.warning(f"YAML error in {config_path}: {e}")
        if self.verbose:
            print(f"Error loading tool from '{config_path}': YAML syntax error", file=sys.stderr)
            mark = getattr(e, "problem_mark", None)
            if mark:
                print(f" Line {mark.line + 1}, column {mark.column + 1}", file=sys.stderr)
                # Show the offending line (and the one before it) with a
                # caret under the reported column.
                lines = text.split('\n') if text is not None else []
                if mark.line < len(lines):
                    print(file=sys.stderr)
                    if mark.line > 0:
                        print(f" {mark.line}: {lines[mark.line - 1]}", file=sys.stderr)
                    print(f" > {mark.line + 1}: {lines[mark.line]}", file=sys.stderr)
                    print(f" {' ' * (mark.column + 4)}^", file=sys.stderr)
            problem = getattr(e, "problem", None)
            if problem:
                print(f"\n Problem: {problem}", file=sys.stderr)
        return None

    except Exception as e:
        # Best effort by design: a broken tool must not abort resolution.
        logger.warning(f"Error loading tool from {config_path}: {e}")
        if self.verbose:
            print(f"Error loading tool from {config_path}: {e}", file=sys.stderr)
        return None
def _convert_legacy_format(self, data: dict) -> dict:
"""Convert legacy tool format to new format."""
steps = []
if data.get("prompt"):
steps.append({
"type": "prompt",
"prompt": data["prompt"],
"provider": data.get("provider", "mock"),
"output_var": "response"
})
arguments = []
for inp in data.get("inputs", []):
arguments.append({
"flag": inp.get("flag", f"--{inp['name']}"),
"variable": inp["name"],
"default": inp.get("default"),
"description": inp.get("description", "")
})
return {
"name": data["name"],
"description": data.get("description", ""),
"arguments": arguments,
"steps": steps,
"output": "{response}" if steps else "{input}"
}
# -------------------------------------------------------------------------
# Convenience functions
# -------------------------------------------------------------------------
def resolve_tool(spec: str, auto_fetch: Optional[bool] = None) -> ResolvedTool:
    """
    Resolve ``spec`` with a freshly created ToolResolver.

    Args:
        spec: Tool specification (e.g., "summarize", "rob/summarize@1.0.0")
        auto_fetch: Explicit auto-fetch switch; None keeps the resolver's
            default.

    Returns:
        ResolvedTool with loaded tool and metadata.

    Raises:
        ToolNotFoundError: If the tool cannot be found.
    """
    return ToolResolver(auto_fetch=auto_fetch).resolve(spec)
def find_tool(name: str) -> Optional[ResolvedTool]:
    """
    Look a tool up on disk only; the registry is never contacted.

    Args:
        name: Tool name or owner/name

    Returns:
        ResolvedTool if found, None otherwise
    """
    try:
        offline_resolver = ToolResolver(auto_fetch=False)
        found = offline_resolver.resolve(name)
    except ToolNotFoundError:
        # "Not found" is an ordinary outcome here, not an error.
        return None
    return found
def install_from_registry(spec: str, version: Optional[str] = None) -> ResolvedTool:
    """
    Download a tool from the registry and install it globally.

    Args:
        spec: Tool specification; an unqualified name is requested under
            the "official" owner.
        version: Version constraint; when given it replaces any version
            embedded in ``spec``.

    Returns:
        ResolvedTool for installed tool
    """
    from .registry_client import get_client

    parsed = ToolSpec.parse(spec)
    if version:
        # The explicit argument takes precedence over "name@version".
        parsed.version = version

    download = get_client().download_tool(
        owner=parsed.owner or "official",
        name=parsed.name,
        version=parsed.version,
        install=True
    )

    # The resolver is used only for its install routine; auto-fetch is off
    # because the download has already happened.
    installer = ToolResolver(auto_fetch=False)
    return installer._install_from_registry(
        owner=download.owner,
        name=download.name,
        version=download.resolved_version,
        config_yaml=download.config_yaml,
        readme=download.readme
    )
def uninstall_tool(spec: str) -> bool:
    """
    Uninstall a tool: delete its directory and its wrapper script(s).

    The tool is located with find_tool(), i.e. without contacting the
    registry. Wrapper scripts are only removed when they are recognised as
    belonging to the tool's owner, or when their owner cannot be determined.

    Args:
        spec: Tool specification

    Returns:
        True if the tool was uninstalled; False if it was not found, or if
        an explicit ``owner/name`` request resolved to a different owner.
    """
    import shutil

    parsed = ToolSpec.parse(spec)

    # Find the tool first
    resolved = find_tool(spec)
    if not resolved:
        return False

    # Never delete another owner's tool: if the caller named an owner and
    # the lookup landed on a different one, treat it as "not installed".
    if parsed.owner and resolved.owner and resolved.owner != parsed.owner:
        return False

    # NOTE(review): find_tool() consults project-local directories before the
    # global one, so this may remove a project-local tool - confirm intended.
    if resolved.path.exists():
        shutil.rmtree(resolved.path)

    # Wrapper scripts that may belong to this tool: the short name and, for
    # owned tools, the "<owner>-<name>" collision form.
    bin_dir = get_bin_dir()
    wrapper_paths = [bin_dir / parsed.name]
    if resolved.owner:
        wrapper_paths.append(bin_dir / f"{resolved.owner}-{parsed.name}")

    resolver = None  # created lazily, only if a wrapper needs inspecting
    for wrapper_path in wrapper_paths:
        if not wrapper_path.exists():
            continue
        if resolver is None:
            resolver = ToolResolver(auto_fetch=False)
        wrapper_owner = resolver._get_wrapper_owner(wrapper_path)
        # The same file name can be produced by another owner/tool pair
        # (e.g. "a-b-c" for both a/b-c and a-b/c), so check ownership for
        # every wrapper before deleting it.
        if wrapper_owner is None or wrapper_owner == resolved.owner:
            wrapper_path.unlink()

    return True
def list_installed_tools() -> list[ResolvedTool]:
    """
    List all installed tools (global only).

    Every non-hidden directory in TOOLS_DIR except "registry" is inspected.
    Subdirectories containing a config.yaml are reported as tools owned by
    that directory's name. A directory with no such subdirectory but with
    its own config.yaml is reported as an old-style tool without owner.

    Tools whose config cannot be loaded are skipped with a logged warning
    rather than aborting the listing.

    Returns:
        List of ResolvedTool objects sorted by full name.
    """

    def load_entry(tool_dir: Path, owner: Optional[str]) -> Optional[ResolvedTool]:
        """Build the entry for ``tool_dir``; None when its config cannot be loaded."""
        config = tool_dir / "config.yaml"
        try:
            tool = Tool.from_dict(yaml.safe_load(config.read_text()))
        except Exception as exc:
            # Best effort: one broken tool must not hide the others, but
            # leave a trace so the omission can be diagnosed.
            logger.warning("Skipping tool at %s: %s", config, exc)
            return None
        return ResolvedTool(tool=tool, source="global", path=tool_dir, owner=owner)

    tools = []

    if not TOOLS_DIR.exists():
        return tools

    for item in TOOLS_DIR.iterdir():
        # Skip files, hidden entries and non-owner directories.
        if not item.is_dir() or item.name.startswith(".") or item.name in ("registry",):
            continue

        # Owner directory: every subdirectory with a config.yaml is a tool.
        has_subtools = False
        for subitem in item.iterdir():
            if subitem.is_dir() and (subitem / "config.yaml").exists():
                has_subtools = True
                entry = load_entry(subitem, item.name)
                if entry is not None:
                    tools.append(entry)

        # If no subtools, this might be an old-style tool directory
        if not has_subtools and (item / "config.yaml").exists():
            entry = load_entry(item, None)
            if entry is not None:
                tools.append(entry)

    return sorted(tools, key=lambda t: t.full_name)