diff --git a/CHANGELOG.md b/CHANGELOG.md index 707a26e..ceeb0ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,49 @@ All notable changes to CmdForge will be documented in this file. ### Added +#### Transitive Dependency Resolution +- **Full transitive dependency resolution**: When installing dependencies, CmdForge now resolves and installs the complete dependency tree + - `DependencyGraph` and `DependencyNode` dataclasses for structured dependency tracking + - `DependencyGraphBuilder` class for recursive resolution with cycle detection + - Topological sorting ensures dependencies are installed before dependents (Kahn's algorithm) + - Version conflict detection with provenance tracking (which package requested each version) + - Registry config caching during resolution to avoid duplicate fetches + +- **Semver version constraint matching**: Full semantic versioning support + - Caret constraints: `^1.2.3` (>=1.2.3 <2.0.0), `^0.2.3` (>=0.2.3 <0.3.0), `^0.0.3` (exact match) + - Tilde constraints: `~1.2.3` (>=1.2.3 <1.3.0) + - Comparison operators: `>=`, `<=`, `>`, `<`, `=` + - Wildcards: `*`, `latest` + - Prerelease version handling per semver spec + +- **New CLI commands**: + - `cmdforge deps tree` - Show full dependency tree with transitive dependencies + - `cmdforge deps tree --verbose` - Show detailed resolution info + +- **Enhanced install command**: + - `--dry-run` - Show what would be installed without installing + - `--force` - Install despite version conflicts (first constraint wins) + - `--verbose` - Show detailed resolution info + +#### Lock File Support +- **Lock file for reproducible installs**: `cmdforge.lock` records exact versions of all dependencies + - `Lockfile`, `LockedPackage`, `LockfileMetadata` dataclasses + - Records direct and transitive dependencies with provenance (`required_by`) + - Integrity verification via SHA256 hashes (uses registry `config_hash`) + - Normalized YAML hashing via `hash_utils.compute_yaml_hash()` for consistent integrity checks + - Platform and Python version metadata for debugging + +- **New CLI commands**: + - `cmdforge lock` - Generate or update `cmdforge.lock` + - `cmdforge lock --force` - Force regenerate even if up to date + - `cmdforge verify` - Verify installed tools match lock file + +- **Lock-aware install**: + - `cmdforge install` - Uses lock file when present for reproducible installs + - `--frozen` - Fail if lock file is missing (for CI) + - `--strict-frozen` - Fail if lock file is stale (manifest changed) + - `--ignore-lock` - Ignore lock file and resolve fresh from manifest + #### Local Collections Support - **Local collection definitions**: Create and manage collections locally before publishing - Collections stored as YAML files in `~/.cmdforge/collections/` diff --git a/README.md b/README.md index dbd0f9f..2c231eb 100644 --- a/README.md +++ b/README.md @@ -441,13 +441,53 @@ cmdforge init cmdforge add official/summarize cmdforge add myuser/tool@^2.0.0 # With version constraint -# Install all dependencies +# Install all dependencies (including transitive) cmdforge install # Check dependency status cmdforge deps + +# Show full dependency tree +cmdforge deps tree ``` +### Reproducible Installs with Lock Files + +Lock files ensure identical tool versions across machines and over time: + +```bash +# Generate lock file after installing +cmdforge lock + +# Install exact versions from lock (reproducible) +cmdforge install # Uses lock file if present + +# CI/CD: Fail if lock is missing or stale +cmdforge install --frozen # Requires lock file +cmdforge install --strict-frozen # Requires lock + manifest match + +# Verify installed tools match lock +cmdforge verify + +# Preview what would be installed +cmdforge install --dry-run + +# Ignore lock and resolve fresh +cmdforge install --ignore-lock +``` + +### Version Constraints + +CmdForge supports semantic versioning constraints: + +| Constraint | Meaning | Example | +|------------|---------|---------| +| `*` | Any version | `official/summarize` | +| `^1.2.3` | Compatible (>=1.2.3 <2.0.0) | `tool@^1.2.3` | +| `~1.2.3` | Approximately (>=1.2.3 <1.3.0) | `tool@~1.2.3` | +| `>=1.0.0` | Greater or equal | `tool@>=1.0.0` | +| `1.0.0` | Exact version | `tool@1.0.0` | + This ensures all required tools are available before running your project's scripts. ## Collections diff --git a/src/cmdforge/cli/__init__.py b/src/cmdforge/cli/__init__.py index 0f1af93..9779f00 100644 --- a/src/cmdforge/cli/__init__.py +++ b/src/cmdforge/cli/__init__.py @@ -12,7 +12,7 @@ from .tool_commands import ( from .provider_commands import cmd_providers from .registry_commands import cmd_registry from .collections_commands import cmd_collections -from .project_commands import cmd_deps, cmd_install_deps, cmd_add, cmd_init +from .project_commands import cmd_deps, cmd_deps_tree, cmd_install_deps, cmd_add, cmd_init, cmd_lock, cmd_verify from .config_commands import cmd_config @@ -284,14 +284,38 @@ def main(): # Project Commands # ------------------------------------------------------------------------- - # 'deps' command + # 'deps' command with subcommands p_deps = subparsers.add_parser("deps", help="Show project dependencies") + deps_sub = p_deps.add_subparsers(dest="deps_cmd", help="Deps commands") + + # deps tree + p_deps_tree = deps_sub.add_parser("tree", help="Show dependency tree") + p_deps_tree.add_argument("--verbose", "-v", action="store_true", help="Show detailed info") + p_deps_tree.set_defaults(func=cmd_deps_tree) + + # Default for deps with no subcommand shows basic deps list p_deps.set_defaults(func=cmd_deps) # 'install' command (for dependencies) p_install = subparsers.add_parser("install", help="Install dependencies from cmdforge.yaml") + p_install.add_argument("--dry-run", action="store_true", help="Show what would be installed without installing") + p_install.add_argument("--force", action="store_true", help="Install despite version conflicts or stale lock") + p_install.add_argument("--verbose", "-v", action="store_true", help="Show detailed resolution info") + p_install.add_argument("--ignore-lock", action="store_true", help="Ignore lock file and resolve fresh from manifest") + p_install.add_argument("--frozen", action="store_true", help="Fail if lock file is missing (for CI)") + p_install.add_argument("--strict-frozen", action="store_true", help="Fail if lock is missing or stale (stricter CI mode)") p_install.set_defaults(func=cmd_install_deps) + # 'lock' command + p_lock = subparsers.add_parser("lock", help="Generate or update cmdforge.lock") + p_lock.add_argument("--force", "-f", action="store_true", help="Force regenerate even if up to date") + p_lock.add_argument("--verbose", "-v", action="store_true", help="Show detailed output") + p_lock.set_defaults(func=cmd_lock) + + # 'verify' command + p_verify = subparsers.add_parser("verify", help="Verify installed tools match lock file") + p_verify.set_defaults(func=cmd_verify) + # 'add' command p_add = subparsers.add_parser("add", help="Add a tool to project dependencies") p_add.add_argument("tool", help="Tool to add (owner/name)") diff --git a/src/cmdforge/cli/project_commands.py b/src/cmdforge/cli/project_commands.py index d675ddb..c5c469b 100644 --- a/src/cmdforge/cli/project_commands.py +++ b/src/cmdforge/cli/project_commands.py @@ -14,6 +14,10 @@ from ..resolver import ( def cmd_deps(args): """Show project dependencies from cmdforge.yaml.""" + # If 'tree' subcommand, delegate to that + if hasattr(args, 'deps_cmd') and args.deps_cmd == 'tree': + return cmd_deps_tree(args) + manifest = load_manifest() if manifest is None: @@ -51,9 +55,99 @@ def cmd_deps(args): return 0 +def cmd_deps_tree(args): + """Show dependency tree with transitive dependencies.""" + from ..dependency_graph import DependencyGraphBuilder + from ..resolver import ToolResolver + + manifest = load_manifest() + if manifest is None: + print("No cmdforge.yaml found in current project.") + print("Create one with: cmdforge init") + return 1 + + if not manifest.dependencies: + print("No dependencies in cmdforge.yaml") + return 0 + + verbose = getattr(args, 'verbose', False) + + resolver = ToolResolver(auto_fetch=False) + builder = DependencyGraphBuilder(resolver=resolver, verbose=verbose) + + print("Resolving dependencies...") + graph = builder.build(manifest.dependencies) + + def print_tree(qualified_name: str, indent: int = 0, seen: set = None): + seen = seen or set() + prefix = " " * indent + node = graph.get_node(qualified_name) + + if not node: + print(f"{prefix}- {qualified_name} (not found)") + return + + # Status indicator + if node.source in ("local", "global"): + status = "installed" + elif node.source == "registry" and node.path: + status = "installed" + elif node.source == "registry": + status = "registry" + else: + status = "missing" + + # Show resolved version if available, otherwise fall back to constraint + if node.resolved_version: + version = f"@{node.resolved_version}" + elif node.version_constraint: + version = f"@{node.version_constraint}" + else: + version = "" + circular = " (circular)" if qualified_name in seen else "" + + print(f"{prefix}- {node.qualified_name}{version} [{status}]{circular}") + + if qualified_name not in seen: + seen.add(qualified_name) + for child in node.children: + print_tree(child, indent + 1, seen.copy()) + + print() + print("Dependency tree:") + for root in graph.root_dependencies: + print_tree(root) + + if graph.conflicts: + print() + print("Version conflicts:") + for tool, v1, from1, v2, from2 in graph.conflicts: + print(f" {tool}: {v1} (from {from1}) vs {v2} (from {from2})") + + if graph.cycles: + print() + print("Circular dependencies:") + for cycle in graph.cycles: + print(f" {' -> '.join(cycle)}") + + # Summary + print() + total = len(graph.nodes) + installed = sum(1 for n in graph.nodes.values() if n.source in ("local", "global") or (n.source == "registry" and n.path)) + to_install = sum(1 for n in graph.nodes.values() if n.source == "registry" and n.path is None) + missing = sum(1 for n in graph.nodes.values() if not n.is_resolved) + + print(f"Total: {total} tools ({installed} installed, {to_install} to install, {missing} missing)") + + return 0 + + def cmd_install_deps(args): - """Install dependencies from cmdforge.yaml.""" + """Install all dependencies from cmdforge.yaml with transitive resolution.""" + from ..dependency_graph import DependencyGraphBuilder + from ..resolver import ToolResolver, install_from_registry, ToolNotFoundError from ..registry_client import RegistryError + from ..lockfile import Lockfile manifest = load_manifest() @@ -66,46 +160,195 @@ def cmd_install_deps(args): print("No dependencies to install.") return 0 - print(f"Installing dependencies for {manifest.name}...") + verbose = getattr(args, 'verbose', False) + dry_run = getattr(args, 'dry_run', False) + force = getattr(args, 'force', False) + frozen = getattr(args, 'frozen', False) + strict_frozen = getattr(args, 'strict_frozen', False) + ignore_lock = getattr(args, 'ignore_lock', False) + + # Check for lock file + lock = Lockfile.load() + + if frozen or strict_frozen: + # Frozen mode: require lock file + if lock is None: + print("Error: --frozen requires cmdforge.lock") + print("Run 'cmdforge lock' first.") + return 1 + if strict_frozen and lock.is_stale(): + print("Error: cmdforge.lock is stale (manifest changed)") + print("Run 'cmdforge lock' to update, or drop --strict-frozen") + return 1 + + if lock and not ignore_lock: + return _install_from_lock(lock, args) + + # Fall through to manifest-based installation + # Build dependency graph + resolver = ToolResolver(auto_fetch=False) + builder = DependencyGraphBuilder(resolver=resolver, verbose=verbose) + + print("Resolving dependencies...") + graph = builder.build(manifest.dependencies) + + # Check for issues + if graph.has_cycles(): + print() + print("Error: Circular dependencies detected:") + for cycle in graph.cycles: + print(f" {' -> '.join(cycle)}") + return 1 + + if graph.has_conflicts(): + print() + print("Warning: Version conflicts detected:") + for tool, v1, from1, v2, from2 in graph.conflicts: + print(f" {tool}: {v1} (from {from1}) vs {v2} (from {from2})") + if not force: + print() + print("Use --force to install anyway (first constraint wins)") + return 1 + unresolved = graph.get_unresolved() + if unresolved: + print() + print("Error: Missing dependencies:") + for node in unresolved: + constraint = f"@{node.version_constraint}" if node.version_constraint else "" + print(f" {node.qualified_name}{constraint}") + return 1 + + # Determine what needs installation (in correct order) + to_install = [ + node for node in graph.get_install_order() + if node.source == "registry" and node.path is None + ] + + if not to_install: + print("All dependencies already installed.") + return 0 + + if dry_run: + print() + print(f"Would install {len(to_install)} tool(s):") + for node in to_install: + version = node.resolved_version or "latest" + print(f" {node.qualified_name}@{version}") + return 0 + print() + print(f"Installing {len(to_install)} tool(s) in dependency order:") + for node in to_install: + version = node.resolved_version or "latest" + print(f" {node.qualified_name}@{version}") + # Install in topological order (dependencies first) + installed = 0 failed = [] - installed = [] - for i, dep in enumerate(manifest.dependencies, 1): - print(f"[{i}/{len(manifest.dependencies)}] {dep.name}@{dep.version}") + print() + for node in to_install: + try: + print(f"Installing {node.qualified_name}...", end=" ", flush=True) + spec = node.qualified_name + install_version = node.resolved_version or node.version_constraint + install_from_registry(spec, version=install_version) + print(f"v{node.resolved_version or 'latest'}") + installed += 1 + except RegistryError as e: + print(f"FAILED: {e.message}") + failed.append((node.qualified_name, e.message)) + except Exception as e: + print(f"FAILED: {e}") + failed.append((node.qualified_name, str(e))) - # Check if already installed - existing = find_tool(dep.name) - if existing: - print(f" Already installed: {existing.full_name}") - installed.append(dep.name) + # Summary + print() + print(f"Installed: {installed}/{len(to_install)}") + if failed: + print("Failed:") + for name, err in failed: + print(f" {name}: {err}") + return 1 + + return 0 + + +def _install_from_lock(lock: "Lockfile", args) -> int: + """Install exact versions from lock file.""" + from ..resolver import ToolResolver, ToolNotFoundError, install_from_registry + from ..registry_client import RegistryError + + verbose = getattr(args, 'verbose', False) + dry_run = getattr(args, 'dry_run', False) + force = getattr(args, 'force', False) + + print("Installing from cmdforge.lock (reproducible)...") + + if lock.is_stale() and not force: + print("Warning: Lock file may be outdated (manifest changed)") + print("Run 'cmdforge lock' to update, or use --force to continue") + return 1 + + # Check what needs installing + resolver = ToolResolver(auto_fetch=False) + resolver.manifest = None + to_install = [] + + for name, locked in lock.packages.items(): + try: + resolved = resolver.resolve(name) + # Check if version matches + resolved_version = resolved.version or "" + locked_version = locked.version or "" + if resolved_version != locked_version: + to_install.append(locked) + except ToolNotFoundError: + to_install.append(locked) + except Exception: + to_install.append(locked) + + if not to_install: + print("All locked packages already installed.") + return 0 + + print() + print(f"Installing {len(to_install)} package(s):") + for pkg in to_install: + print(f" {pkg.name}@{pkg.version}") + + if dry_run: + return 0 + + # Install exact versions + installed = 0 + failed = [] + + print() + for pkg in to_install: + if pkg.source == "local": + print(f"{pkg.name}: skipping (local)") continue try: - print(f" Downloading...") - resolved = install_from_registry(dep.name, dep.version) - print(f" Installed: {resolved.full_name}@{resolved.version}") - installed.append(dep.name) + print(f"Installing {pkg.name}@{pkg.version}...", end=" ", flush=True) + # Use correct API: install_from_registry(spec, version) + install_from_registry(pkg.name, version=pkg.version) + print("OK") + installed += 1 except RegistryError as e: - if e.code == "TOOL_NOT_FOUND": - print(f" Not found in registry") - elif e.code == "VERSION_NOT_FOUND" or e.code == "CONSTRAINT_UNSATISFIABLE": - print(f" Version {dep.version} not available") - elif e.code == "CONNECTION_ERROR": - print(f" Connection failed (check network)") - else: - print(f" Failed: {e.message}") - failed.append(dep.name) + print(f"FAILED: {e.message}") + failed.append((pkg.name, e.message)) except Exception as e: - print(f" Failed: {e}") - failed.append(dep.name) + print(f"FAILED: {e}") + failed.append((pkg.name, str(e))) print() - print(f"Installed {len(installed)} tools") - + print(f"Installed: {installed}/{len(to_install)}") if failed: - print(f"Failed: {', '.join(failed)}") + print("Failed:") + for name, err in failed: + print(f" {name}: {err}") return 1 return 0 @@ -200,3 +443,105 @@ def cmd_init(args): print("Install them with: cmdforge install") return 0 + + +def cmd_lock(args): + """Generate or update cmdforge.lock file.""" + from ..lockfile import Lockfile, generate_lockfile + from ..dependency_graph import DependencyGraphBuilder + from ..resolver import ToolResolver + from ..registry_client import get_client + + manifest = load_manifest() + if manifest is None: + print("No cmdforge.yaml found. Run 'cmdforge init' first.") + return 1 + + verbose = getattr(args, 'verbose', False) + force = getattr(args, 'force', False) + + # Check for existing lock + existing_lock = Lockfile.load() + if existing_lock and not force: + if not existing_lock.is_stale(): + print("Lock file is up to date. Use --force to regenerate.") + return 0 + print("Lock file is stale (manifest changed), regenerating...") + + # Resolve all dependencies + print("Resolving dependencies...") + resolver = ToolResolver(auto_fetch=True) # Allow fetching for resolution + builder = DependencyGraphBuilder(resolver=resolver, verbose=verbose) + + try: + graph = builder.build(manifest.dependencies) + except Exception as e: + print(f"Error resolving dependencies: {e}") + return 1 + + if graph.has_cycles(): + print("Error: Circular dependencies detected") + for cycle in graph.cycles: + print(f" {' -> '.join(cycle)}") + return 1 + + unresolved = graph.get_unresolved() + if unresolved and not force: + print("Error: Some dependencies could not be resolved:") + for node in unresolved: + constraint = f"@{node.version_constraint}" if node.version_constraint else "" + print(f" {node.qualified_name}{constraint}") + print("Fix missing deps or re-run with --force to lock what can be resolved.") + return 1 + if unresolved and force: + print("Warning: Some dependencies could not be resolved and will be skipped:") + for node in unresolved: + constraint = f"@{node.version_constraint}" if node.version_constraint else "" + print(f" {node.qualified_name}{constraint}") + + # Generate lock file + print("Generating lock file...") + client = get_client() + lock = generate_lockfile(manifest, graph, client) + lock.save() + + # Summary + direct = sum(1 for p in lock.packages.values() if p.direct) + transitive = len(lock.packages) - direct + print() + print(f"Locked {len(lock.packages)} package(s):") + print(f" Direct: {direct}") + print(f" Transitive: {transitive}") + print() + print("Wrote cmdforge.lock") + + return 0 + + +def cmd_verify(args): + """Verify installed tools match lock file.""" + from ..lockfile import Lockfile, verify_lockfile + from ..registry_client import get_client + + lock = Lockfile.load() + if lock is None: + print("No cmdforge.lock found. Run 'cmdforge lock' first.") + return 1 + + if lock.is_stale(): + print("Warning: Lock file may be outdated (manifest changed)") + print() + + print("Verifying lock file...") + client = get_client() + errors = verify_lockfile(lock, client) + + if not errors: + print(f"All {len(lock.packages)} packages verified OK") + return 0 + + print() + print(f"{len(errors)} verification error(s):") + for error in errors: + print(f" {error}") + return 1 diff --git a/src/cmdforge/dependency_graph.py b/src/cmdforge/dependency_graph.py new file mode 100644 index 0000000..b00387f --- /dev/null +++ b/src/cmdforge/dependency_graph.py @@ -0,0 +1,547 @@ +"""Dependency graph for transitive dependency resolution. + +Builds a complete dependency graph with: +- Transitive closure (A -> B -> C) +- Cycle detection +- Version conflict detection with provenance +- Topological sorting for install order +""" + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, List, Set, Optional, Tuple + +import yaml + +from .tool import TOOLS_DIR +from .manifest import Dependency +from .version import matches_constraint + + +# Default owner for unqualified tool references +DEFAULT_OWNER = "official" + + +@dataclass +class DependencyNode: + """A node in the dependency graph.""" + owner: str # e.g., "official" (resolved), "" if unresolved + name: str # e.g., "summarize" + version_constraint: Optional[str] # e.g., "^1.0.0", "*", None + resolved_version: Optional[str] # e.g., "1.2.3" (after resolution) + source: Optional[str] # "local", "global", "registry", None (unresolved) + path: Optional[Path] # Path to installed tool (if installed) + children: List[str] = field(default_factory=list) # Qualified names of dependencies + requested_by: str = "root" # Parent that introduced this dep + + @property + def qualified_name(self) -> str: + """Consistent key for graph lookups.""" + return f"{self.owner}/{self.name}" if self.owner else self.name + + @property + def is_resolved(self) -> bool: + """Check if this node has been resolved to a source.""" + return self.source is not None + + +@dataclass +class DependencyGraph: + """Complete dependency graph with transitive closure.""" + root_dependencies: List[str] = field(default_factory=list) # Direct deps (qualified names) + nodes: Dict[str, DependencyNode] = field(default_factory=dict) # qualified_name -> node + resolution_order: List[str] = field(default_factory=list) # Topological sort (install order) + conflicts: List[Tuple[str, str, str, str, str]] = field(default_factory=list) # (tool, c1, from1, c2, from2) + cycles: List[List[str]] = field(default_factory=list) # Detected cycles + + def add_node(self, node: DependencyNode, requested_by: str = "root") -> None: + """Add or update a node. Detects version conflicts with provenance.""" + key = node.qualified_name + node.requested_by = requested_by + + if key in self.nodes: + existing = self.nodes[key] + # Check for version conflict + if (existing.version_constraint and node.version_constraint and + existing.version_constraint != node.version_constraint): + self.conflicts.append(( + key, + existing.version_constraint, + existing.requested_by, + node.version_constraint, + requested_by + )) + self.nodes[key] = node + + def get_node(self, qualified_name: str) -> Optional[DependencyNode]: + """Get a node by qualified name.""" + return self.nodes.get(qualified_name) + + def get_install_order(self) -> List[DependencyNode]: + """Return nodes in installation order (dependencies first).""" + return [self.nodes[name] for name in self.resolution_order if name in self.nodes] + + def has_conflicts(self) -> bool: + """Check if any version conflicts were detected.""" + return len(self.conflicts) > 0 + + def has_cycles(self) -> bool: + """Check if any circular dependencies were detected.""" + return len(self.cycles) > 0 + + def get_unresolved(self) -> List[DependencyNode]: + """Get all unresolved nodes.""" + return [node for node in self.nodes.values() if not node.is_resolved] + + def get_registry_nodes(self) -> List[DependencyNode]: + """Get nodes that need to be installed from registry.""" + return [ + node for node in self.nodes.values() + if node.source == "registry" and node.path is None + ] + + +class DependencyGraphBuilder: + """Builds a complete dependency graph with transitive resolution.""" + + def __init__( + self, + resolver: Optional["ToolResolver"] = None, + client: Optional["RegistryClient"] = None, + max_depth: int = 50, + verbose: bool = False + ): + """ + Initialize the graph builder. + + Args: + resolver: ToolResolver for local/global lookups (created if not provided) + client: RegistryClient for registry lookups (created if not provided) + max_depth: Maximum dependency depth (prevents infinite recursion) + verbose: Print debug info + """ + self.resolver = resolver + self.client = client + self.max_depth = max_depth + self.verbose = verbose + self._visited: Set[str] = set() # qualified names + self._stack: List[str] = [] # For cycle detection + self._registry_cache: Dict[str, "DownloadResult"] = {} # cache_key -> DownloadResult + self._resolved_count = 0 + self.include_implicit = True + + def _get_resolver(self) -> "ToolResolver": + """Get or create the tool resolver.""" + if self.resolver is None: + from .resolver import ToolResolver + self.resolver = ToolResolver(auto_fetch=False) + return self.resolver + + def _get_client(self) -> "RegistryClient": + """Get or create the registry client.""" + if self.client is None: + from .registry_client import get_client + self.client = get_client() + return self.client + + def build( + self, + dependencies: List[Dependency], + include_implicit: bool = True + ) -> DependencyGraph: + """ + Build complete dependency graph. + + Args: + dependencies: Direct dependencies to resolve + include_implicit: Whether to include ToolStep dependencies + + Returns: + DependencyGraph with all transitive dependencies + """ + graph = DependencyGraph() + self.include_implicit = include_implicit + + # Reset state for new build + self._visited.clear() + self._stack.clear() + self._resolved_count = 0 + + # Track original names to resolve qualified names after + original_names = [] + for dep in dependencies: + original_names.append(dep.name) + self._resolve_recursive(dep.name, dep.version, graph, depth=0) + + # Update root_dependencies with qualified names from resolved nodes + for orig_name in original_names: + # Find the resolved node - check both qualified and unqualified + node = graph.get_node(orig_name) + if node: + graph.root_dependencies.append(node.qualified_name) + else: + # Try to find by matching tool name across all nodes + found = False + for qname, n in graph.nodes.items(): + if n.name == orig_name or qname == orig_name: + graph.root_dependencies.append(n.qualified_name) + found = True + break + if not found: + # Unresolved - keep original name for error reporting + graph.root_dependencies.append(orig_name) + + graph.resolution_order = self._topological_sort(graph) + return graph + + def _qualify_name(self, name: str, resolved_owner: Optional[str] = None) -> str: + """Return qualified name after resolution (local-first).""" + if "/" in name: + return name + owner = resolved_owner or DEFAULT_OWNER + return f"{owner}/{name}" + + def _parse_qualified(self, qualified: str) -> Tuple[Optional[str], str]: + """Split qualified name into owner and name (owner may be None).""" + if "/" in qualified: + parts = qualified.split("/", 1) + return (parts[0], parts[1]) + return (None, qualified) + + def _resolve_recursive( + self, + qualified_name: str, + version_constraint: Optional[str], + graph: DependencyGraph, + depth: int + ) -> Optional[DependencyNode]: + """Recursively resolve a dependency and its children.""" + + # Cycle detection + if qualified_name in self._stack: + cycle = self._stack[self._stack.index(qualified_name):] + [qualified_name] + graph.cycles.append(cycle) + if self.verbose: + print(f" Cycle detected: {' -> '.join(cycle)}") + return None + + # Already resolved - check for version conflicts + if qualified_name in self._visited: + existing = graph.get_node(qualified_name) + if existing and version_constraint and existing.version_constraint != version_constraint: + graph.conflicts.append(( + qualified_name, + existing.version_constraint or "*", + existing.requested_by or "unknown", + version_constraint or "*", + self._stack[-1] if self._stack else "unknown" + )) + return existing + + # Depth limit + if depth > self.max_depth: + raise RecursionError(f"Dependency depth exceeded {self.max_depth}") + + self._visited.add(qualified_name) + self._stack.append(qualified_name) + + try: + # Resolve the tool + node = self._resolve_single(qualified_name, version_constraint) + if node is None: + return None + + # Also track the resolved qualified name to prevent duplicates + if node.qualified_name != qualified_name: + self._visited.add(node.qualified_name) + + graph.add_node(node, requested_by=self._stack[-2] if len(self._stack) > 1 else "root") + self._resolved_count += 1 + if self.verbose: + print(f" Resolved {self._resolved_count}: {node.qualified_name}") + + # Get child dependencies (only if this node is resolved) + if node.is_resolved: + child_deps = self._get_tool_dependencies(node) + else: + child_deps = [] + + # Recurse into children and capture resolved keys + node.children = [] + for child in child_deps: + child_node = self._resolve_recursive(child.name, child.version, graph, depth + 1) + if child_node and child_node.is_resolved: + node.children.append(child_node.qualified_name) + elif child_node: + # Child exists but is unresolved - still track for error reporting + node.children.append(child_node.qualified_name) + if self.verbose: + print(f" Warning: Unresolved dependency {child.name}") + + return node + + finally: + self._stack.pop() + + def _resolve_single( + self, + qualified_name: str, + version_constraint: Optional[str] + ) -> Optional[DependencyNode]: + """Resolve a single dependency to a node.""" + owner, name = self._parse_qualified(qualified_name) + owner_hint = None + + # Try local/global first via resolver + try: + from .resolver import ToolNotFoundError + resolver = self._get_resolver() + resolved = resolver.resolve(qualified_name) + owner_hint = resolved.owner + if version_constraint and version_constraint not in ("", "*", "latest"): + resolved_version = resolved.version or "" + if not resolved_version or not matches_constraint(resolved_version, version_constraint): + if self.verbose: + print( + f" Local/global {qualified_name}@{resolved_version or 'unknown'} " + f"does not satisfy {version_constraint}, resolving from registry" + ) + else: + return DependencyNode( + owner=resolved.owner or owner or "", + name=resolved.tool.name, + version_constraint=version_constraint, + resolved_version=resolved.version, + source=resolved.source, + path=resolved.path, + children=[] + ) + else: + return DependencyNode( + owner=resolved.owner or owner or "", + name=resolved.tool.name, + version_constraint=version_constraint, + resolved_version=resolved.version, + source=resolved.source, + path=resolved.path, + children=[] + ) + except ToolNotFoundError: + pass + except Exception as e: + if self.verbose: + print(f" Warning: Resolver error for {qualified_name}: {e}") + + # Try registry + try: + reg_owner = owner or owner_hint or DEFAULT_OWNER + result = self._download_registry_config( + reg_owner, + name, + version_constraint + ) + return DependencyNode( + owner=reg_owner, + name=name, + version_constraint=version_constraint, + resolved_version=result.resolved_version or "", + source="registry", + path=None, # Not installed yet + children=[] + ) + except Exception as e: + if self.verbose: + print(f" Could not resolve {qualified_name}: {e}") + # Record unresolved node so graph reflects missing dependency + return DependencyNode( + owner=owner or owner_hint or "", + name=name, + version_constraint=version_constraint, + resolved_version=None, + source=None, + path=None, + children=[] + ) + + def _get_tool_dependencies(self, node: DependencyNode) -> List[Dependency]: + """Get dependencies for a resolved tool node.""" + deps = [] + + if node.source in ("local", "global") and node.path: + # Read from local config file + deps.extend(self._read_deps_from_config(node.path)) + elif node.source == "registry": + # Fetch config from registry without installing + deps.extend(self._read_deps_from_registry(node.owner, node.name, node.resolved_version or node.version_constraint)) + + return deps + + def _read_deps_from_config(self, tool_path: Path) -> List[Dependency]: + """Read dependencies from a local tool config.""" + deps = [] + config_path = tool_path / "config.yaml" + if not config_path.exists(): + return deps + + try: + data = yaml.safe_load(config_path.read_text()) or {} + + # Explicit dependencies + for dep in data.get("dependencies", []): + deps.append(Dependency.from_dict(dep)) + + # Implicit dependencies from ToolSteps + if self.include_implicit: + for step in data.get("steps", []): + if step.get("type") == "tool": + tool_ref = step.get("tool") + if tool_ref: + deps.append(Dependency.from_dict(tool_ref)) + + except Exception as e: + if self.verbose: + print(f" Warning: Could not read deps from {config_path}: {e}") + + return deps + + def _read_deps_from_registry(self, owner: str, name: str, version: Optional[str]) -> List[Dependency]: + """Read dependencies from registry tool config.""" + deps = [] + + try: + # Download config without installing (cache by qualified name + version/constraint) + result = self._download_registry_config(owner, name, version) + config_yaml = result.config_yaml + + data = yaml.safe_load(config_yaml) or {} + + # Explicit dependencies + for dep in data.get("dependencies", []): + deps.append(Dependency.from_dict(dep)) + + # Implicit dependencies from ToolSteps + if self.include_implicit: + for step in data.get("steps", []): + if step.get("type") == "tool": + tool_ref = step.get("tool") + if tool_ref: + deps.append(Dependency.from_dict(tool_ref)) + + except Exception as e: + if self.verbose: + print(f" Warning: Could not read deps from registry {owner}/{name}: {e}") + + return deps + + def _download_registry_config( + self, + owner: str, + name: str, + version: Optional[str] + ) -> "DownloadResult": + """Download registry config with caching (version or constraint aware).""" + cache_key = f"{owner}/{name}@{version or ''}" + if cache_key in self._registry_cache: + return self._registry_cache[cache_key] + + client = self._get_client() + result = client.download_tool(owner, name, version=version, install=False) + self._registry_cache[cache_key] = result + return result + + def _topological_sort(self, graph: DependencyGraph) -> List[str]: + """Return nodes in installation order (dependencies first).""" + # Kahn's algorithm + in_degree = {name: 0 for name in graph.nodes} + + # Build in-degree counts + for node in graph.nodes.values(): + for child in node.children: + if child in in_degree: + in_degree[child] += 1 + + # Start with nodes that have no dependencies + queue = [name for name, deg in in_degree.items() if deg == 0] + result = [] + + while queue: + name = queue.pop(0) + result.append(name) + node = graph.nodes.get(name) + if node: + for child in node.children: + if child in in_degree: + in_degree[child] -= 1 + if in_degree[child] == 0: + queue.append(child) + + # In our graph, A.children contains B means "A depends on B". + # Kahn's algorithm processes nodes with in_degree=0 first. + # Nodes with in_degree=0 are roots (no one depends on them). + # So the raw output is: dependents first, then dependencies. + # We reverse to get: dependencies first, then dependents (correct install order). + return list(reversed(result)) + + +@dataclass +class ResolutionResult: + """Result of dependency resolution.""" + graph: DependencyGraph + to_install: List[DependencyNode] # Tools needing installation + already_installed: List[DependencyNode] # Already present + failed: List[Tuple[str, str]] # (qualified_name, error_message) + warnings: List[str] # Non-fatal issues + + @property + def success(self) -> bool: + """Check if resolution was successful.""" + return not self.graph.has_cycles() and not self.failed + + +def resolve_dependencies( + dependencies: List[Dependency], + verbose: bool = False, + include_implicit: bool = True +) -> ResolutionResult: + """ + Convenience function to resolve dependencies and categorize results. + + Args: + dependencies: Direct dependencies to resolve + verbose: Print debug info + include_implicit: Include ToolStep dependencies + + Returns: + ResolutionResult with categorized nodes + """ + builder = DependencyGraphBuilder(verbose=verbose) + graph = builder.build(dependencies, include_implicit=include_implicit) + + to_install = [] + already_installed = [] + failed = [] + warnings = [] + + for node in graph.get_install_order(): + if not node.is_resolved: + failed.append((node.qualified_name, "Tool not found")) + elif node.source == "registry" and node.path is None: + to_install.append(node) + else: + already_installed.append(node) + + if graph.has_conflicts(): + for tool, v1, from1, v2, from2 in graph.conflicts: + warnings.append(f"Version conflict for {tool}: {v1} (from {from1}) vs {v2} (from {from2})") + + if graph.has_cycles(): + for cycle in graph.cycles: + warnings.append(f"Circular dependency: {' -> '.join(cycle)}") + + return ResolutionResult( + graph=graph, + to_install=to_install, + already_installed=already_installed, + failed=failed, + warnings=warnings + ) diff --git a/src/cmdforge/lockfile.py b/src/cmdforge/lockfile.py new file mode 100644 index 0000000..b9653b1 --- /dev/null +++ b/src/cmdforge/lockfile.py @@ -0,0 +1,371 @@ +"""Lock file for reproducible dependency installation. + +Provides cmdforge.lock file support for: +- Recording exact versions of all dependencies (direct + transitive) +- Reproducible installs across machines +- Integrity verification via content hashes +""" + +import hashlib +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + +import yaml + + +@dataclass +class LockedPackage: + """A locked dependency with exact version and integrity.""" + name: str # Qualified: owner/name + version: str # Exact resolved version + constraint: str # Original constraint + integrity: str # sha256 hash (from registry config_hash) + source: str # "registry", "local", "global" + direct: bool # True if in manifest + required_by: List[str] = field(default_factory=list) # Parent packages + path: Optional[str] = None # Relative path for local tools + + @property + def owner(self) -> str: + """Extract owner from qualified name.""" + return self.name.split("/")[0] if "/" in self.name else "" + + @property + def tool_name(self) -> str: + """Extract tool name from qualified name.""" + return self.name.split("/")[1] if "/" in self.name else self.name + + +@dataclass +class LockfileMetadata: + """Metadata about when/how the lock file was generated.""" + generated_at: str + cmdforge_version: str + manifest_hash: str + platform: str + python_version: str + + +@dataclass +class Lockfile: + """Complete lock file representation.""" + lockfile_version: int = 1 + metadata: Optional[LockfileMetadata] = None + packages: Dict[str, LockedPackage] = field(default_factory=dict) + + @classmethod + def load(cls, path: Path = None) -> Optional["Lockfile"]: + """Load lock file from disk. + + Args: + path: Path to lock file (default: ./cmdforge.lock) + + Returns: + Lockfile object, or None if not found + """ + path = path or Path("cmdforge.lock") + if not path.exists(): + return None + + try: + with open(path) as f: + data = yaml.safe_load(f) or {} + return cls._from_dict(data) + except Exception as e: + print(f"Warning: Could not load lock file: {e}") + return None + + def save(self, path: Path = None) -> None: + """Save lock file to disk. + + Args: + path: Path to save to (default: ./cmdforge.lock) + """ + path = path or Path("cmdforge.lock") + with open(path, "w") as f: + # Add header comment + f.write("# cmdforge.lock\n") + f.write("# Auto-generated by 'cmdforge lock' - do not edit manually\n") + f.write("# To update: cmdforge lock --force\n\n") + yaml.safe_dump( + self._to_dict(), + f, + sort_keys=False, + default_flow_style=False + ) + + def get_package(self, name: str) -> Optional[LockedPackage]: + """Get a locked package by qualified name.""" + return self.packages.get(name) + + def is_stale(self, manifest_path: Path = None) -> bool: + """Check if lock file is outdated relative to manifest. + + Note: This only checks if the manifest hash changed. + It does NOT detect if local tools were edited after locking. + Use verify_lockfile() for full integrity checking. + + Args: + manifest_path: Path to manifest (default: ./cmdforge.yaml) + + Returns: + True if lock file is stale + """ + manifest_path = manifest_path or Path("cmdforge.yaml") + if not manifest_path.exists(): + return True + if not self.metadata: + return True + current_hash = compute_file_hash(manifest_path) + return current_hash != self.metadata.manifest_hash + + @classmethod + def _from_dict(cls, data: dict) -> "Lockfile": + """Parse lock file from dict.""" + metadata = None + if "metadata" in data: + m = data["metadata"] + metadata = LockfileMetadata( + generated_at=m.get("generated_at", ""), + cmdforge_version=m.get("cmdforge_version", ""), + manifest_hash=m.get("manifest_hash", ""), + platform=m.get("platform", ""), + python_version=m.get("python_version", "") + ) + + packages = {} + for name, pkg_data in data.get("packages", {}).items(): + packages[name] = LockedPackage( + name=name, + version=pkg_data.get("version", ""), + constraint=pkg_data.get("constraint", "*"), + integrity=pkg_data.get("integrity", ""), + source=pkg_data.get("source", "registry"), + direct=pkg_data.get("direct", False), + required_by=pkg_data.get("required_by", []), + path=pkg_data.get("path") + ) + + return cls( + lockfile_version=data.get("lockfile_version", 1), + metadata=metadata, + packages=packages + ) + + def _to_dict(self) -> dict: + """Convert to dict for YAML serialization.""" + d = {"lockfile_version": self.lockfile_version} + + if self.metadata: + d["metadata"] = { + "generated_at": self.metadata.generated_at, + "cmdforge_version": self.metadata.cmdforge_version, + "manifest_hash": self.metadata.manifest_hash, + "platform": self.metadata.platform, + "python_version": self.metadata.python_version + } + + d["packages"] = {} + for name, pkg in self.packages.items(): + pkg_dict = { + "version": pkg.version, + "constraint": pkg.constraint, + "integrity": pkg.integrity, + "source": pkg.source, + "direct": pkg.direct + } + if pkg.required_by: + pkg_dict["required_by"] = pkg.required_by + if pkg.path: + pkg_dict["path"] = pkg.path + d["packages"][name] = pkg_dict + + return d + + +def compute_file_hash(path: Path) -> str: + """Compute SHA256 hash of raw file bytes. + + Used ONLY for manifest hash (to detect any change, including formatting). + For tool config integrity, use hash_utils.compute_yaml_hash() instead. + + Args: + path: Path to file + + Returns: + Hash string in format "sha256:<64-char-hex>" + """ + sha256 = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + sha256.update(chunk) + return f"sha256:{sha256.hexdigest()}" + + +def generate_lockfile( + manifest: "Manifest", + graph: "DependencyGraph", + client: "RegistryClient" +) -> Lockfile: + """ + Generate a lock file from resolved dependencies. + + Args: + manifest: Project manifest with constraints + graph: Resolved dependency graph (from DependencyGraphBuilder) + client: Registry client for fetching config_hash + + Returns: + Complete Lockfile ready to save + """ + import platform + import sys + from . import __version__ + + # Compute manifest hash + manifest_path = Path("cmdforge.yaml") + manifest_hash = compute_file_hash(manifest_path) if manifest_path.exists() else "" + + metadata = LockfileMetadata( + generated_at=datetime.now().astimezone().isoformat(), + cmdforge_version=__version__, + manifest_hash=manifest_hash, + platform=platform.system().lower(), + python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}" + ) + + lock = Lockfile(metadata=metadata) + + # Get direct dependency names for marking + direct_names = {d.name for d in manifest.dependencies} + + # Process all nodes in the graph + for qualified_name, node in graph.nodes.items(): + if not node.is_resolved: + continue + integrity = _get_integrity_hash(node, client) + + pkg = LockedPackage( + name=qualified_name, + version=node.resolved_version or "", + constraint=node.version_constraint or "*", + integrity=integrity, + source=node.source or "registry", + direct=qualified_name in direct_names or _is_direct(qualified_name, direct_names), + required_by=_find_parents(qualified_name, graph), + path=str(node.path) if node.source == "local" and node.path else None + ) + lock.packages[qualified_name] = pkg + + return lock + + +def _get_integrity_hash(node: "DependencyNode", client: "RegistryClient") -> str: + """Get integrity hash for a dependency node.""" + if node.source in ("local", "global") and node.path: + # For local/global tools, hash normalized config + config_path = node.path / "config.yaml" + if config_path.exists(): + from .hash_utils import compute_yaml_hash + return compute_yaml_hash(config_path.read_text()) + return "" + + elif node.source == "registry": + # For registry tools, get the config_hash from registry + # This is the hash BEFORE any local modifications + try: + result = client.download_tool( + node.owner, + node.name, + version=node.resolved_version, + install=False # Don't count as install, just get config + ) + # Use the config_hash from registry response + if hasattr(result, 'config_hash') and result.config_hash: + # config_hash already includes "sha256:" prefix + return result.config_hash + # Fallback: hash the config_yaml content using normalized hashing + if hasattr(result, 'config_yaml') and result.config_yaml: + from .hash_utils import compute_yaml_hash + return compute_yaml_hash(result.config_yaml) + except Exception: + return "" + + return "" + + +def _is_direct(qualified_name: str, direct_names: set) -> bool: + """Check if a qualified name matches any direct dependency.""" + # Handle case where manifest has unqualified names + tool_name = qualified_name.split("/")[1] if "/" in qualified_name else qualified_name + return qualified_name in direct_names or tool_name in direct_names + + +def _find_parents(qualified_name: str, graph: "DependencyGraph") -> List[str]: + """Find packages that depend on this one.""" + parents = [] + for name, node in graph.nodes.items(): + if qualified_name in node.children: + parents.append(name) + return parents + + +def verify_lockfile( + lock: Lockfile, + client: "RegistryClient" +) -> List[str]: + """ + Verify installed tools match lock file. + + Args: + lock: Lock file to verify against + client: Registry client for hash verification + + Returns: + List of verification errors (empty if all OK) + """ + from .resolver import ToolResolver, ToolNotFoundError + + errors = [] + # Create resolver that doesn't use manifest (bypass version overrides) + resolver = ToolResolver(auto_fetch=False) + resolver.manifest = None + + for name, locked in lock.packages.items(): + # Check tool exists + try: + resolved = resolver.resolve(name) + except ToolNotFoundError: + errors.append(f"{name}: not installed") + continue + except Exception: + errors.append(f"{name}: not installed") + continue + + # Check version matches + resolved_version = resolved.version or "" + locked_version = locked.version or "" + if resolved_version and locked_version and resolved_version != locked_version: + errors.append( + f"{name}: version mismatch " + f"(installed: {resolved_version}, locked: {locked_version})" + ) + + # Check integrity against installed tool config + if locked.integrity and resolved.path: + try: + config_path = resolved.path / "config.yaml" + if config_path.exists(): + from .hash_utils import compute_yaml_hash + current_hash = compute_yaml_hash(config_path.read_text()) + if current_hash != locked.integrity: + errors.append( + f"{name}: integrity mismatch " + f"(installed tool differs from lock)" + ) + except Exception as e: + errors.append(f"{name}: could not verify integrity ({e})") + + return errors diff --git a/src/cmdforge/version.py b/src/cmdforge/version.py new file mode 100644 index 0000000..bc19e8c --- /dev/null +++ b/src/cmdforge/version.py @@ -0,0 +1,250 @@ +"""Semantic version constraint matching. + +Provides version parsing and constraint matching for dependency resolution. +Follows semver 2.0.0 specification with support for common constraint operators. +""" + +import re +from dataclasses import dataclass +from typing import Optional, Tuple, List + +VERSION_PATTERN = re.compile( + r'^(?P\d+)\.(?P\d+)\.(?P\d+)' + r'(?:-(?P[0-9A-Za-z.-]+))?' + r'(?:\+(?P[0-9A-Za-z.-]+))?$' +) + + +@dataclass +class Version: + """Semantic version representation.""" + major: int + minor: int + patch: int + prerelease: Optional[str] = None + build: Optional[str] = None + + @classmethod + def parse(cls, version_str: str) -> Optional["Version"]: + """Parse a version string into a Version object. + + Args: + version_str: Version string like "1.2.3" or "1.0.0-alpha.1+build.123" + + Returns: + Version object, or None if parsing fails + """ + if not version_str: + return None + match = VERSION_PATTERN.match(version_str.strip()) + if not match: + return None + return cls( + major=int(match.group("major")), + minor=int(match.group("minor")), + patch=int(match.group("patch")), + prerelease=match.group("prerelease"), + build=match.group("build") + ) + + def __str__(self) -> str: + s = f"{self.major}.{self.minor}.{self.patch}" + if self.prerelease: + s += f"-{self.prerelease}" + if self.build: + s += f"+{self.build}" + return s + + @property + def tuple(self) -> Tuple[int, int, int]: + """Get (major, minor, patch) tuple for comparison.""" + return (self.major, self.minor, self.patch) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, Version): + return False + return self.tuple == other.tuple and self.prerelease == other.prerelease + + def __lt__(self, other: "Version") -> bool: + if self.tuple != other.tuple: + return self.tuple < other.tuple + # Prerelease versions are less than release versions + # No prerelease = release version + if self.prerelease is None and other.prerelease is not None: + return False # 1.0.0 > 1.0.0-alpha + if self.prerelease is not None and other.prerelease is None: + return True # 1.0.0-alpha < 1.0.0 + # Both have prerelease, compare lexicographically + return (self.prerelease or "") < (other.prerelease or "") + + def __le__(self, other: "Version") -> bool: + return self == other or self < other + + def __gt__(self, other: "Version") -> bool: + return not self <= other + + def __ge__(self, other: "Version") -> bool: + return not self < other + + def __hash__(self) -> int: + return hash((self.tuple, self.prerelease)) + + +def matches_constraint(version: str, constraint: str) -> bool: + """ + Check if a version satisfies a constraint. + + Supported constraints: + - "*" or "latest" or "": any version + - "1.0.0": exact match + - "^1.2.3": compatible (>=1.2.3 and <2.0.0) + - "^0.2.3": for 0.x, means >=0.2.3 and <0.3.0 (semver special case) + - "^0.0.3": for 0.0.x, means exactly 0.0.3 (semver special case) + - "~1.2.3": approximately (>=1.2.3 and <1.3.0) + - ">=1.0.0", "<=1.0.0", ">1.0.0", "<1.0.0": comparisons + - "=1.0.0": explicit exact match + + Args: + version: Version string to check (e.g., "1.2.3") + constraint: Constraint string (e.g., "^1.0.0") + + Returns: + True if version satisfies constraint + """ + if not constraint or constraint in ("*", "latest"): + return True + + v = Version.parse(version) + if not v: + return False + + constraint = constraint.strip() + + # Standard semver behavior: prerelease versions do NOT satisfy ranges + # unless the constraint itself includes a prerelease. + if v.prerelease and "-" not in constraint: + return False + + # Caret: ^1.2.3 means >=1.2.3 and <2.0.0 + # Special cases: + # ^0.2.3 -> >=0.2.3 and <0.3.0 + # ^0.0.3 -> >=0.0.3 and <0.0.4 (exact match for 0.0.x series) + if constraint.startswith("^"): + c = Version.parse(constraint[1:]) + if not c: + return False + + # Must be >= constraint version + if v < c: + return False + + # Upper bound depends on major version + if c.major == 0: + if c.minor == 0: + # ^0.0.y allows ONLY 0.0.y (exact match for 0.0.x series) + return v.major == 0 and v.minor == 0 and v.patch == c.patch + # ^0.x.y allows >=0.x.y and <0.(x+1).0 + return v.major == 0 and v.minor == c.minor + else: + # ^x.y.z allows changes that don't modify major: <(x+1).0.0 + return v.major == c.major + + # Tilde: ~1.2.3 means >=1.2.3 and <1.3.0 + if constraint.startswith("~"): + c = Version.parse(constraint[1:]) + if not c: + return False + + # Must be >= constraint version + if v < c: + return False + + # Must be same major.minor + return v.major == c.major and v.minor == c.minor + + # Comparison operators (check longest first) + for op in (">=", "<=", ">", "<", "="): + if constraint.startswith(op): + c = Version.parse(constraint[len(op):].strip()) + if not c: + return False + if op == ">=": + return v >= c + elif op == "<=": + return v <= c + elif op == ">": + return v > c + elif op == "<": + return v < c + elif op == "=": + return v == c + + # Exact match (no operator) + c = Version.parse(constraint) + if c: + return v == c + + return False + + +def find_best_match(versions: List[str], constraint: str) -> Optional[str]: + """ + Find the best (highest) version that matches a constraint. + + Args: + versions: List of available versions + constraint: Version constraint + + Returns: + Best matching version, or None if no match + """ + matching = [v for v in versions if matches_constraint(v, constraint)] + if not matching: + return None + + # Sort by parsed version, return highest + parsed = [(Version.parse(v), v) for v in matching] + parsed = [(p, v) for p, v in parsed if p is not None] + if not parsed: + return None + + parsed.sort(key=lambda x: x[0], reverse=True) + return parsed[0][1] + + +def is_valid_version(version: str) -> bool: + """Check if a string is a valid semver version. + + Args: + version: Version string to validate + + Returns: + True if valid semver format + """ + return Version.parse(version) is not None + + +def compare_versions(v1: str, v2: str) -> int: + """Compare two version strings. + + Args: + v1: First version + v2: Second version + + Returns: + -1 if v1 < v2, 0 if equal, 1 if v1 > v2 + + Raises: + ValueError: If either version is invalid + """ + parsed_v1 = Version.parse(v1) + parsed_v2 = Version.parse(v2) + + if not parsed_v1 or not parsed_v2: + raise ValueError(f"Invalid version: {v1 if not parsed_v1 else v2}") + + if parsed_v1 < parsed_v2: + return -1 + elif parsed_v1 > parsed_v2: + return 1 + return 0 diff --git a/tests/test_dependency_graph.py b/tests/test_dependency_graph.py new file mode 100644 index 0000000..ba21ec0 --- /dev/null +++ b/tests/test_dependency_graph.py @@ -0,0 +1,293 @@ +"""Tests for dependency_graph.py - Transitive dependency resolution.""" + +import pytest +from pathlib import Path +from unittest.mock import MagicMock, patch +import tempfile +import yaml + +from cmdforge.dependency_graph import ( + DependencyNode, + DependencyGraph, + DependencyGraphBuilder, + ResolutionResult, + resolve_dependencies +) +from cmdforge.manifest import Dependency + + +class TestDependencyNode: + """Tests for DependencyNode dataclass.""" + + def test_qualified_name_with_owner(self): + node = DependencyNode( + owner="official", + name="summarize", + version_constraint="^1.0.0", + resolved_version="1.2.3", + source="global", + path=Path("/test") + ) + assert node.qualified_name == "official/summarize" + + def test_qualified_name_without_owner(self): + node = DependencyNode( + owner="", + name="summarize", + version_constraint="*", + resolved_version=None, + source=None, + path=None + ) + assert node.qualified_name == "summarize" + + def test_is_resolved(self): + resolved = DependencyNode( + owner="official", + name="test", + version_constraint="*", + resolved_version="1.0.0", + source="global", + path=Path("/test") + ) + assert resolved.is_resolved + + unresolved = DependencyNode( + owner="official", + name="test", + version_constraint="*", + resolved_version=None, + source=None, + path=None + ) + assert not unresolved.is_resolved + + +class TestDependencyGraph: + """Tests for DependencyGraph dataclass.""" + + def test_add_node(self): + graph = DependencyGraph() + node = DependencyNode( + owner="official", + name="test", + version_constraint="*", + resolved_version="1.0.0", + source="global", + path=Path("/test") + ) + graph.add_node(node) + assert "official/test" in graph.nodes + + def test_get_node(self): + graph = DependencyGraph() + node = DependencyNode( + owner="official", + name="test", + version_constraint="*", + resolved_version="1.0.0", + source="global", + path=Path("/test") + ) + graph.add_node(node) + assert graph.get_node("official/test") == node + assert graph.get_node("nonexistent") is None + + def test_version_conflict_detection(self): + graph = DependencyGraph() + + node1 = DependencyNode( + owner="official", + name="test", + version_constraint="^1.0.0", + resolved_version="1.0.0", + source="global", + path=Path("/test"), + requested_by="root" + ) + graph.add_node(node1, requested_by="root") + + node2 = DependencyNode( + owner="official", + name="test", + version_constraint="^2.0.0", + resolved_version="2.0.0", + source="global", + path=Path("/test"), + requested_by="other" + ) + graph.add_node(node2, requested_by="other") + + assert graph.has_conflicts() + assert len(graph.conflicts) == 1 + + def test_get_unresolved(self): + graph = DependencyGraph() + + resolved = DependencyNode( + owner="official", + name="resolved", + version_constraint="*", + resolved_version="1.0.0", + source="global", + path=Path("/test") + ) + unresolved = DependencyNode( + owner="official", + name="unresolved", + version_constraint="*", + resolved_version=None, + source=None, + path=None + ) + graph.add_node(resolved) + graph.add_node(unresolved) + + unresolved_list = graph.get_unresolved() + assert len(unresolved_list) == 1 + assert unresolved_list[0].name == "unresolved" + + +class TestDependencyGraphBuilder: + """Tests for DependencyGraphBuilder class.""" + + def test_build_empty_deps(self): + builder = DependencyGraphBuilder(verbose=False) + graph = builder.build([]) + assert len(graph.nodes) == 0 + + def test_topological_sort_simple(self): + """Test that dependencies come before dependents.""" + graph = DependencyGraph() + + # A depends on B + node_a = DependencyNode( + owner="official", + name="a", + version_constraint="*", + resolved_version="1.0.0", + source="global", + path=Path("/a"), + children=["official/b"] + ) + node_b = DependencyNode( + owner="official", + name="b", + version_constraint="*", + resolved_version="1.0.0", + source="global", + path=Path("/b"), + children=[] + ) + graph.add_node(node_a) + graph.add_node(node_b) + graph.root_dependencies = ["official/a"] + + builder = DependencyGraphBuilder() + order = builder._topological_sort(graph) + + # B should come before A since A depends on B + b_idx = order.index("official/b") + a_idx = order.index("official/a") + assert b_idx < a_idx + + def test_cycle_detection(self): + """Test that cycles are detected.""" + # This requires mocking the resolver to create a cycle + # For now, just verify the cycle list exists + graph = DependencyGraph() + assert not graph.has_cycles() + graph.cycles.append(["a", "b", "a"]) + assert graph.has_cycles() + + +class TestResolveDepencies: + """Tests for resolve_dependencies convenience function.""" + + @patch('cmdforge.dependency_graph.DependencyGraphBuilder.build') + def test_categorizes_results(self, mock_build): + """Test that results are properly categorized.""" + # Create mock graph + mock_graph = DependencyGraph() + mock_graph.resolution_order = ["official/test"] + + node = DependencyNode( + owner="official", + name="test", + version_constraint="*", + resolved_version="1.0.0", + source="registry", + path=None # Not installed yet + ) + mock_graph.nodes["official/test"] = node + mock_build.return_value = mock_graph + + deps = [Dependency(name="official/test", version="*")] + result = resolve_dependencies(deps) + + assert isinstance(result, ResolutionResult) + assert len(result.to_install) == 1 + assert result.to_install[0].name == "test" + + +class TestIntegration: + """Integration tests with real file system.""" + + def test_reads_local_tool_deps(self, tmp_path): + """Test that dependencies can be read from a local tool config.""" + # Create a tool config with dependencies + tool_dir = tmp_path / "test-tool" + tool_dir.mkdir() + + config = { + "name": "test-tool", + "description": "Test tool", + "dependencies": [ + {"name": "official/helper", "version": "^1.0.0"} + ], + "steps": [ + {"type": "prompt", "prompt": "test", "output_var": "result"} + ] + } + (tool_dir / "config.yaml").write_text(yaml.dump(config)) + + # Create a node pointing to this tool + node = DependencyNode( + owner="local", + name="test-tool", + version_constraint="*", + resolved_version="1.0.0", + source="local", + path=tool_dir + ) + + builder = DependencyGraphBuilder() + deps = builder._read_deps_from_config(tool_dir) + + assert len(deps) == 1 + assert deps[0].name == "official/helper" + assert deps[0].version == "^1.0.0" + + def test_reads_implicit_tool_deps(self, tmp_path): + """Test that ToolStep references are discovered as dependencies.""" + tool_dir = tmp_path / "meta-tool" + tool_dir.mkdir() + + config = { + "name": "meta-tool", + "description": "Meta tool that calls others", + "steps": [ + { + "type": "tool", + "tool": "official/summarize", + "output_var": "summary" + } + ] + } + (tool_dir / "config.yaml").write_text(yaml.dump(config)) + + builder = DependencyGraphBuilder() + deps = builder._read_deps_from_config(tool_dir) + + assert len(deps) == 1 + assert deps[0].name == "official/summarize" diff --git a/tests/test_lockfile.py b/tests/test_lockfile.py new file mode 100644 index 0000000..6e63f36 --- /dev/null +++ b/tests/test_lockfile.py @@ -0,0 +1,374 @@ +"""Tests for lockfile.py - Lock file functionality.""" + +import pytest +from pathlib import Path +from unittest.mock import MagicMock, patch +import tempfile +import yaml + +from cmdforge.lockfile import ( + Lockfile, LockedPackage, LockfileMetadata, + generate_lockfile, verify_lockfile, + compute_file_hash +) +from cmdforge.hash_utils import compute_yaml_hash + + +class TestLockedPackage: + """Tests for LockedPackage dataclass.""" + + def test_owner_and_name_qualified(self): + pkg = LockedPackage( + name="official/summarize", + version="1.0.0", + constraint="^1.0.0", + integrity="sha256:abc", + source="registry", + direct=True + ) + assert pkg.owner == "official" + assert pkg.tool_name == "summarize" + + def test_owner_and_name_unqualified(self): + pkg = LockedPackage( + name="summarize", + version="1.0.0", + constraint="*", + integrity="sha256:abc", + source="local", + direct=True + ) + assert pkg.owner == "" + assert pkg.tool_name == "summarize" + + def test_required_by_default_empty(self): + pkg = LockedPackage( + name="test", + version="1.0.0", + constraint="*", + integrity="", + source="registry", + direct=True + ) + assert pkg.required_by == [] + + +class TestLockfile: + """Tests for Lockfile dataclass.""" + + def test_save_and_load(self, tmp_path): + lock = Lockfile( + metadata=LockfileMetadata( + generated_at="2026-01-26T00:00:00Z", + cmdforge_version="1.5.0", + manifest_hash="sha256:test123", + platform="linux", + python_version="3.12.0" + ), + packages={ + "official/summarize": LockedPackage( + name="official/summarize", + version="1.2.3", + constraint="^1.0.0", + integrity="sha256:abc123", + source="registry", + direct=True + ) + } + ) + + lock_path = tmp_path / "cmdforge.lock" + lock.save(lock_path) + + loaded = Lockfile.load(lock_path) + assert loaded is not None + assert "official/summarize" in loaded.packages + assert loaded.packages["official/summarize"].version == "1.2.3" + assert loaded.packages["official/summarize"].integrity == "sha256:abc123" + # Verify optional fields round-trip + assert loaded.packages["official/summarize"].required_by == [] + assert loaded.packages["official/summarize"].path is None + + def test_load_missing_file(self, tmp_path): + lock_path = tmp_path / "nonexistent.lock" + loaded = Lockfile.load(lock_path) + assert loaded is None + + def test_is_stale_when_manifest_changed(self, tmp_path): + manifest_path = tmp_path / "cmdforge.yaml" + manifest_path.write_text("name: test\ndependencies: []") + + original_hash = compute_file_hash(manifest_path) + + lock = Lockfile( + metadata=LockfileMetadata( + generated_at="2026-01-26T00:00:00Z", + cmdforge_version="1.5.0", + manifest_hash=original_hash, + platform="linux", + python_version="3.12.0" + ) + ) + + assert not lock.is_stale(manifest_path) + + # Modify manifest + manifest_path.write_text("name: test\ndependencies:\n - official/foo") + + assert lock.is_stale(manifest_path) + + def test_is_stale_without_metadata(self): + lock = Lockfile() + assert lock.is_stale() + + def test_get_package(self): + lock = Lockfile( + packages={ + "official/summarize": LockedPackage( + name="official/summarize", + version="1.0.0", + constraint="*", + integrity="", + source="registry", + direct=True + ) + } + ) + assert lock.get_package("official/summarize") is not None + assert lock.get_package("nonexistent") is None + + def test_lockfile_version(self): + lock = Lockfile() + assert lock.lockfile_version == 1 + + def test_save_includes_header(self, tmp_path): + lock = Lockfile() + lock_path = tmp_path / "cmdforge.lock" + lock.save(lock_path) + + content = lock_path.read_text() + assert "# cmdforge.lock" in content + assert "# Auto-generated" in content + assert "do not edit manually" in content + + +class TestHashFunctions: + """Tests for hash functions.""" + + def test_compute_file_hash(self, tmp_path): + test_file = tmp_path / "test.txt" + test_file.write_text("hello world") + + hash1 = compute_file_hash(test_file) + assert hash1.startswith("sha256:") + + # Same content = same hash + hash2 = compute_file_hash(test_file) + assert hash1 == hash2 + + # Different content = different hash + test_file.write_text("different") + hash3 = compute_file_hash(test_file) + assert hash1 != hash3 + + def test_compute_yaml_hash_normalized(self): + """Test that YAML hash normalizes content (sorted keys, excluded fields).""" + # Same logical content, different formatting + yaml1 = "name: test\ndescription: foo" + yaml2 = "description: foo\nname: test" # Different key order + + # Should produce same hash (normalized) + assert compute_yaml_hash(yaml1) == compute_yaml_hash(yaml2) + + # Different content = different hash + yaml3 = "name: test\ndescription: bar" + assert compute_yaml_hash(yaml1) != compute_yaml_hash(yaml3) + + # registry_hash field should be excluded + yaml4 = "name: test\ndescription: foo\nregistry_hash: sha256:abc" + assert compute_yaml_hash(yaml1) == compute_yaml_hash(yaml4) + + +class TestGenerateLockfile: + """Tests for generate_lockfile function.""" + + def test_generates_complete_lockfile(self, tmp_path): + """Test that lock file includes all graph nodes.""" + from cmdforge.manifest import Manifest, Dependency + from cmdforge.dependency_graph import DependencyGraph, DependencyNode + from pathlib import Path + + # Create mock manifest + manifest = Manifest( + name="test-project", + version="1.0.0", + dependencies=[ + Dependency(name="official/summarize", version="^1.0.0") + ] + ) + + # Create mock graph with local tool (has path for integrity) + tool_dir = tmp_path / "tool" + tool_dir.mkdir() + (tool_dir / "config.yaml").write_text("name: summarize\ndescription: test") + + graph = DependencyGraph() + graph.root_dependencies = ["official/summarize"] + graph.nodes = { + "official/summarize": DependencyNode( + owner="official", + name="summarize", + version_constraint="^1.0.0", + resolved_version="1.2.3", + source="local", # Use local so we can compute hash from file + path=tool_dir, + children=["official/text-utils"] + ), + "official/text-utils": DependencyNode( + owner="official", + name="text-utils", + version_constraint="*", + resolved_version="1.0.0", + source="registry", + path=None, + children=[] + ) + } + + # Mock registry client - won't be called for local tools + mock_client = MagicMock() + mock_client.download_tool.return_value = MagicMock( + config_hash="sha256:registry_hash", + config_yaml="name: text-utils" + ) + + # Create a cmdforge.yaml for manifest hash + manifest_path = tmp_path / "cmdforge.yaml" + manifest_path.write_text("name: test\ndependencies: []") + + import os + old_cwd = os.getcwd() + os.chdir(tmp_path) + try: + lock = generate_lockfile(manifest, graph, mock_client) + finally: + os.chdir(old_cwd) + + assert len(lock.packages) == 2 + assert "official/summarize" in lock.packages + assert "official/text-utils" in lock.packages + assert lock.packages["official/summarize"].direct is True + assert lock.packages["official/text-utils"].direct is False + + +class TestVerifyLockfile: + """Tests for verify_lockfile function.""" + + def test_reports_missing_tools(self): + """Test that missing tools are reported.""" + from cmdforge.resolver import ToolNotFoundError, ToolSpec + + lock = Lockfile( + packages={ + "official/missing": LockedPackage( + name="official/missing", + version="1.0.0", + constraint="*", + integrity="sha256:abc", + source="registry", + direct=True + ) + } + ) + + mock_client = MagicMock() + + with patch('cmdforge.resolver.ToolResolver') as MockResolver: + mock_instance = MockResolver.return_value + mock_instance.manifest = None + mock_instance.resolve.side_effect = ToolNotFoundError( + ToolSpec.parse("official/missing"), + [] + ) + + errors = verify_lockfile(lock, mock_client) + + assert len(errors) == 1 + assert "not installed" in errors[0] + + +class TestLockfileRoundTrip: + """Tests for complete lock file round-trip.""" + + def test_full_round_trip(self, tmp_path): + """Test saving and loading a complete lock file.""" + lock = Lockfile( + lockfile_version=1, + metadata=LockfileMetadata( + generated_at="2026-01-26T10:30:00Z", + cmdforge_version="1.5.0", + manifest_hash="sha256:abc123def456", + platform="linux", + python_version="3.12.0" + ), + packages={ + "official/summarize": LockedPackage( + name="official/summarize", + version="1.2.3", + constraint="^1.0.0", + integrity="sha256:abc123", + source="registry", + direct=True, + required_by=[] + ), + "official/text-utils": LockedPackage( + name="official/text-utils", + version="1.0.0", + constraint="*", + integrity="sha256:def456", + source="registry", + direct=False, + required_by=["official/summarize"] + ), + "my-local-tool": LockedPackage( + name="my-local-tool", + version="0.1.0", + constraint="*", + integrity="sha256:ghi789", + source="local", + direct=True, + path=".cmdforge/my-local-tool" + ) + } + ) + + lock_path = tmp_path / "cmdforge.lock" + lock.save(lock_path) + + loaded = Lockfile.load(lock_path) + + # Verify metadata + assert loaded.lockfile_version == 1 + assert loaded.metadata.generated_at == "2026-01-26T10:30:00Z" + assert loaded.metadata.cmdforge_version == "1.5.0" + assert loaded.metadata.manifest_hash == "sha256:abc123def456" + assert loaded.metadata.platform == "linux" + assert loaded.metadata.python_version == "3.12.0" + + # Verify packages + assert len(loaded.packages) == 3 + + summarize = loaded.packages["official/summarize"] + assert summarize.version == "1.2.3" + assert summarize.constraint == "^1.0.0" + assert summarize.integrity == "sha256:abc123" + assert summarize.source == "registry" + assert summarize.direct is True + + text_utils = loaded.packages["official/text-utils"] + assert text_utils.direct is False + assert text_utils.required_by == ["official/summarize"] + + local_tool = loaded.packages["my-local-tool"] + assert local_tool.source == "local" + assert local_tool.path == ".cmdforge/my-local-tool" diff --git a/tests/test_version.py b/tests/test_version.py new file mode 100644 index 0000000..7b91ab5 --- /dev/null +++ b/tests/test_version.py @@ -0,0 +1,218 @@ +"""Tests for version.py - Semantic version parsing and constraint matching.""" + +import pytest + +from cmdforge.version import ( + Version, + matches_constraint, + find_best_match, + is_valid_version, + compare_versions +) + + +class TestVersion: + """Tests for Version dataclass.""" + + def test_parse_basic(self): + v = Version.parse("1.2.3") + assert v is not None + assert v.major == 1 + assert v.minor == 2 + assert v.patch == 3 + assert v.prerelease is None + assert v.build is None + + def test_parse_with_prerelease(self): + v = Version.parse("1.0.0-alpha.1") + assert v is not None + assert v.major == 1 + assert v.minor == 0 + assert v.patch == 0 + assert v.prerelease == "alpha.1" + + def test_parse_with_build(self): + v = Version.parse("1.0.0+build.123") + assert v is not None + assert v.build == "build.123" + + def test_parse_full(self): + v = Version.parse("2.1.0-beta.2+20240115") + assert v is not None + assert v.major == 2 + assert v.minor == 1 + assert v.patch == 0 + assert v.prerelease == "beta.2" + assert v.build == "20240115" + + def test_parse_invalid(self): + assert Version.parse("") is None + assert Version.parse("invalid") is None + assert Version.parse("1.2") is None + assert Version.parse("1.2.3.4") is None + + def test_str(self): + assert str(Version(1, 2, 3)) == "1.2.3" + assert str(Version(1, 0, 0, prerelease="alpha")) == "1.0.0-alpha" + assert str(Version(1, 0, 0, build="123")) == "1.0.0+123" + assert str(Version(1, 0, 0, prerelease="rc.1", build="build")) == "1.0.0-rc.1+build" + + def test_tuple(self): + v = Version(1, 2, 3) + assert v.tuple == (1, 2, 3) + + def test_equality(self): + v1 = Version(1, 2, 3) + v2 = Version(1, 2, 3) + v3 = Version(1, 2, 4) + assert v1 == v2 + assert v1 != v3 + + def test_comparison(self): + assert Version(1, 0, 0) < Version(2, 0, 0) + assert Version(1, 0, 0) < Version(1, 1, 0) + assert Version(1, 0, 0) < Version(1, 0, 1) + assert Version(2, 0, 0) > Version(1, 9, 9) + + def test_prerelease_comparison(self): + # Prerelease < release + assert Version(1, 0, 0, prerelease="alpha") < Version(1, 0, 0) + assert Version(1, 0, 0, prerelease="beta") < Version(1, 0, 0) + # Alphabetic ordering for prerelease + assert Version(1, 0, 0, prerelease="alpha") < Version(1, 0, 0, prerelease="beta") + + def test_hash(self): + v1 = Version(1, 2, 3) + v2 = Version(1, 2, 3) + assert hash(v1) == hash(v2) + # Can be used in sets/dicts + s = {v1, v2} + assert len(s) == 1 + + +class TestMatchesConstraint: + """Tests for matches_constraint function.""" + + def test_any_version(self): + assert matches_constraint("1.0.0", "*") + assert matches_constraint("2.3.4", "*") + assert matches_constraint("1.0.0", "latest") + assert matches_constraint("1.0.0", "") + + def test_exact_match(self): + assert matches_constraint("1.2.3", "1.2.3") + assert not matches_constraint("1.2.4", "1.2.3") + assert matches_constraint("1.0.0", "=1.0.0") + + def test_caret_major(self): + # ^1.2.3 allows >=1.2.3 and <2.0.0 + assert matches_constraint("1.2.3", "^1.2.3") + assert matches_constraint("1.2.4", "^1.2.3") + assert matches_constraint("1.9.9", "^1.2.3") + assert not matches_constraint("1.2.2", "^1.2.3") + assert not matches_constraint("2.0.0", "^1.2.3") + + def test_caret_zero_minor(self): + # ^0.2.3 allows >=0.2.3 and <0.3.0 + assert matches_constraint("0.2.3", "^0.2.3") + assert matches_constraint("0.2.9", "^0.2.3") + assert not matches_constraint("0.3.0", "^0.2.3") + assert not matches_constraint("0.2.2", "^0.2.3") + + def test_caret_zero_zero(self): + # ^0.0.3 allows ONLY 0.0.3 + assert matches_constraint("0.0.3", "^0.0.3") + assert not matches_constraint("0.0.4", "^0.0.3") + assert not matches_constraint("0.0.2", "^0.0.3") + + def test_tilde(self): + # ~1.2.3 allows >=1.2.3 and <1.3.0 + assert matches_constraint("1.2.3", "~1.2.3") + assert matches_constraint("1.2.9", "~1.2.3") + assert not matches_constraint("1.3.0", "~1.2.3") + assert not matches_constraint("1.2.2", "~1.2.3") + + def test_greater_than_or_equal(self): + assert matches_constraint("1.0.0", ">=1.0.0") + assert matches_constraint("2.0.0", ">=1.0.0") + assert not matches_constraint("0.9.9", ">=1.0.0") + + def test_less_than_or_equal(self): + assert matches_constraint("1.0.0", "<=1.0.0") + assert matches_constraint("0.9.0", "<=1.0.0") + assert not matches_constraint("1.0.1", "<=1.0.0") + + def test_greater_than(self): + assert matches_constraint("1.0.1", ">1.0.0") + assert not matches_constraint("1.0.0", ">1.0.0") + + def test_less_than(self): + assert matches_constraint("0.9.9", "<1.0.0") + assert not matches_constraint("1.0.0", "<1.0.0") + + def test_prerelease_excluded_by_default(self): + # Prerelease versions should NOT match ranges without prerelease specifier + assert not matches_constraint("1.0.0-alpha", "^1.0.0") + assert not matches_constraint("1.0.0-beta", ">=1.0.0") + + def test_invalid_version(self): + assert not matches_constraint("invalid", "^1.0.0") + + +class TestFindBestMatch: + """Tests for find_best_match function.""" + + def test_finds_highest(self): + versions = ["1.0.0", "1.1.0", "1.2.0", "2.0.0"] + assert find_best_match(versions, "*") == "2.0.0" + + def test_respects_constraint(self): + versions = ["1.0.0", "1.1.0", "1.2.0", "2.0.0"] + assert find_best_match(versions, "^1.0.0") == "1.2.0" + + def test_no_match(self): + versions = ["1.0.0", "1.1.0"] + assert find_best_match(versions, ">=2.0.0") is None + + def test_exact_match(self): + versions = ["1.0.0", "1.1.0", "1.2.0"] + assert find_best_match(versions, "1.1.0") == "1.1.0" + + def test_empty_versions(self): + assert find_best_match([], "*") is None + + +class TestIsValidVersion: + """Tests for is_valid_version function.""" + + def test_valid(self): + assert is_valid_version("1.0.0") + assert is_valid_version("0.1.0") + assert is_valid_version("1.0.0-alpha") + assert is_valid_version("1.0.0+build") + + def test_invalid(self): + assert not is_valid_version("") + assert not is_valid_version("1.0") + assert not is_valid_version("invalid") + + +class TestCompareVersions: + """Tests for compare_versions function.""" + + def test_less_than(self): + assert compare_versions("1.0.0", "2.0.0") == -1 + assert compare_versions("1.0.0", "1.1.0") == -1 + + def test_equal(self): + assert compare_versions("1.0.0", "1.0.0") == 0 + + def test_greater_than(self): + assert compare_versions("2.0.0", "1.0.0") == 1 + assert compare_versions("1.1.0", "1.0.0") == 1 + + def test_invalid_raises(self): + with pytest.raises(ValueError): + compare_versions("invalid", "1.0.0") + with pytest.raises(ValueError): + compare_versions("1.0.0", "invalid")