Add transitive dependency resolution and lock file support

Implements two major features for project dependency management:

Transitive Dependency Resolution:
- DependencyGraph/DependencyNode for structured dependency tracking
- DependencyGraphBuilder with recursive resolution and cycle detection
- Topological sorting for correct install order (dependencies first)
- Version conflict detection with provenance tracking
- Full semver support: ^, ~, >=, <=, >, <, = constraints
- New `cmdforge deps tree` command to visualize dependency tree
- Install flags: --dry-run, --force, --verbose

Lock File Support (cmdforge.lock):
- Lockfile/LockedPackage/LockfileMetadata dataclasses
- Records exact versions of all dependencies (direct + transitive)
- Integrity verification via SHA256 hashes
- `cmdforge lock` to generate/update lock file
- `cmdforge verify` to check installed tools match lock
- Install flags: --frozen, --strict-frozen, --ignore-lock

62 new tests for version, dependency_graph, and lockfile modules.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
rob 2026-01-26 22:05:09 -04:00
parent 29869ef59f
commit 3e7eaeb504
10 changed files with 2536 additions and 31 deletions


@@ -6,6 +6,49 @@ All notable changes to CmdForge will be documented in this file.
### Added
#### Transitive Dependency Resolution
- **Full transitive dependency resolution**: When installing dependencies, CmdForge now resolves and installs the complete dependency tree
- `DependencyGraph` and `DependencyNode` dataclasses for structured dependency tracking
- `DependencyGraphBuilder` class for recursive resolution with cycle detection
- Topological sorting ensures dependencies are installed before dependents (Kahn's algorithm)
- Version conflict detection with provenance tracking (which package requested each version)
- Registry config caching during resolution to avoid duplicate fetches
- **Semver version constraint matching**: Full semantic versioning support
- Caret constraints: `^1.2.3` (>=1.2.3 <2.0.0), `^0.2.3` (>=0.2.3 <0.3.0), `^0.0.3` (exact match)
- Tilde constraints: `~1.2.3` (>=1.2.3 <1.3.0)
- Comparison operators: `>=`, `<=`, `>`, `<`, `=`
- Wildcards: `*`, `latest`
- Prerelease version handling per semver spec
- **New CLI commands**:
- `cmdforge deps tree` - Show full dependency tree with transitive dependencies
- `cmdforge deps tree --verbose` - Show detailed resolution info
- **Enhanced install command**:
- `--dry-run` - Show what would be installed without installing
- `--force` - Install despite version conflicts (first constraint wins)
- `--verbose` - Show detailed resolution info
#### Lock File Support
- **Lock file for reproducible installs**: `cmdforge.lock` records exact versions of all dependencies
- `Lockfile`, `LockedPackage`, `LockfileMetadata` dataclasses
- Records direct and transitive dependencies with provenance (`required_by`)
- Integrity verification via SHA256 hashes (uses registry `config_hash`)
- Normalized YAML hashing via `hash_utils.compute_yaml_hash()` for consistent integrity checks
- Platform and Python version metadata for debugging
- **New CLI commands**:
- `cmdforge lock` - Generate or update `cmdforge.lock`
- `cmdforge lock --force` - Force regenerate even if up to date
- `cmdforge verify` - Verify installed tools match lock file
- **Lock-aware install**:
- `cmdforge install` - Uses lock file when present for reproducible installs
- `--frozen` - Fail if lock file is missing (for CI)
- `--strict-frozen` - Fail if lock file is stale (manifest changed)
- `--ignore-lock` - Ignore lock file and resolve fresh from manifest
#### Local Collections Support
- **Local collection definitions**: Create and manage collections locally before publishing
- Collections stored as YAML files in `~/.cmdforge/collections/`


@@ -441,13 +441,53 @@ cmdforge init
cmdforge add official/summarize
cmdforge add myuser/tool@^2.0.0 # With version constraint
# Install all dependencies (including transitive)
cmdforge install
# Check dependency status
cmdforge deps
# Show full dependency tree
cmdforge deps tree
```
### Reproducible Installs with Lock Files
Lock files ensure identical tool versions across machines and over time:
```bash
# Generate lock file after installing
cmdforge lock
# Install exact versions from lock (reproducible)
cmdforge install # Uses lock file if present
# CI/CD: Fail if lock is missing or stale
cmdforge install --frozen # Requires lock file
cmdforge install --strict-frozen # Requires lock + manifest match
# Verify installed tools match lock
cmdforge verify
# Preview what would be installed
cmdforge install --dry-run
# Ignore lock and resolve fresh
cmdforge install --ignore-lock
```
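A generated `cmdforge.lock` might look roughly like the following. This is an illustrative sketch inferred from the dataclass names above (`LockedPackage`, `required_by`, SHA256 integrity, platform metadata); the exact field names and layout are not guaranteed:

```yaml
# Hypothetical cmdforge.lock shape -- fields inferred, not an exact schema
version: 1
metadata:
  manifest_hash: "sha256:..."
  platform: linux
  python_version: "3.12"
packages:
  official/summarize:
    version: 1.2.3
    source: registry
    integrity: "sha256:..."
    direct: true
    required_by: [root]
  official/extract:
    version: 0.4.1
    source: registry
    integrity: "sha256:..."
    direct: false
    required_by: [official/summarize]
```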
### Version Constraints
CmdForge supports semantic versioning constraints:
| Constraint | Meaning | Example |
|------------|---------|---------|
| `*` | Any version | `official/summarize` |
| `^1.2.3` | Compatible (>=1.2.3 <2.0.0) | `tool@^1.2.3` |
| `~1.2.3` | Approximately (>=1.2.3 <1.3.0) | `tool@~1.2.3` |
| `>=1.0.0` | Greater or equal | `tool@>=1.0.0` |
| `1.0.0` | Exact version | `tool@1.0.0` |
Version constraints let CmdForge verify that compatible versions of all required tools are installed before your project's scripts run.
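The table's semantics can be sketched in a few lines of Python. This is an illustrative approximation, not CmdForge's actual `version` module; prerelease handling and the `<`/`<=` operators are omitted:

```python
# Minimal sketch of the constraint semantics in the table above.
# Illustrative only -- the real logic lives in CmdForge's version module.

def parse(v: str) -> tuple:
    return tuple(int(p) for p in v.split("."))

def matches(version: str, constraint: str) -> bool:
    if constraint in ("", "*", "latest"):
        return True
    v = parse(version)
    if constraint.startswith("^"):
        base = parse(constraint[1:])
        # ^1.2.3 -> >=1.2.3 <2.0.0; ^0.2.3 -> >=0.2.3 <0.3.0; ^0.0.3 -> exact
        if base[0] > 0:
            upper = (base[0] + 1, 0, 0)
        elif base[1] > 0:
            upper = (0, base[1] + 1, 0)
        else:
            return v == base
        return base <= v < upper
    if constraint.startswith("~"):
        base = parse(constraint[1:])
        # ~1.2.3 -> >=1.2.3 <1.3.0
        return base <= v < (base[0], base[1] + 1, 0)
    if constraint.startswith(">="):
        return v >= parse(constraint[2:])
    return v == parse(constraint.lstrip("="))
```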
## Collections


@@ -12,7 +12,7 @@ from .tool_commands import (
from .provider_commands import cmd_providers
from .registry_commands import cmd_registry
from .collections_commands import cmd_collections
from .project_commands import cmd_deps, cmd_deps_tree, cmd_install_deps, cmd_add, cmd_init, cmd_lock, cmd_verify
from .config_commands import cmd_config
@@ -284,14 +284,38 @@ def main():
# Project Commands
# -------------------------------------------------------------------------
# 'deps' command with subcommands
p_deps = subparsers.add_parser("deps", help="Show project dependencies")
deps_sub = p_deps.add_subparsers(dest="deps_cmd", help="Deps commands")
# deps tree
p_deps_tree = deps_sub.add_parser("tree", help="Show dependency tree")
p_deps_tree.add_argument("--verbose", "-v", action="store_true", help="Show detailed info")
p_deps_tree.set_defaults(func=cmd_deps_tree)
# Default for deps with no subcommand shows basic deps list
p_deps.set_defaults(func=cmd_deps)
# 'install' command (for dependencies)
p_install = subparsers.add_parser("install", help="Install dependencies from cmdforge.yaml")
p_install.add_argument("--dry-run", action="store_true", help="Show what would be installed without installing")
p_install.add_argument("--force", action="store_true", help="Install despite version conflicts or stale lock")
p_install.add_argument("--verbose", "-v", action="store_true", help="Show detailed resolution info")
p_install.add_argument("--ignore-lock", action="store_true", help="Ignore lock file and resolve fresh from manifest")
p_install.add_argument("--frozen", action="store_true", help="Fail if lock file is missing (for CI)")
p_install.add_argument("--strict-frozen", action="store_true", help="Fail if lock is missing or stale (stricter CI mode)")
p_install.set_defaults(func=cmd_install_deps)
# 'lock' command
p_lock = subparsers.add_parser("lock", help="Generate or update cmdforge.lock")
p_lock.add_argument("--force", "-f", action="store_true", help="Force regenerate even if up to date")
p_lock.add_argument("--verbose", "-v", action="store_true", help="Show detailed output")
p_lock.set_defaults(func=cmd_lock)
# 'verify' command
p_verify = subparsers.add_parser("verify", help="Verify installed tools match lock file")
p_verify.set_defaults(func=cmd_verify)
# 'add' command
p_add = subparsers.add_parser("add", help="Add a tool to project dependencies")
p_add.add_argument("tool", help="Tool to add (owner/name)")


@@ -14,6 +14,10 @@ from ..resolver import (
def cmd_deps(args):
"""Show project dependencies from cmdforge.yaml."""
# If 'tree' subcommand, delegate to that
if hasattr(args, 'deps_cmd') and args.deps_cmd == 'tree':
return cmd_deps_tree(args)
manifest = load_manifest()
if manifest is None:
@@ -51,9 +55,99 @@
return 0
def cmd_deps_tree(args):
"""Show dependency tree with transitive dependencies."""
from ..dependency_graph import DependencyGraphBuilder
from ..resolver import ToolResolver
manifest = load_manifest()
if manifest is None:
print("No cmdforge.yaml found in current project.")
print("Create one with: cmdforge init")
return 1
if not manifest.dependencies:
print("No dependencies in cmdforge.yaml")
return 0
verbose = getattr(args, 'verbose', False)
resolver = ToolResolver(auto_fetch=False)
builder = DependencyGraphBuilder(resolver=resolver, verbose=verbose)
print("Resolving dependencies...")
graph = builder.build(manifest.dependencies)
def print_tree(qualified_name: str, indent: int = 0, seen: set = None):
seen = seen or set()
prefix = " " * indent
node = graph.get_node(qualified_name)
if not node:
print(f"{prefix}- {qualified_name} (not found)")
return
# Status indicator
if node.source in ("local", "global"):
status = "installed"
elif node.source == "registry" and node.path:
status = "installed"
elif node.source == "registry":
status = "registry"
else:
status = "missing"
# Show resolved version if available, otherwise fall back to constraint
if node.resolved_version:
version = f"@{node.resolved_version}"
elif node.version_constraint:
version = f"@{node.version_constraint}"
else:
version = ""
circular = " (circular)" if qualified_name in seen else ""
print(f"{prefix}- {node.qualified_name}{version} [{status}]{circular}")
if qualified_name not in seen:
seen.add(qualified_name)
for child in node.children:
print_tree(child, indent + 1, seen.copy())
print()
print("Dependency tree:")
for root in graph.root_dependencies:
print_tree(root)
if graph.conflicts:
print()
print("Version conflicts:")
for tool, v1, from1, v2, from2 in graph.conflicts:
print(f" {tool}: {v1} (from {from1}) vs {v2} (from {from2})")
if graph.cycles:
print()
print("Circular dependencies:")
for cycle in graph.cycles:
print(f" {' -> '.join(cycle)}")
# Summary
print()
total = len(graph.nodes)
installed = sum(1 for n in graph.nodes.values() if n.source in ("local", "global") or (n.source == "registry" and n.path))
to_install = sum(1 for n in graph.nodes.values() if n.source == "registry" and n.path is None)
missing = sum(1 for n in graph.nodes.values() if not n.is_resolved)
print(f"Total: {total} tools ({installed} installed, {to_install} to install, {missing} missing)")
return 0
def cmd_install_deps(args):
"""Install all dependencies from cmdforge.yaml with transitive resolution."""
from ..dependency_graph import DependencyGraphBuilder
from ..resolver import ToolResolver, install_from_registry, ToolNotFoundError
from ..registry_client import RegistryError
from ..lockfile import Lockfile
manifest = load_manifest()
@@ -66,46 +160,195 @@ def cmd_install_deps(args):
print("No dependencies to install.")
return 0
print(f"Installing dependencies for {manifest.name}...")
verbose = getattr(args, 'verbose', False)
dry_run = getattr(args, 'dry_run', False)
force = getattr(args, 'force', False)
frozen = getattr(args, 'frozen', False)
strict_frozen = getattr(args, 'strict_frozen', False)
ignore_lock = getattr(args, 'ignore_lock', False)
# Check for lock file
lock = Lockfile.load()
if frozen or strict_frozen:
# Frozen mode: require lock file
if lock is None:
print("Error: --frozen requires cmdforge.lock")
print("Run 'cmdforge lock' first.")
return 1
if strict_frozen and lock.is_stale():
print("Error: cmdforge.lock is stale (manifest changed)")
print("Run 'cmdforge lock' to update, or drop --strict-frozen")
return 1
if lock and not ignore_lock:
return _install_from_lock(lock, args)
# Fall through to manifest-based installation
# Build dependency graph
resolver = ToolResolver(auto_fetch=False)
builder = DependencyGraphBuilder(resolver=resolver, verbose=verbose)
print("Resolving dependencies...")
graph = builder.build(manifest.dependencies)
# Check for issues
if graph.has_cycles():
print()
print("Error: Circular dependencies detected:")
for cycle in graph.cycles:
print(f" {' -> '.join(cycle)}")
return 1
if graph.has_conflicts():
print()
print("Warning: Version conflicts detected:")
for tool, v1, from1, v2, from2 in graph.conflicts:
print(f" {tool}: {v1} (from {from1}) vs {v2} (from {from2})")
if not force:
print()
print("Use --force to install anyway (first constraint wins)")
return 1
unresolved = graph.get_unresolved()
if unresolved:
print()
print("Error: Missing dependencies:")
for node in unresolved:
constraint = f"@{node.version_constraint}" if node.version_constraint else ""
print(f" {node.qualified_name}{constraint}")
return 1
# Determine what needs installation (in correct order)
to_install = [
node for node in graph.get_install_order()
if node.source == "registry" and node.path is None
]
if not to_install:
print("All dependencies already installed.")
return 0
if dry_run:
print()
print(f"Would install {len(to_install)} tool(s):")
for node in to_install:
version = node.resolved_version or "latest"
print(f" {node.qualified_name}@{version}")
return 0
print()
print(f"Installing {len(to_install)} tool(s) in dependency order:")
for node in to_install:
version = node.resolved_version or "latest"
print(f" {node.qualified_name}@{version}")
# Install in topological order (dependencies first)
installed = 0
failed = []
print()
for node in to_install:
try:
print(f"Installing {node.qualified_name}...", end=" ", flush=True)
spec = node.qualified_name
install_version = node.resolved_version or node.version_constraint
install_from_registry(spec, version=install_version)
print(f"v{node.resolved_version or 'latest'}")
installed += 1
except RegistryError as e:
print(f"FAILED: {e.message}")
failed.append((node.qualified_name, e.message))
except Exception as e:
print(f"FAILED: {e}")
failed.append((node.qualified_name, str(e)))
# Summary
print()
print(f"Installed: {installed}/{len(to_install)}")
if failed:
print("Failed:")
for name, err in failed:
print(f" {name}: {err}")
return 1
return 0
def _install_from_lock(lock: "Lockfile", args) -> int:
"""Install exact versions from lock file."""
from ..resolver import ToolResolver, ToolNotFoundError, install_from_registry
from ..registry_client import RegistryError
verbose = getattr(args, 'verbose', False)
dry_run = getattr(args, 'dry_run', False)
force = getattr(args, 'force', False)
print("Installing from cmdforge.lock (reproducible)...")
if lock.is_stale() and not force:
print("Warning: Lock file may be outdated (manifest changed)")
print("Run 'cmdforge lock' to update, or use --force to continue")
return 1
# Check what needs installing
resolver = ToolResolver(auto_fetch=False)
resolver.manifest = None
to_install = []
for name, locked in lock.packages.items():
try:
resolved = resolver.resolve(name)
# Check if version matches
resolved_version = resolved.version or ""
locked_version = locked.version or ""
if resolved_version != locked_version:
to_install.append(locked)
except ToolNotFoundError:
to_install.append(locked)
except Exception:
to_install.append(locked)
if not to_install:
print("All locked packages already installed.")
return 0
print()
print(f"Installing {len(to_install)} package(s):")
for pkg in to_install:
print(f" {pkg.name}@{pkg.version}")
if dry_run:
return 0
# Install exact versions
installed = 0
failed = []
print()
for pkg in to_install:
if pkg.source == "local":
print(f"{pkg.name}: skipping (local)")
continue
try:
print(f"Installing {pkg.name}@{pkg.version}...", end=" ", flush=True)
# Use correct API: install_from_registry(spec, version)
install_from_registry(pkg.name, version=pkg.version)
print("OK")
installed += 1
except RegistryError as e:
print(f"FAILED: {e.message}")
failed.append((pkg.name, e.message))
except Exception as e:
print(f"FAILED: {e}")
failed.append((pkg.name, str(e)))
print()
print(f"Installed: {installed}/{len(to_install)}")
if failed:
print("Failed:")
for name, err in failed:
print(f" {name}: {err}")
return 1
return 0
@@ -200,3 +443,105 @@ def cmd_init(args):
print("Install them with: cmdforge install")
return 0
def cmd_lock(args):
"""Generate or update cmdforge.lock file."""
from ..lockfile import Lockfile, generate_lockfile
from ..dependency_graph import DependencyGraphBuilder
from ..resolver import ToolResolver
from ..registry_client import get_client
manifest = load_manifest()
if manifest is None:
print("No cmdforge.yaml found. Run 'cmdforge init' first.")
return 1
verbose = getattr(args, 'verbose', False)
force = getattr(args, 'force', False)
# Check for existing lock
existing_lock = Lockfile.load()
if existing_lock and not force:
if not existing_lock.is_stale():
print("Lock file is up to date. Use --force to regenerate.")
return 0
print("Lock file is stale (manifest changed), regenerating...")
# Resolve all dependencies
print("Resolving dependencies...")
resolver = ToolResolver(auto_fetch=True) # Allow fetching for resolution
builder = DependencyGraphBuilder(resolver=resolver, verbose=verbose)
try:
graph = builder.build(manifest.dependencies)
except Exception as e:
print(f"Error resolving dependencies: {e}")
return 1
if graph.has_cycles():
print("Error: Circular dependencies detected")
for cycle in graph.cycles:
print(f" {' -> '.join(cycle)}")
return 1
unresolved = graph.get_unresolved()
if unresolved and not force:
print("Error: Some dependencies could not be resolved:")
for node in unresolved:
constraint = f"@{node.version_constraint}" if node.version_constraint else ""
print(f" {node.qualified_name}{constraint}")
print("Fix missing deps or re-run with --force to lock what can be resolved.")
return 1
if unresolved and force:
print("Warning: Some dependencies could not be resolved and will be skipped:")
for node in unresolved:
constraint = f"@{node.version_constraint}" if node.version_constraint else ""
print(f" {node.qualified_name}{constraint}")
# Generate lock file
print("Generating lock file...")
client = get_client()
lock = generate_lockfile(manifest, graph, client)
lock.save()
# Summary
direct = sum(1 for p in lock.packages.values() if p.direct)
transitive = len(lock.packages) - direct
print()
print(f"Locked {len(lock.packages)} package(s):")
print(f" Direct: {direct}")
print(f" Transitive: {transitive}")
print()
print("Wrote cmdforge.lock")
return 0
def cmd_verify(args):
"""Verify installed tools match lock file."""
from ..lockfile import Lockfile, verify_lockfile
from ..registry_client import get_client
lock = Lockfile.load()
if lock is None:
print("No cmdforge.lock found. Run 'cmdforge lock' first.")
return 1
if lock.is_stale():
print("Warning: Lock file may be outdated (manifest changed)")
print()
print("Verifying lock file...")
client = get_client()
errors = verify_lockfile(lock, client)
if not errors:
print(f"All {len(lock.packages)} packages verified OK")
return 0
print()
print(f"{len(errors)} verification error(s):")
for error in errors:
print(f" {error}")
return 1


@@ -0,0 +1,547 @@
"""Dependency graph for transitive dependency resolution.
Builds a complete dependency graph with:
- Transitive closure (A -> B -> C)
- Cycle detection
- Version conflict detection with provenance
- Topological sorting for install order
"""
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Set, Optional, Tuple
import yaml
from .tool import TOOLS_DIR
from .manifest import Dependency
from .version import matches_constraint
# Default owner for unqualified tool references
DEFAULT_OWNER = "official"
@dataclass
class DependencyNode:
"""A node in the dependency graph."""
owner: str # e.g., "official" (resolved), "" if unresolved
name: str # e.g., "summarize"
version_constraint: Optional[str] # e.g., "^1.0.0", "*", None
resolved_version: Optional[str] # e.g., "1.2.3" (after resolution)
source: Optional[str] # "local", "global", "registry", None (unresolved)
path: Optional[Path] # Path to installed tool (if installed)
children: List[str] = field(default_factory=list) # Qualified names of dependencies
requested_by: str = "root" # Parent that introduced this dep
@property
def qualified_name(self) -> str:
"""Consistent key for graph lookups."""
return f"{self.owner}/{self.name}" if self.owner else self.name
@property
def is_resolved(self) -> bool:
"""Check if this node has been resolved to a source."""
return self.source is not None
@dataclass
class DependencyGraph:
"""Complete dependency graph with transitive closure."""
root_dependencies: List[str] = field(default_factory=list) # Direct deps (qualified names)
nodes: Dict[str, DependencyNode] = field(default_factory=dict) # qualified_name -> node
resolution_order: List[str] = field(default_factory=list) # Topological sort (install order)
conflicts: List[Tuple[str, str, str, str, str]] = field(default_factory=list) # (tool, c1, from1, c2, from2)
cycles: List[List[str]] = field(default_factory=list) # Detected cycles
def add_node(self, node: DependencyNode, requested_by: str = "root") -> None:
"""Add or update a node. Detects version conflicts with provenance."""
key = node.qualified_name
node.requested_by = requested_by
if key in self.nodes:
existing = self.nodes[key]
# Check for version conflict
if (existing.version_constraint and node.version_constraint and
existing.version_constraint != node.version_constraint):
self.conflicts.append((
key,
existing.version_constraint,
existing.requested_by,
node.version_constraint,
requested_by
))
self.nodes[key] = node
def get_node(self, qualified_name: str) -> Optional[DependencyNode]:
"""Get a node by qualified name."""
return self.nodes.get(qualified_name)
def get_install_order(self) -> List[DependencyNode]:
"""Return nodes in installation order (dependencies first)."""
return [self.nodes[name] for name in self.resolution_order if name in self.nodes]
def has_conflicts(self) -> bool:
"""Check if any version conflicts were detected."""
return len(self.conflicts) > 0
def has_cycles(self) -> bool:
"""Check if any circular dependencies were detected."""
return len(self.cycles) > 0
def get_unresolved(self) -> List[DependencyNode]:
"""Get all unresolved nodes."""
return [node for node in self.nodes.values() if not node.is_resolved]
def get_registry_nodes(self) -> List[DependencyNode]:
"""Get nodes that need to be installed from registry."""
return [
node for node in self.nodes.values()
if node.source == "registry" and node.path is None
]
class DependencyGraphBuilder:
"""Builds a complete dependency graph with transitive resolution."""
def __init__(
self,
resolver: Optional["ToolResolver"] = None,
client: Optional["RegistryClient"] = None,
max_depth: int = 50,
verbose: bool = False
):
"""
Initialize the graph builder.
Args:
resolver: ToolResolver for local/global lookups (created if not provided)
client: RegistryClient for registry lookups (created if not provided)
max_depth: Maximum dependency depth (prevents infinite recursion)
verbose: Print debug info
"""
self.resolver = resolver
self.client = client
self.max_depth = max_depth
self.verbose = verbose
self._visited: Set[str] = set() # qualified names
self._stack: List[str] = [] # For cycle detection
self._registry_cache: Dict[str, "DownloadResult"] = {} # cache_key -> DownloadResult
self._resolved_count = 0
self.include_implicit = True
def _get_resolver(self) -> "ToolResolver":
"""Get or create the tool resolver."""
if self.resolver is None:
from .resolver import ToolResolver
self.resolver = ToolResolver(auto_fetch=False)
return self.resolver
def _get_client(self) -> "RegistryClient":
"""Get or create the registry client."""
if self.client is None:
from .registry_client import get_client
self.client = get_client()
return self.client
def build(
self,
dependencies: List[Dependency],
include_implicit: bool = True
) -> DependencyGraph:
"""
Build complete dependency graph.
Args:
dependencies: Direct dependencies to resolve
include_implicit: Whether to include ToolStep dependencies
Returns:
DependencyGraph with all transitive dependencies
"""
graph = DependencyGraph()
self.include_implicit = include_implicit
# Reset state for new build
self._visited.clear()
self._stack.clear()
self._resolved_count = 0
# Track original names to resolve qualified names after
original_names = []
for dep in dependencies:
original_names.append(dep.name)
self._resolve_recursive(dep.name, dep.version, graph, depth=0)
# Update root_dependencies with qualified names from resolved nodes
for orig_name in original_names:
# Find the resolved node - check both qualified and unqualified
node = graph.get_node(orig_name)
if node:
graph.root_dependencies.append(node.qualified_name)
else:
# Try to find by matching tool name across all nodes
found = False
for qname, n in graph.nodes.items():
if n.name == orig_name or qname == orig_name:
graph.root_dependencies.append(n.qualified_name)
found = True
break
if not found:
# Unresolved - keep original name for error reporting
graph.root_dependencies.append(orig_name)
graph.resolution_order = self._topological_sort(graph)
return graph
def _qualify_name(self, name: str, resolved_owner: Optional[str] = None) -> str:
"""Return qualified name after resolution (local-first)."""
if "/" in name:
return name
owner = resolved_owner or DEFAULT_OWNER
return f"{owner}/{name}"
def _parse_qualified(self, qualified: str) -> Tuple[Optional[str], str]:
"""Split qualified name into owner and name (owner may be None)."""
if "/" in qualified:
parts = qualified.split("/", 1)
return (parts[0], parts[1])
return (None, qualified)
def _resolve_recursive(
self,
qualified_name: str,
version_constraint: Optional[str],
graph: DependencyGraph,
depth: int
) -> Optional[DependencyNode]:
"""Recursively resolve a dependency and its children."""
# Cycle detection
if qualified_name in self._stack:
cycle = self._stack[self._stack.index(qualified_name):] + [qualified_name]
graph.cycles.append(cycle)
if self.verbose:
print(f" Cycle detected: {' -> '.join(cycle)}")
return None
# Already resolved - check for version conflicts
if qualified_name in self._visited:
existing = graph.get_node(qualified_name)
if existing and version_constraint and existing.version_constraint != version_constraint:
graph.conflicts.append((
qualified_name,
existing.version_constraint or "*",
existing.requested_by or "unknown",
version_constraint or "*",
self._stack[-1] if self._stack else "unknown"
))
return existing
# Depth limit
if depth > self.max_depth:
raise RecursionError(f"Dependency depth exceeded {self.max_depth}")
self._visited.add(qualified_name)
self._stack.append(qualified_name)
try:
# Resolve the tool
node = self._resolve_single(qualified_name, version_constraint)
if node is None:
return None
# Also track the resolved qualified name to prevent duplicates
if node.qualified_name != qualified_name:
self._visited.add(node.qualified_name)
graph.add_node(node, requested_by=self._stack[-2] if len(self._stack) > 1 else "root")
self._resolved_count += 1
if self.verbose:
print(f" Resolved {self._resolved_count}: {node.qualified_name}")
# Get child dependencies (only if this node is resolved)
if node.is_resolved:
child_deps = self._get_tool_dependencies(node)
else:
child_deps = []
# Recurse into children and capture resolved keys
node.children = []
for child in child_deps:
child_node = self._resolve_recursive(child.name, child.version, graph, depth + 1)
if child_node and child_node.is_resolved:
node.children.append(child_node.qualified_name)
elif child_node:
# Child exists but is unresolved - still track for error reporting
node.children.append(child_node.qualified_name)
if self.verbose:
print(f" Warning: Unresolved dependency {child.name}")
return node
finally:
self._stack.pop()
def _resolve_single(
self,
qualified_name: str,
version_constraint: Optional[str]
) -> Optional[DependencyNode]:
"""Resolve a single dependency to a node."""
owner, name = self._parse_qualified(qualified_name)
owner_hint = None
# Try local/global first via resolver
try:
from .resolver import ToolNotFoundError
resolver = self._get_resolver()
resolved = resolver.resolve(qualified_name)
owner_hint = resolved.owner
if version_constraint and version_constraint not in ("", "*", "latest"):
resolved_version = resolved.version or ""
if not resolved_version or not matches_constraint(resolved_version, version_constraint):
if self.verbose:
print(
f" Local/global {qualified_name}@{resolved_version or 'unknown'} "
f"does not satisfy {version_constraint}, resolving from registry"
)
else:
return DependencyNode(
owner=resolved.owner or owner or "",
name=resolved.tool.name,
version_constraint=version_constraint,
resolved_version=resolved.version,
source=resolved.source,
path=resolved.path,
children=[]
)
else:
return DependencyNode(
owner=resolved.owner or owner or "",
name=resolved.tool.name,
version_constraint=version_constraint,
resolved_version=resolved.version,
source=resolved.source,
path=resolved.path,
children=[]
)
except ToolNotFoundError:
pass
except Exception as e:
if self.verbose:
print(f" Warning: Resolver error for {qualified_name}: {e}")
# Try registry
try:
reg_owner = owner or owner_hint or DEFAULT_OWNER
result = self._download_registry_config(
reg_owner,
name,
version_constraint
)
return DependencyNode(
owner=reg_owner,
name=name,
version_constraint=version_constraint,
resolved_version=result.resolved_version or "",
source="registry",
path=None, # Not installed yet
children=[]
)
except Exception as e:
if self.verbose:
print(f" Could not resolve {qualified_name}: {e}")
# Record unresolved node so graph reflects missing dependency
return DependencyNode(
owner=owner or owner_hint or "",
name=name,
version_constraint=version_constraint,
resolved_version=None,
source=None,
path=None,
children=[]
)
def _get_tool_dependencies(self, node: DependencyNode) -> List[Dependency]:
"""Get dependencies for a resolved tool node."""
deps = []
if node.source in ("local", "global") and node.path:
# Read from local config file
deps.extend(self._read_deps_from_config(node.path))
elif node.source == "registry":
# Fetch config from registry without installing
deps.extend(self._read_deps_from_registry(node.owner, node.name, node.resolved_version or node.version_constraint))
return deps
def _read_deps_from_config(self, tool_path: Path) -> List[Dependency]:
"""Read dependencies from a local tool config."""
deps = []
config_path = tool_path / "config.yaml"
if not config_path.exists():
return deps
try:
data = yaml.safe_load(config_path.read_text()) or {}
# Explicit dependencies
for dep in data.get("dependencies", []):
deps.append(Dependency.from_dict(dep))
# Implicit dependencies from ToolSteps
if self.include_implicit:
for step in data.get("steps", []):
if step.get("type") == "tool":
tool_ref = step.get("tool")
if tool_ref:
deps.append(Dependency.from_dict(tool_ref))
except Exception as e:
if self.verbose:
print(f" Warning: Could not read deps from {config_path}: {e}")
return deps
def _read_deps_from_registry(self, owner: str, name: str, version: Optional[str]) -> List[Dependency]:
"""Read dependencies from registry tool config."""
deps = []
try:
# Download config without installing (cache by qualified name + version/constraint)
result = self._download_registry_config(owner, name, version)
config_yaml = result.config_yaml
data = yaml.safe_load(config_yaml) or {}
# Explicit dependencies
for dep in data.get("dependencies", []):
deps.append(Dependency.from_dict(dep))
# Implicit dependencies from ToolSteps
if self.include_implicit:
for step in data.get("steps", []):
if step.get("type") == "tool":
tool_ref = step.get("tool")
if tool_ref:
deps.append(Dependency.from_dict(tool_ref))
except Exception as e:
if self.verbose:
print(f" Warning: Could not read deps from registry {owner}/{name}: {e}")
return deps
def _download_registry_config(
self,
owner: str,
name: str,
version: Optional[str]
) -> "DownloadResult":
"""Download registry config with caching (version or constraint aware)."""
cache_key = f"{owner}/{name}@{version or ''}"
if cache_key in self._registry_cache:
return self._registry_cache[cache_key]
client = self._get_client()
result = client.download_tool(owner, name, version=version, install=False)
self._registry_cache[cache_key] = result
return result
def _topological_sort(self, graph: DependencyGraph) -> List[str]:
"""Return nodes in installation order (dependencies first)."""
# Kahn's algorithm
from collections import deque
in_degree = {name: 0 for name in graph.nodes}
# Build in-degree counts
for node in graph.nodes.values():
for child in node.children:
if child in in_degree:
in_degree[child] += 1
# Start with nodes that have no dependencies (deque gives O(1) pops)
queue = deque(name for name, deg in in_degree.items() if deg == 0)
result = []
while queue:
name = queue.popleft()
result.append(name)
node = graph.nodes.get(name)
if node:
for child in node.children:
if child in in_degree:
in_degree[child] -= 1
if in_degree[child] == 0:
queue.append(child)
# In our graph, A.children contains B means "A depends on B".
# Kahn's algorithm processes nodes with in_degree=0 first.
# Nodes with in_degree=0 are roots (no one depends on them).
# So the raw output is: dependents first, then dependencies.
# We reverse to get: dependencies first, then dependents (correct install order).
return list(reversed(result))
@dataclass
class ResolutionResult:
"""Result of dependency resolution."""
graph: DependencyGraph
to_install: List[DependencyNode] # Tools needing installation
already_installed: List[DependencyNode] # Already present
failed: List[Tuple[str, str]] # (qualified_name, error_message)
warnings: List[str] # Non-fatal issues
@property
def success(self) -> bool:
"""Check if resolution was successful."""
return not self.graph.has_cycles() and not self.failed
def resolve_dependencies(
dependencies: List[Dependency],
verbose: bool = False,
include_implicit: bool = True
) -> ResolutionResult:
"""
Convenience function to resolve dependencies and categorize results.
Args:
dependencies: Direct dependencies to resolve
verbose: Print debug info
include_implicit: Include ToolStep dependencies
Returns:
ResolutionResult with categorized nodes
"""
builder = DependencyGraphBuilder(verbose=verbose)
graph = builder.build(dependencies, include_implicit=include_implicit)
to_install = []
already_installed = []
failed = []
warnings = []
for node in graph.get_install_order():
if not node.is_resolved:
failed.append((node.qualified_name, "Tool not found"))
elif node.source == "registry" and node.path is None:
to_install.append(node)
else:
already_installed.append(node)
if graph.has_conflicts():
for tool, v1, from1, v2, from2 in graph.conflicts:
warnings.append(f"Version conflict for {tool}: {v1} (from {from1}) vs {v2} (from {from2})")
if graph.has_cycles():
for cycle in graph.cycles:
warnings.append(f"Circular dependency: {' -> '.join(cycle)}")
return ResolutionResult(
graph=graph,
to_install=to_install,
already_installed=already_installed,
failed=failed,
warnings=warnings
)
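The reverse-Kahn ordering used by `_topological_sort` can be sketched in isolation. A minimal, self-contained illustration — the graph literal and the package names `app`, `lib`, `core` are hypothetical, for demonstration only, not CmdForge's actual API:

```python
from collections import deque

def install_order(graph):
    # graph maps a package to the packages it depends on,
    # mirroring node.children in DependencyGraph.
    in_degree = {n: 0 for n in graph}
    for children in graph.values():
        for child in children:
            if child in in_degree:
                in_degree[child] += 1
    queue = deque(n for n, d in in_degree.items() if d == 0)
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for child in graph.get(n, []):
            in_degree[child] -= 1
            if in_degree[child] == 0:
                queue.append(child)
    # Kahn emits roots (nothing depends on them) first;
    # reversing yields dependencies-before-dependents.
    return list(reversed(order))

# app depends on lib, lib depends on core
print(install_order({"app": ["lib"], "lib": ["core"], "core": []}))
# → ['core', 'lib', 'app']
```

Reversing the output is equivalent to running Kahn's algorithm on the edge-reversed graph, but avoids building a second adjacency structure.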

371
src/cmdforge/lockfile.py Normal file
View File

@@ -0,0 +1,371 @@
"""Lock file for reproducible dependency installation.
Provides cmdforge.lock file support for:
- Recording exact versions of all dependencies (direct + transitive)
- Reproducible installs across machines
- Integrity verification via content hashes
"""
import hashlib
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
import yaml
@dataclass
class LockedPackage:
"""A locked dependency with exact version and integrity."""
name: str # Qualified: owner/name
version: str # Exact resolved version
constraint: str # Original constraint
integrity: str # sha256 hash (from registry config_hash)
source: str # "registry", "local", "global"
direct: bool # True if in manifest
required_by: List[str] = field(default_factory=list) # Parent packages
path: Optional[str] = None # Relative path for local tools
@property
def owner(self) -> str:
"""Extract owner from qualified name."""
return self.name.split("/")[0] if "/" in self.name else ""
@property
def tool_name(self) -> str:
"""Extract tool name from qualified name."""
return self.name.split("/")[1] if "/" in self.name else self.name
@dataclass
class LockfileMetadata:
"""Metadata about when/how the lock file was generated."""
generated_at: str
cmdforge_version: str
manifest_hash: str
platform: str
python_version: str
@dataclass
class Lockfile:
"""Complete lock file representation."""
lockfile_version: int = 1
metadata: Optional[LockfileMetadata] = None
packages: Dict[str, LockedPackage] = field(default_factory=dict)
@classmethod
def load(cls, path: Optional[Path] = None) -> Optional["Lockfile"]:
"""Load lock file from disk.
Args:
path: Path to lock file (default: ./cmdforge.lock)
Returns:
Lockfile object, or None if not found
"""
path = path or Path("cmdforge.lock")
if not path.exists():
return None
try:
with open(path) as f:
data = yaml.safe_load(f) or {}
return cls._from_dict(data)
except Exception as e:
print(f"Warning: Could not load lock file: {e}")
return None
def save(self, path: Optional[Path] = None) -> None:
"""Save lock file to disk.
Args:
path: Path to save to (default: ./cmdforge.lock)
"""
path = path or Path("cmdforge.lock")
with open(path, "w") as f:
# Add header comment
f.write("# cmdforge.lock\n")
f.write("# Auto-generated by 'cmdforge lock' - do not edit manually\n")
f.write("# To update: cmdforge lock --force\n\n")
yaml.safe_dump(
self._to_dict(),
f,
sort_keys=False,
default_flow_style=False
)
def get_package(self, name: str) -> Optional[LockedPackage]:
"""Get a locked package by qualified name."""
return self.packages.get(name)
def is_stale(self, manifest_path: Optional[Path] = None) -> bool:
"""Check if lock file is outdated relative to manifest.
Note: This only checks if the manifest hash changed.
It does NOT detect if local tools were edited after locking.
Use verify_lockfile() for full integrity checking.
Args:
manifest_path: Path to manifest (default: ./cmdforge.yaml)
Returns:
True if lock file is stale
"""
manifest_path = manifest_path or Path("cmdforge.yaml")
if not manifest_path.exists():
return True
if not self.metadata:
return True
current_hash = compute_file_hash(manifest_path)
return current_hash != self.metadata.manifest_hash
@classmethod
def _from_dict(cls, data: dict) -> "Lockfile":
"""Parse lock file from dict."""
metadata = None
if "metadata" in data:
m = data["metadata"]
metadata = LockfileMetadata(
generated_at=m.get("generated_at", ""),
cmdforge_version=m.get("cmdforge_version", ""),
manifest_hash=m.get("manifest_hash", ""),
platform=m.get("platform", ""),
python_version=m.get("python_version", "")
)
packages = {}
for name, pkg_data in data.get("packages", {}).items():
packages[name] = LockedPackage(
name=name,
version=pkg_data.get("version", ""),
constraint=pkg_data.get("constraint", "*"),
integrity=pkg_data.get("integrity", ""),
source=pkg_data.get("source", "registry"),
direct=pkg_data.get("direct", False),
required_by=pkg_data.get("required_by", []),
path=pkg_data.get("path")
)
return cls(
lockfile_version=data.get("lockfile_version", 1),
metadata=metadata,
packages=packages
)
def _to_dict(self) -> dict:
"""Convert to dict for YAML serialization."""
d = {"lockfile_version": self.lockfile_version}
if self.metadata:
d["metadata"] = {
"generated_at": self.metadata.generated_at,
"cmdforge_version": self.metadata.cmdforge_version,
"manifest_hash": self.metadata.manifest_hash,
"platform": self.metadata.platform,
"python_version": self.metadata.python_version
}
d["packages"] = {}
for name, pkg in self.packages.items():
pkg_dict = {
"version": pkg.version,
"constraint": pkg.constraint,
"integrity": pkg.integrity,
"source": pkg.source,
"direct": pkg.direct
}
if pkg.required_by:
pkg_dict["required_by"] = pkg.required_by
if pkg.path:
pkg_dict["path"] = pkg.path
d["packages"][name] = pkg_dict
return d
def compute_file_hash(path: Path) -> str:
"""Compute SHA256 hash of raw file bytes.
Used ONLY for manifest hash (to detect any change, including formatting).
For tool config integrity, use hash_utils.compute_yaml_hash() instead.
Args:
path: Path to file
Returns:
Hash string in format "sha256:<64-char-hex>"
"""
sha256 = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
sha256.update(chunk)
return f"sha256:{sha256.hexdigest()}"
def generate_lockfile(
manifest: "Manifest",
graph: "DependencyGraph",
client: "RegistryClient"
) -> Lockfile:
"""
Generate a lock file from resolved dependencies.
Args:
manifest: Project manifest with constraints
graph: Resolved dependency graph (from DependencyGraphBuilder)
client: Registry client for fetching config_hash
Returns:
Complete Lockfile ready to save
"""
import platform
import sys
from . import __version__
# Compute manifest hash
manifest_path = Path("cmdforge.yaml")
manifest_hash = compute_file_hash(manifest_path) if manifest_path.exists() else ""
metadata = LockfileMetadata(
generated_at=datetime.now().astimezone().isoformat(),
cmdforge_version=__version__,
manifest_hash=manifest_hash,
platform=platform.system().lower(),
python_version=f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
)
lock = Lockfile(metadata=metadata)
# Get direct dependency names for marking
direct_names = {d.name for d in manifest.dependencies}
# Process all nodes in the graph
for qualified_name, node in graph.nodes.items():
if not node.is_resolved:
continue
integrity = _get_integrity_hash(node, client)
pkg = LockedPackage(
name=qualified_name,
version=node.resolved_version or "",
constraint=node.version_constraint or "*",
integrity=integrity,
source=node.source or "registry",
direct=_is_direct(qualified_name, direct_names),
required_by=_find_parents(qualified_name, graph),
path=str(node.path) if node.source == "local" and node.path else None
)
lock.packages[qualified_name] = pkg
return lock
def _get_integrity_hash(node: "DependencyNode", client: "RegistryClient") -> str:
"""Get integrity hash for a dependency node."""
if node.source in ("local", "global") and node.path:
# For local/global tools, hash normalized config
config_path = node.path / "config.yaml"
if config_path.exists():
from .hash_utils import compute_yaml_hash
return compute_yaml_hash(config_path.read_text())
return ""
elif node.source == "registry":
# For registry tools, get the config_hash from registry
# This is the hash BEFORE any local modifications
try:
result = client.download_tool(
node.owner,
node.name,
version=node.resolved_version,
install=False # Don't count as install, just get config
)
# Use the config_hash from registry response
if hasattr(result, 'config_hash') and result.config_hash:
# config_hash already includes "sha256:" prefix
return result.config_hash
# Fallback: hash the config_yaml content using normalized hashing
if hasattr(result, 'config_yaml') and result.config_yaml:
from .hash_utils import compute_yaml_hash
return compute_yaml_hash(result.config_yaml)
except Exception:
return ""
return ""
def _is_direct(qualified_name: str, direct_names: set) -> bool:
"""Check if a qualified name matches any direct dependency."""
# Handle case where manifest has unqualified names
tool_name = qualified_name.split("/")[1] if "/" in qualified_name else qualified_name
return qualified_name in direct_names or tool_name in direct_names
def _find_parents(qualified_name: str, graph: "DependencyGraph") -> List[str]:
"""Find packages that depend on this one."""
parents = []
for name, node in graph.nodes.items():
if qualified_name in node.children:
parents.append(name)
return parents
def verify_lockfile(
lock: Lockfile,
client: "RegistryClient"
) -> List[str]:
"""
Verify installed tools match lock file.
Args:
lock: Lock file to verify against
client: Registry client for hash verification
Returns:
List of verification errors (empty if all OK)
"""
from .resolver import ToolResolver, ToolNotFoundError
errors = []
# Create resolver that doesn't use manifest (bypass version overrides)
resolver = ToolResolver(auto_fetch=False)
resolver.manifest = None
for name, locked in lock.packages.items():
# Check tool exists
try:
resolved = resolver.resolve(name)
except ToolNotFoundError:
errors.append(f"{name}: not installed")
continue
except Exception:
errors.append(f"{name}: not installed")
continue
# Check version matches
resolved_version = resolved.version or ""
locked_version = locked.version or ""
if resolved_version and locked_version and resolved_version != locked_version:
errors.append(
f"{name}: version mismatch "
f"(installed: {resolved_version}, locked: {locked_version})"
)
# Check integrity against installed tool config
if locked.integrity and resolved.path:
try:
config_path = resolved.path / "config.yaml"
if config_path.exists():
from .hash_utils import compute_yaml_hash
current_hash = compute_yaml_hash(config_path.read_text())
if current_hash != locked.integrity:
errors.append(
f"{name}: integrity mismatch "
f"(installed tool differs from lock)"
)
except Exception as e:
errors.append(f"{name}: could not verify integrity ({e})")
return errors
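The `compute_file_hash` scheme above (raw bytes, chunked SHA-256, `sha256:` prefix) can be exercised standalone. A small sketch using only the standard library and a throwaway temp file:

```python
import hashlib
import os
import tempfile

def file_hash(path):
    # Chunked SHA-256 over raw bytes with a "sha256:" prefix --
    # the same shape compute_file_hash records for the manifest hash.
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return f"sha256:{h.hexdigest()}"

fd, path = tempfile.mkstemp()
os.write(fd, b"hello world")
os.close(fd)
digest = file_hash(path)
os.remove(path)
print(digest)
# → sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9
```

Note that byte-exact hashing is deliberate here: any manifest change, even pure formatting, marks the lock stale, whereas tool config integrity uses the normalized `compute_yaml_hash` instead.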

250
src/cmdforge/version.py Normal file
View File

@@ -0,0 +1,250 @@
"""Semantic version constraint matching.
Provides version parsing and constraint matching for dependency resolution.
Follows semver 2.0.0 specification with support for common constraint operators.
"""
import re
from dataclasses import dataclass
from typing import Optional, Tuple, List
VERSION_PATTERN = re.compile(
r'^(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)'
r'(?:-(?P<prerelease>[0-9A-Za-z.-]+))?'
r'(?:\+(?P<build>[0-9A-Za-z.-]+))?$'
)
@dataclass
class Version:
"""Semantic version representation."""
major: int
minor: int
patch: int
prerelease: Optional[str] = None
build: Optional[str] = None
@classmethod
def parse(cls, version_str: str) -> Optional["Version"]:
"""Parse a version string into a Version object.
Args:
version_str: Version string like "1.2.3" or "1.0.0-alpha.1+build.123"
Returns:
Version object, or None if parsing fails
"""
if not version_str:
return None
match = VERSION_PATTERN.match(version_str.strip())
if not match:
return None
return cls(
major=int(match.group("major")),
minor=int(match.group("minor")),
patch=int(match.group("patch")),
prerelease=match.group("prerelease"),
build=match.group("build")
)
def __str__(self) -> str:
s = f"{self.major}.{self.minor}.{self.patch}"
if self.prerelease:
s += f"-{self.prerelease}"
if self.build:
s += f"+{self.build}"
return s
@property
def tuple(self) -> Tuple[int, int, int]:
"""Get (major, minor, patch) tuple for comparison."""
return (self.major, self.minor, self.patch)
def __eq__(self, other: object) -> bool:
if not isinstance(other, Version):
return False
return self.tuple == other.tuple and self.prerelease == other.prerelease
def __lt__(self, other: "Version") -> bool:
if self.tuple != other.tuple:
return self.tuple < other.tuple
# Prerelease versions are less than release versions
# No prerelease = release version
if self.prerelease is None and other.prerelease is not None:
return False # 1.0.0 > 1.0.0-alpha
if self.prerelease is not None and other.prerelease is None:
return True # 1.0.0-alpha < 1.0.0
# Both have prereleases: plain string compare (a simplification of semver's dot-separated identifier comparison)
return (self.prerelease or "") < (other.prerelease or "")
def __le__(self, other: "Version") -> bool:
return self == other or self < other
def __gt__(self, other: "Version") -> bool:
return not self <= other
def __ge__(self, other: "Version") -> bool:
return not self < other
def __hash__(self) -> int:
return hash((self.tuple, self.prerelease))
def matches_constraint(version: str, constraint: str) -> bool:
"""
Check if a version satisfies a constraint.
Supported constraints:
- "*" or "latest" or "": any version
- "1.0.0": exact match
- "^1.2.3": compatible (>=1.2.3 and <2.0.0)
- "^0.2.3": for 0.x, means >=0.2.3 and <0.3.0 (semver special case)
- "^0.0.3": for 0.0.x, means exactly 0.0.3 (semver special case)
- "~1.2.3": approximately (>=1.2.3 and <1.3.0)
- ">=1.0.0", "<=1.0.0", ">1.0.0", "<1.0.0": comparisons
- "=1.0.0": explicit exact match
Args:
version: Version string to check (e.g., "1.2.3")
constraint: Constraint string (e.g., "^1.0.0")
Returns:
True if version satisfies constraint
"""
if not constraint or constraint in ("*", "latest"):
return True
v = Version.parse(version)
if not v:
return False
constraint = constraint.strip()
# Standard semver behavior: prerelease versions do NOT satisfy ranges
# unless the constraint itself includes a prerelease.
if v.prerelease and "-" not in constraint:
return False
# Caret: ^1.2.3 means >=1.2.3 and <2.0.0
# Special cases:
# ^0.2.3 -> >=0.2.3 and <0.3.0
# ^0.0.3 -> >=0.0.3 and <0.0.4 (exact match for 0.0.x series)
if constraint.startswith("^"):
c = Version.parse(constraint[1:])
if not c:
return False
# Must be >= constraint version
if v < c:
return False
# Upper bound depends on major version
if c.major == 0:
if c.minor == 0:
# ^0.0.y allows ONLY 0.0.y (exact match for 0.0.x series)
return v.major == 0 and v.minor == 0 and v.patch == c.patch
# ^0.x.y allows >=0.x.y and <0.(x+1).0
return v.major == 0 and v.minor == c.minor
else:
# ^x.y.z allows changes that don't modify major: <(x+1).0.0
return v.major == c.major
# Tilde: ~1.2.3 means >=1.2.3 and <1.3.0
if constraint.startswith("~"):
c = Version.parse(constraint[1:])
if not c:
return False
# Must be >= constraint version
if v < c:
return False
# Must be same major.minor
return v.major == c.major and v.minor == c.minor
# Comparison operators (check longest first)
for op in (">=", "<=", ">", "<", "="):
if constraint.startswith(op):
c = Version.parse(constraint[len(op):].strip())
if not c:
return False
if op == ">=":
return v >= c
elif op == "<=":
return v <= c
elif op == ">":
return v > c
elif op == "<":
return v < c
elif op == "=":
return v == c
# Exact match (no operator)
c = Version.parse(constraint)
if c:
return v == c
return False
def find_best_match(versions: List[str], constraint: str) -> Optional[str]:
"""
Find the best (highest) version that matches a constraint.
Args:
versions: List of available versions
constraint: Version constraint
Returns:
Best matching version, or None if no match
"""
matching = [v for v in versions if matches_constraint(v, constraint)]
if not matching:
return None
# Sort by parsed version, return highest
parsed = [(Version.parse(v), v) for v in matching]
parsed = [(p, v) for p, v in parsed if p is not None]
if not parsed:
return None
parsed.sort(key=lambda x: x[0], reverse=True)
return parsed[0][1]
def is_valid_version(version: str) -> bool:
"""Check if a string is a valid semver version.
Args:
version: Version string to validate
Returns:
True if valid semver format
"""
return Version.parse(version) is not None
def compare_versions(v1: str, v2: str) -> int:
"""Compare two version strings.
Args:
v1: First version
v2: Second version
Returns:
-1 if v1 < v2, 0 if equal, 1 if v1 > v2
Raises:
ValueError: If either version is invalid
"""
parsed_v1 = Version.parse(v1)
parsed_v2 = Version.parse(v2)
if not parsed_v1 or not parsed_v2:
raise ValueError(f"Invalid version: {v1 if not parsed_v1 else v2}")
if parsed_v1 < parsed_v2:
return -1
elif parsed_v1 > parsed_v2:
return 1
return 0
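The caret rules documented in `matches_constraint` can be condensed for plain `x.y.z` versions. A minimal sketch of just the caret branch — no prereleases, no parse-failure handling; an illustration under those assumptions, not the full matcher:

```python
def caret_ok(version, base):
    # ^base semantics for plain x.y.z strings:
    #   ^1.2.3 -> >=1.2.3 and same major
    #   ^0.2.3 -> >=0.2.3 and same 0.minor series
    #   ^0.0.3 -> exactly 0.0.3
    v = tuple(map(int, version.split(".")))
    c = tuple(map(int, base.split(".")))
    if v < c:
        return False
    if c[0] > 0:
        return v[0] == c[0]
    if c[1] > 0:
        return v[0] == 0 and v[1] == c[1]
    return v == c

print(caret_ok("1.9.0", "1.2.3"))  # True
print(caret_ok("2.0.0", "1.2.3"))  # False
print(caret_ok("0.3.0", "0.2.3"))  # False
```

Tuple comparison handles the `>= base` lower bound for free, since Python compares component-wise left to right.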

View File

@@ -0,0 +1,293 @@
"""Tests for dependency_graph.py - Transitive dependency resolution."""
import pytest
from pathlib import Path
from unittest.mock import MagicMock, patch
import tempfile
import yaml
from cmdforge.dependency_graph import (
DependencyNode,
DependencyGraph,
DependencyGraphBuilder,
ResolutionResult,
resolve_dependencies
)
from cmdforge.manifest import Dependency
class TestDependencyNode:
"""Tests for DependencyNode dataclass."""
def test_qualified_name_with_owner(self):
node = DependencyNode(
owner="official",
name="summarize",
version_constraint="^1.0.0",
resolved_version="1.2.3",
source="global",
path=Path("/test")
)
assert node.qualified_name == "official/summarize"
def test_qualified_name_without_owner(self):
node = DependencyNode(
owner="",
name="summarize",
version_constraint="*",
resolved_version=None,
source=None,
path=None
)
assert node.qualified_name == "summarize"
def test_is_resolved(self):
resolved = DependencyNode(
owner="official",
name="test",
version_constraint="*",
resolved_version="1.0.0",
source="global",
path=Path("/test")
)
assert resolved.is_resolved
unresolved = DependencyNode(
owner="official",
name="test",
version_constraint="*",
resolved_version=None,
source=None,
path=None
)
assert not unresolved.is_resolved
class TestDependencyGraph:
"""Tests for DependencyGraph dataclass."""
def test_add_node(self):
graph = DependencyGraph()
node = DependencyNode(
owner="official",
name="test",
version_constraint="*",
resolved_version="1.0.0",
source="global",
path=Path("/test")
)
graph.add_node(node)
assert "official/test" in graph.nodes
def test_get_node(self):
graph = DependencyGraph()
node = DependencyNode(
owner="official",
name="test",
version_constraint="*",
resolved_version="1.0.0",
source="global",
path=Path("/test")
)
graph.add_node(node)
assert graph.get_node("official/test") == node
assert graph.get_node("nonexistent") is None
def test_version_conflict_detection(self):
graph = DependencyGraph()
node1 = DependencyNode(
owner="official",
name="test",
version_constraint="^1.0.0",
resolved_version="1.0.0",
source="global",
path=Path("/test"),
requested_by="root"
)
graph.add_node(node1, requested_by="root")
node2 = DependencyNode(
owner="official",
name="test",
version_constraint="^2.0.0",
resolved_version="2.0.0",
source="global",
path=Path("/test"),
requested_by="other"
)
graph.add_node(node2, requested_by="other")
assert graph.has_conflicts()
assert len(graph.conflicts) == 1
def test_get_unresolved(self):
graph = DependencyGraph()
resolved = DependencyNode(
owner="official",
name="resolved",
version_constraint="*",
resolved_version="1.0.0",
source="global",
path=Path("/test")
)
unresolved = DependencyNode(
owner="official",
name="unresolved",
version_constraint="*",
resolved_version=None,
source=None,
path=None
)
graph.add_node(resolved)
graph.add_node(unresolved)
unresolved_list = graph.get_unresolved()
assert len(unresolved_list) == 1
assert unresolved_list[0].name == "unresolved"
class TestDependencyGraphBuilder:
"""Tests for DependencyGraphBuilder class."""
def test_build_empty_deps(self):
builder = DependencyGraphBuilder(verbose=False)
graph = builder.build([])
assert len(graph.nodes) == 0
def test_topological_sort_simple(self):
"""Test that dependencies come before dependents."""
graph = DependencyGraph()
# A depends on B
node_a = DependencyNode(
owner="official",
name="a",
version_constraint="*",
resolved_version="1.0.0",
source="global",
path=Path("/a"),
children=["official/b"]
)
node_b = DependencyNode(
owner="official",
name="b",
version_constraint="*",
resolved_version="1.0.0",
source="global",
path=Path("/b"),
children=[]
)
graph.add_node(node_a)
graph.add_node(node_b)
graph.root_dependencies = ["official/a"]
builder = DependencyGraphBuilder()
order = builder._topological_sort(graph)
# B should come before A since A depends on B
b_idx = order.index("official/b")
a_idx = order.index("official/a")
assert b_idx < a_idx
def test_cycle_detection(self):
"""Test that cycles are detected."""
# This requires mocking the resolver to create a cycle
# For now, just verify the cycle list exists
graph = DependencyGraph()
assert not graph.has_cycles()
graph.cycles.append(["a", "b", "a"])
assert graph.has_cycles()
class TestResolveDependencies:
"""Tests for resolve_dependencies convenience function."""
@patch('cmdforge.dependency_graph.DependencyGraphBuilder.build')
def test_categorizes_results(self, mock_build):
"""Test that results are properly categorized."""
# Create mock graph
mock_graph = DependencyGraph()
mock_graph.resolution_order = ["official/test"]
node = DependencyNode(
owner="official",
name="test",
version_constraint="*",
resolved_version="1.0.0",
source="registry",
path=None # Not installed yet
)
mock_graph.nodes["official/test"] = node
mock_build.return_value = mock_graph
deps = [Dependency(name="official/test", version="*")]
result = resolve_dependencies(deps)
assert isinstance(result, ResolutionResult)
assert len(result.to_install) == 1
assert result.to_install[0].name == "test"
class TestIntegration:
"""Integration tests with real file system."""
def test_reads_local_tool_deps(self, tmp_path):
"""Test that dependencies can be read from a local tool config."""
# Create a tool config with dependencies
tool_dir = tmp_path / "test-tool"
tool_dir.mkdir()
config = {
"name": "test-tool",
"description": "Test tool",
"dependencies": [
{"name": "official/helper", "version": "^1.0.0"}
],
"steps": [
{"type": "prompt", "prompt": "test", "output_var": "result"}
]
}
(tool_dir / "config.yaml").write_text(yaml.dump(config))
builder = DependencyGraphBuilder()
deps = builder._read_deps_from_config(tool_dir)
assert len(deps) == 1
assert deps[0].name == "official/helper"
assert deps[0].version == "^1.0.0"
def test_reads_implicit_tool_deps(self, tmp_path):
"""Test that ToolStep references are discovered as dependencies."""
tool_dir = tmp_path / "meta-tool"
tool_dir.mkdir()
config = {
"name": "meta-tool",
"description": "Meta tool that calls others",
"steps": [
{
"type": "tool",
"tool": "official/summarize",
"output_var": "summary"
}
]
}
(tool_dir / "config.yaml").write_text(yaml.dump(config))
builder = DependencyGraphBuilder()
deps = builder._read_deps_from_config(tool_dir)
assert len(deps) == 1
assert deps[0].name == "official/summarize"

374
tests/test_lockfile.py Normal file
View File

@@ -0,0 +1,374 @@
"""Tests for lockfile.py - Lock file functionality."""
import pytest
from pathlib import Path
from unittest.mock import MagicMock, patch
import tempfile
import yaml
from cmdforge.lockfile import (
Lockfile, LockedPackage, LockfileMetadata,
generate_lockfile, verify_lockfile,
compute_file_hash
)
from cmdforge.hash_utils import compute_yaml_hash
class TestLockedPackage:
"""Tests for LockedPackage dataclass."""
def test_owner_and_name_qualified(self):
pkg = LockedPackage(
name="official/summarize",
version="1.0.0",
constraint="^1.0.0",
integrity="sha256:abc",
source="registry",
direct=True
)
assert pkg.owner == "official"
assert pkg.tool_name == "summarize"
def test_owner_and_name_unqualified(self):
pkg = LockedPackage(
name="summarize",
version="1.0.0",
constraint="*",
integrity="sha256:abc",
source="local",
direct=True
)
assert pkg.owner == ""
assert pkg.tool_name == "summarize"
def test_required_by_default_empty(self):
pkg = LockedPackage(
name="test",
version="1.0.0",
constraint="*",
integrity="",
source="registry",
direct=True
)
assert pkg.required_by == []
class TestLockfile:
"""Tests for Lockfile dataclass."""
def test_save_and_load(self, tmp_path):
lock = Lockfile(
metadata=LockfileMetadata(
generated_at="2026-01-26T00:00:00Z",
cmdforge_version="1.5.0",
manifest_hash="sha256:test123",
platform="linux",
python_version="3.12.0"
),
packages={
"official/summarize": LockedPackage(
name="official/summarize",
version="1.2.3",
constraint="^1.0.0",
integrity="sha256:abc123",
source="registry",
direct=True
)
}
)
lock_path = tmp_path / "cmdforge.lock"
lock.save(lock_path)
loaded = Lockfile.load(lock_path)
assert loaded is not None
assert "official/summarize" in loaded.packages
assert loaded.packages["official/summarize"].version == "1.2.3"
assert loaded.packages["official/summarize"].integrity == "sha256:abc123"
# Verify optional fields round-trip
assert loaded.packages["official/summarize"].required_by == []
assert loaded.packages["official/summarize"].path is None
def test_load_missing_file(self, tmp_path):
lock_path = tmp_path / "nonexistent.lock"
loaded = Lockfile.load(lock_path)
assert loaded is None
def test_is_stale_when_manifest_changed(self, tmp_path):
manifest_path = tmp_path / "cmdforge.yaml"
manifest_path.write_text("name: test\ndependencies: []")
original_hash = compute_file_hash(manifest_path)
lock = Lockfile(
metadata=LockfileMetadata(
generated_at="2026-01-26T00:00:00Z",
cmdforge_version="1.5.0",
manifest_hash=original_hash,
platform="linux",
python_version="3.12.0"
)
)
assert not lock.is_stale(manifest_path)
# Modify manifest
manifest_path.write_text("name: test\ndependencies:\n - official/foo")
assert lock.is_stale(manifest_path)
def test_is_stale_without_metadata(self):
lock = Lockfile()
assert lock.is_stale()
def test_get_package(self):
lock = Lockfile(
packages={
"official/summarize": LockedPackage(
name="official/summarize",
version="1.0.0",
constraint="*",
integrity="",
source="registry",
direct=True
)
}
)
assert lock.get_package("official/summarize") is not None
assert lock.get_package("nonexistent") is None
def test_lockfile_version(self):
lock = Lockfile()
assert lock.lockfile_version == 1
def test_save_includes_header(self, tmp_path):
lock = Lockfile()
lock_path = tmp_path / "cmdforge.lock"
lock.save(lock_path)
content = lock_path.read_text()
assert "# cmdforge.lock" in content
assert "# Auto-generated" in content
assert "do not edit manually" in content
class TestHashFunctions:
"""Tests for hash functions."""
def test_compute_file_hash(self, tmp_path):
test_file = tmp_path / "test.txt"
test_file.write_text("hello world")
hash1 = compute_file_hash(test_file)
assert hash1.startswith("sha256:")
# Same content = same hash
hash2 = compute_file_hash(test_file)
assert hash1 == hash2
# Different content = different hash
test_file.write_text("different")
hash3 = compute_file_hash(test_file)
assert hash1 != hash3
def test_compute_yaml_hash_normalized(self):
"""Test that YAML hash normalizes content (sorted keys, excluded fields)."""
# Same logical content, different formatting
yaml1 = "name: test\ndescription: foo"
yaml2 = "description: foo\nname: test" # Different key order
# Should produce same hash (normalized)
assert compute_yaml_hash(yaml1) == compute_yaml_hash(yaml2)
# Different content = different hash
yaml3 = "name: test\ndescription: bar"
assert compute_yaml_hash(yaml1) != compute_yaml_hash(yaml3)
# registry_hash field should be excluded
yaml4 = "name: test\ndescription: foo\nregistry_hash: sha256:abc"
assert compute_yaml_hash(yaml1) == compute_yaml_hash(yaml4)
class TestGenerateLockfile:
    """Tests for generate_lockfile function."""

    def test_generates_complete_lockfile(self, tmp_path):
        """Test that lock file includes all graph nodes."""
        from cmdforge.manifest import Manifest, Dependency
        from cmdforge.dependency_graph import DependencyGraph, DependencyNode

        # Create mock manifest
        manifest = Manifest(
            name="test-project",
            version="1.0.0",
            dependencies=[
                Dependency(name="official/summarize", version="^1.0.0")
            ]
        )

        # Create mock graph with local tool (has path for integrity)
        tool_dir = tmp_path / "tool"
        tool_dir.mkdir()
        (tool_dir / "config.yaml").write_text("name: summarize\ndescription: test")

        graph = DependencyGraph()
        graph.root_dependencies = ["official/summarize"]
        graph.nodes = {
            "official/summarize": DependencyNode(
                owner="official",
                name="summarize",
                version_constraint="^1.0.0",
                resolved_version="1.2.3",
                source="local",  # Use local so we can compute hash from file
                path=tool_dir,
                children=["official/text-utils"]
            ),
            "official/text-utils": DependencyNode(
                owner="official",
                name="text-utils",
                version_constraint="*",
                resolved_version="1.0.0",
                source="registry",
                path=None,
                children=[]
            )
        }

        # Mock registry client - won't be called for local tools
        mock_client = MagicMock()
        mock_client.download_tool.return_value = MagicMock(
            config_hash="sha256:registry_hash",
            config_yaml="name: text-utils"
        )

        # Create a cmdforge.yaml for manifest hash
        manifest_path = tmp_path / "cmdforge.yaml"
        manifest_path.write_text("name: test\ndependencies: []")

        import os
        old_cwd = os.getcwd()
        os.chdir(tmp_path)
        try:
            lock = generate_lockfile(manifest, graph, mock_client)
        finally:
            os.chdir(old_cwd)

        assert len(lock.packages) == 2
        assert "official/summarize" in lock.packages
        assert "official/text-utils" in lock.packages
        assert lock.packages["official/summarize"].direct is True
        assert lock.packages["official/text-utils"].direct is False
class TestVerifyLockfile:
    """Tests for verify_lockfile function."""

    def test_reports_missing_tools(self):
        """Test that missing tools are reported."""
        from cmdforge.resolver import ToolNotFoundError, ToolSpec

        lock = Lockfile(
            packages={
                "official/missing": LockedPackage(
                    name="official/missing",
                    version="1.0.0",
                    constraint="*",
                    integrity="sha256:abc",
                    source="registry",
                    direct=True
                )
            }
        )

        mock_client = MagicMock()

        with patch('cmdforge.resolver.ToolResolver') as MockResolver:
            mock_instance = MockResolver.return_value
            mock_instance.manifest = None
            mock_instance.resolve.side_effect = ToolNotFoundError(
                ToolSpec.parse("official/missing"),
                []
            )
            errors = verify_lockfile(lock, mock_client)

        assert len(errors) == 1
        assert "not installed" in errors[0]
class TestLockfileRoundTrip:
    """Tests for complete lock file round-trip."""

    def test_full_round_trip(self, tmp_path):
        """Test saving and loading a complete lock file."""
        lock = Lockfile(
            lockfile_version=1,
            metadata=LockfileMetadata(
                generated_at="2026-01-26T10:30:00Z",
                cmdforge_version="1.5.0",
                manifest_hash="sha256:abc123def456",
                platform="linux",
                python_version="3.12.0"
            ),
            packages={
                "official/summarize": LockedPackage(
                    name="official/summarize",
                    version="1.2.3",
                    constraint="^1.0.0",
                    integrity="sha256:abc123",
                    source="registry",
                    direct=True,
                    required_by=[]
                ),
                "official/text-utils": LockedPackage(
                    name="official/text-utils",
                    version="1.0.0",
                    constraint="*",
                    integrity="sha256:def456",
                    source="registry",
                    direct=False,
                    required_by=["official/summarize"]
                ),
                "my-local-tool": LockedPackage(
                    name="my-local-tool",
                    version="0.1.0",
                    constraint="*",
                    integrity="sha256:ghi789",
                    source="local",
                    direct=True,
                    path=".cmdforge/my-local-tool"
                )
            }
        )

        lock_path = tmp_path / "cmdforge.lock"
        lock.save(lock_path)
        loaded = Lockfile.load(lock_path)

        # Verify metadata
        assert loaded.lockfile_version == 1
        assert loaded.metadata.generated_at == "2026-01-26T10:30:00Z"
        assert loaded.metadata.cmdforge_version == "1.5.0"
        assert loaded.metadata.manifest_hash == "sha256:abc123def456"
        assert loaded.metadata.platform == "linux"
        assert loaded.metadata.python_version == "3.12.0"

        # Verify packages
        assert len(loaded.packages) == 3

        summarize = loaded.packages["official/summarize"]
        assert summarize.version == "1.2.3"
        assert summarize.constraint == "^1.0.0"
        assert summarize.integrity == "sha256:abc123"
        assert summarize.source == "registry"
        assert summarize.direct is True

        text_utils = loaded.packages["official/text-utils"]
        assert text_utils.direct is False
        assert text_utils.required_by == ["official/summarize"]

        local_tool = loaded.packages["my-local-tool"]
        assert local_tool.source == "local"
        assert local_tool.path == ".cmdforge/my-local-tool"
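The round-trip test above exercises save/load symmetry: everything written by `save` must survive `load` unchanged. The pattern can be sketched with plain dataclasses; the JSON on-disk form and the field set here are assumptions for illustration, since the actual cmdforge.lock serialization is not shown in this diff.

```python
import json
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Dict, List, Optional

@dataclass
class LockedPackage:
    name: str
    version: str
    constraint: str
    integrity: str
    source: str
    direct: bool
    required_by: List[str] = field(default_factory=list)
    path: Optional[str] = None

@dataclass
class Lockfile:
    lockfile_version: int = 1
    packages: Dict[str, LockedPackage] = field(default_factory=dict)

    def save(self, path: Path) -> None:
        # Sorted keys keep the file diff-friendly across regenerations.
        data = {
            "lockfile_version": self.lockfile_version,
            "packages": {name: asdict(pkg) for name, pkg in self.packages.items()},
        }
        path.write_text(json.dumps(data, indent=2, sort_keys=True))

    @classmethod
    def load(cls, path: Path) -> "Lockfile":
        data = json.loads(path.read_text())
        return cls(
            lockfile_version=data["lockfile_version"],
            packages={name: LockedPackage(**pkg)
                      for name, pkg in data["packages"].items()},
        )
```

Because `asdict` and `LockedPackage(**pkg)` mirror each other field for field, the round-trip is lossless by construction, which is exactly what `test_full_round_trip` asserts.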

tests/test_version.py Normal file

@@ -0,0 +1,218 @@
"""Tests for version.py - Semantic version parsing and constraint matching."""
import pytest

from cmdforge.version import (
    Version,
    matches_constraint,
    find_best_match,
    is_valid_version,
    compare_versions
)
class TestVersion:
    """Tests for Version dataclass."""

    def test_parse_basic(self):
        v = Version.parse("1.2.3")
        assert v is not None
        assert v.major == 1
        assert v.minor == 2
        assert v.patch == 3
        assert v.prerelease is None
        assert v.build is None

    def test_parse_with_prerelease(self):
        v = Version.parse("1.0.0-alpha.1")
        assert v is not None
        assert v.major == 1
        assert v.minor == 0
        assert v.patch == 0
        assert v.prerelease == "alpha.1"

    def test_parse_with_build(self):
        v = Version.parse("1.0.0+build.123")
        assert v is not None
        assert v.build == "build.123"

    def test_parse_full(self):
        v = Version.parse("2.1.0-beta.2+20240115")
        assert v is not None
        assert v.major == 2
        assert v.minor == 1
        assert v.patch == 0
        assert v.prerelease == "beta.2"
        assert v.build == "20240115"

    def test_parse_invalid(self):
        assert Version.parse("") is None
        assert Version.parse("invalid") is None
        assert Version.parse("1.2") is None
        assert Version.parse("1.2.3.4") is None

    def test_str(self):
        assert str(Version(1, 2, 3)) == "1.2.3"
        assert str(Version(1, 0, 0, prerelease="alpha")) == "1.0.0-alpha"
        assert str(Version(1, 0, 0, build="123")) == "1.0.0+123"
        assert str(Version(1, 0, 0, prerelease="rc.1", build="build")) == "1.0.0-rc.1+build"

    def test_tuple(self):
        v = Version(1, 2, 3)
        assert v.tuple == (1, 2, 3)

    def test_equality(self):
        v1 = Version(1, 2, 3)
        v2 = Version(1, 2, 3)
        v3 = Version(1, 2, 4)
        assert v1 == v2
        assert v1 != v3

    def test_comparison(self):
        assert Version(1, 0, 0) < Version(2, 0, 0)
        assert Version(1, 0, 0) < Version(1, 1, 0)
        assert Version(1, 0, 0) < Version(1, 0, 1)
        assert Version(2, 0, 0) > Version(1, 9, 9)

    def test_prerelease_comparison(self):
        # Prerelease < release
        assert Version(1, 0, 0, prerelease="alpha") < Version(1, 0, 0)
        assert Version(1, 0, 0, prerelease="beta") < Version(1, 0, 0)
        # Alphabetic ordering for prerelease
        assert Version(1, 0, 0, prerelease="alpha") < Version(1, 0, 0, prerelease="beta")

    def test_hash(self):
        v1 = Version(1, 2, 3)
        v2 = Version(1, 2, 3)
        assert hash(v1) == hash(v2)
        # Can be used in sets/dicts
        s = {v1, v2}
        assert len(s) == 1
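The ordering rules these tests encode (a release outranks its own prereleases, prereleases compare alphabetically, build metadata never affects precedence) all fit in a single sort key. The sketch below is independent of the real `cmdforge.version.Version`; the class name `SemVer` and its `_key` helper are illustrative, and equality here compares all fields, which is a simplification of full semver precedence.

```python
from dataclasses import dataclass
from functools import total_ordering
from typing import Optional

@total_ordering
@dataclass(frozen=True)  # frozen gives us __eq__ and __hash__ for free
class SemVer:
    major: int
    minor: int
    patch: int
    prerelease: Optional[str] = None
    build: Optional[str] = None  # build metadata never affects precedence

    def _key(self):
        # A release sorts after any prerelease of the same x.y.z, so the
        # fourth element is True (sorts after False) when prerelease is None.
        return (self.major, self.minor, self.patch,
                self.prerelease is None, self.prerelease or "")

    def __lt__(self, other: "SemVer") -> bool:
        return self._key() < other._key()
```

Note the alphabetic prerelease comparison matches the test's own comment; the full semver spec additionally compares dot-separated identifiers numerically (`alpha.2 < alpha.10`), which this key does not attempt.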
class TestMatchesConstraint:
    """Tests for matches_constraint function."""

    def test_any_version(self):
        assert matches_constraint("1.0.0", "*")
        assert matches_constraint("2.3.4", "*")
        assert matches_constraint("1.0.0", "latest")
        assert matches_constraint("1.0.0", "")

    def test_exact_match(self):
        assert matches_constraint("1.2.3", "1.2.3")
        assert not matches_constraint("1.2.4", "1.2.3")
        assert matches_constraint("1.0.0", "=1.0.0")

    def test_caret_major(self):
        # ^1.2.3 allows >=1.2.3 and <2.0.0
        assert matches_constraint("1.2.3", "^1.2.3")
        assert matches_constraint("1.2.4", "^1.2.3")
        assert matches_constraint("1.9.9", "^1.2.3")
        assert not matches_constraint("1.2.2", "^1.2.3")
        assert not matches_constraint("2.0.0", "^1.2.3")

    def test_caret_zero_minor(self):
        # ^0.2.3 allows >=0.2.3 and <0.3.0
        assert matches_constraint("0.2.3", "^0.2.3")
        assert matches_constraint("0.2.9", "^0.2.3")
        assert not matches_constraint("0.3.0", "^0.2.3")
        assert not matches_constraint("0.2.2", "^0.2.3")

    def test_caret_zero_zero(self):
        # ^0.0.3 allows ONLY 0.0.3
        assert matches_constraint("0.0.3", "^0.0.3")
        assert not matches_constraint("0.0.4", "^0.0.3")
        assert not matches_constraint("0.0.2", "^0.0.3")

    def test_tilde(self):
        # ~1.2.3 allows >=1.2.3 and <1.3.0
        assert matches_constraint("1.2.3", "~1.2.3")
        assert matches_constraint("1.2.9", "~1.2.3")
        assert not matches_constraint("1.3.0", "~1.2.3")
        assert not matches_constraint("1.2.2", "~1.2.3")

    def test_greater_than_or_equal(self):
        assert matches_constraint("1.0.0", ">=1.0.0")
        assert matches_constraint("2.0.0", ">=1.0.0")
        assert not matches_constraint("0.9.9", ">=1.0.0")

    def test_less_than_or_equal(self):
        assert matches_constraint("1.0.0", "<=1.0.0")
        assert matches_constraint("0.9.0", "<=1.0.0")
        assert not matches_constraint("1.0.1", "<=1.0.0")

    def test_greater_than(self):
        assert matches_constraint("1.0.1", ">1.0.0")
        assert not matches_constraint("1.0.0", ">1.0.0")

    def test_less_than(self):
        assert matches_constraint("0.9.9", "<1.0.0")
        assert not matches_constraint("1.0.0", "<1.0.0")

    def test_prerelease_excluded_by_default(self):
        # Prerelease versions should NOT match ranges without prerelease specifier
        assert not matches_constraint("1.0.0-alpha", "^1.0.0")
        assert not matches_constraint("1.0.0-beta", ">=1.0.0")

    def test_invalid_version(self):
        assert not matches_constraint("invalid", "^1.0.0")
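The constraint semantics this class pins down (caret pins the leftmost nonzero component, tilde pins the minor, prereleases never satisfy plain ranges) fit in one short function. This is a standalone sketch written against the tests, not the actual `cmdforge.version` implementation; the regex and helper names are assumptions.

```python
import re

_SEMVER = re.compile(
    r"^(\d+)\.(\d+)\.(\d+)"      # major.minor.patch
    r"(?:-([0-9A-Za-z.-]+))?"    # optional prerelease
    r"(?:\+[0-9A-Za-z.-]+)?$"    # optional build metadata (ignored)
)

def _parse(text):
    m = _SEMVER.match(text)
    if not m:
        return None
    return (int(m.group(1)), int(m.group(2)), int(m.group(3)), m.group(4))

def matches_constraint(version: str, constraint: str) -> bool:
    parsed = _parse(version)
    if parsed is None:
        return False
    if constraint in ("*", "latest", ""):
        return True
    t, prerelease = parsed[:3], parsed[3]
    # Prerelease versions never satisfy plain range constraints.
    if prerelease is not None and "-" not in constraint:
        return False
    if constraint[0] in "^~":
        base = _parse(constraint[1:])
        if base is None or t < base[:3]:
            return False
        bmaj, bmin, bpat = base[:3]
        if constraint[0] == "~":        # ~x.y.z  -> < x.(y+1).0
            return t < (bmaj, bmin + 1, 0)
        if bmaj > 0:                    # ^x.y.z  -> < (x+1).0.0
            return t < (bmaj + 1, 0, 0)
        if bmin > 0:                    # ^0.y.z  -> < 0.(y+1).0
            return t < (0, bmin + 1, 0)
        return t == (0, 0, bpat)        # ^0.0.z  -> exactly 0.0.z
    # Two-character operators must be tried before their one-character prefixes.
    for op, ok in ((">=", lambda a, b: a >= b), ("<=", lambda a, b: a <= b),
                   (">", lambda a, b: a > b), ("<", lambda a, b: a < b),
                   ("=", lambda a, b: a == b)):
        if constraint.startswith(op):
            base = _parse(constraint[len(op):])
            return base is not None and ok(t, base[:3])
    base = _parse(constraint)           # bare "1.2.3" -> exact match
    return base is not None and t == base[:3]
```

Tuple comparison does the heavy lifting: once versions are `(major, minor, patch)` triples, every range check is a single `<`/`>=` on tuples.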
class TestFindBestMatch:
    """Tests for find_best_match function."""

    def test_finds_highest(self):
        versions = ["1.0.0", "1.1.0", "1.2.0", "2.0.0"]
        assert find_best_match(versions, "*") == "2.0.0"

    def test_respects_constraint(self):
        versions = ["1.0.0", "1.1.0", "1.2.0", "2.0.0"]
        assert find_best_match(versions, "^1.0.0") == "1.2.0"

    def test_no_match(self):
        versions = ["1.0.0", "1.1.0"]
        assert find_best_match(versions, ">=2.0.0") is None

    def test_exact_match(self):
        versions = ["1.0.0", "1.1.0", "1.2.0"]
        assert find_best_match(versions, "1.1.0") == "1.1.0"

    def test_empty_versions(self):
        assert find_best_match([], "*") is None
class TestIsValidVersion:
    """Tests for is_valid_version function."""

    def test_valid(self):
        assert is_valid_version("1.0.0")
        assert is_valid_version("0.1.0")
        assert is_valid_version("1.0.0-alpha")
        assert is_valid_version("1.0.0+build")

    def test_invalid(self):
        assert not is_valid_version("")
        assert not is_valid_version("1.0")
        assert not is_valid_version("invalid")
class TestCompareVersions:
    """Tests for compare_versions function."""

    def test_less_than(self):
        assert compare_versions("1.0.0", "2.0.0") == -1
        assert compare_versions("1.0.0", "1.1.0") == -1

    def test_equal(self):
        assert compare_versions("1.0.0", "1.0.0") == 0

    def test_greater_than(self):
        assert compare_versions("2.0.0", "1.0.0") == 1
        assert compare_versions("1.1.0", "1.0.0") == 1

    def test_invalid_raises(self):
        with pytest.raises(ValueError):
            compare_versions("invalid", "1.0.0")
        with pytest.raises(ValueError):
            compare_versions("1.0.0", "invalid")