#!/usr/bin/env python3
"""
Scheduled Fabric pattern sync for CmdForge.

Monitors the Fabric repository for new/updated patterns and syncs them to
the CmdForge registry through the vetting pipeline.

Usage:
    # Check for new patterns (dry run)
    python scripts/fabric_sync.py --dry-run

    # Sync new patterns to registry
    python scripts/fabric_sync.py --sync

    # Check status of tracked patterns
    python scripts/fabric_sync.py --status

    # Force resync of specific patterns
    python scripts/fabric_sync.py --force summarize extract_wisdom

    # Run as daemon with interval
    python scripts/fabric_sync.py --daemon --interval 3600

Setup for cron (daily sync at 03:00):
    0 3 * * * /path/to/venv/bin/python /path/to/scripts/fabric_sync.py --sync >> /var/log/fabric_sync.log 2>&1

Setup for systemd timer:
    See scripts/fabric-sync.service and scripts/fabric-sync.timer
"""

import argparse
import hashlib
import json
import logging
import subprocess
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Constants
FABRIC_REPO = "https://github.com/danielmiessler/fabric.git"
DEFAULT_SYNC_DIR = Path("/var/lib/cmdforge/fabric-sync")
DEFAULT_STATE_FILE = DEFAULT_SYNC_DIR / "sync_state.json"
DEFAULT_PROVIDER = "opencode-pickle"


@dataclass
class PatternState:
    """State of a single pattern."""
    name: str
    hash: str  # SHA256 of system.md content
    synced_at: Optional[str] = None
    version: str = "1.0.0"
    status: str = "pending"  # pending, synced, failed, skipped


@dataclass
class SyncState:
    """Overall sync state."""
    last_check: Optional[str] = None
    last_sync: Optional[str] = None
    repo_commit: Optional[str] = None
    patterns: dict = field(default_factory=dict)  # name -> PatternState as dict

    def to_dict(self) -> dict:
        return {
            "last_check": self.last_check,
            "last_sync": self.last_sync,
            "repo_commit": self.repo_commit,
            "patterns": self.patterns,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "SyncState":
        return cls(
            last_check=data.get("last_check"),
            last_sync=data.get("last_sync"),
            repo_commit=data.get("repo_commit"),
            patterns=data.get("patterns", {}),
        )


def load_state(state_file: Path) -> SyncState:
    """Load sync state from file."""
    if state_file.exists():
        with open(state_file) as f:
            data = json.load(f)
        return SyncState.from_dict(data)
    return SyncState()


def save_state(state: SyncState, state_file: Path):
    """Save sync state to file."""
    state_file.parent.mkdir(parents=True, exist_ok=True)
    with open(state_file, "w") as f:
        json.dump(state.to_dict(), f, indent=2)
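
# A hedged alternative, not wired in: if a cron-driven --sync can race a
# running --daemon, the plain save_state() above can leave a torn
# sync_state.json. Writing to a temp file and renaming is atomic on POSIX
# filesystems. The helper name is hypothetical; swap it into callers only
# if that race matters in your deployment.
def save_state_atomic(state: SyncState, state_file: Path):
    """Sketch: persist state via write-to-temp-then-rename."""
    state_file.parent.mkdir(parents=True, exist_ok=True)
    tmp = state_file.with_name(state_file.name + ".tmp")
    tmp.write_text(json.dumps(state.to_dict(), indent=2))
    tmp.replace(state_file)  # os.replace() underneath; atomic on POSIX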

def clone_or_update_repo(sync_dir: Path) -> tuple[Path, str]:
    """Clone or update the Fabric repository.

    Returns:
        Tuple of (patterns_dir, commit_hash)
    """
    fabric_dir = sync_dir / "fabric"
    patterns_dir = fabric_dir / "data" / "patterns"

    if fabric_dir.exists():
        logger.info("Updating existing Fabric clone...")
        subprocess.run(
            ["git", "-C", str(fabric_dir), "fetch", "--quiet"],
            check=True, capture_output=True
        )
        subprocess.run(
            ["git", "-C", str(fabric_dir), "reset", "--quiet", "--hard", "origin/main"],
            check=True, capture_output=True
        )
    else:
        logger.info("Cloning Fabric repository...")
        sync_dir.mkdir(parents=True, exist_ok=True)
        subprocess.run(
            ["git", "clone", "--depth", "1", FABRIC_REPO, str(fabric_dir)],
            check=True, capture_output=True
        )

    # Get current commit hash (check=True so a git failure doesn't yield
    # an empty hash silently)
    result = subprocess.run(
        ["git", "-C", str(fabric_dir), "rev-parse", "HEAD"],
        check=True, capture_output=True, text=True
    )
    commit_hash = result.stdout.strip()[:12]

    return patterns_dir, commit_hash


def hash_pattern(pattern_dir: Path) -> Optional[str]:
    """Calculate hash of pattern content."""
    system_md = pattern_dir / "system.md"
    if not system_md.exists():
        return None
    content = system_md.read_bytes()
    return hashlib.sha256(content).hexdigest()[:16]


def scan_patterns(patterns_dir: Path) -> dict[str, str]:
    """Scan all patterns and return name -> hash mapping."""
    patterns = {}
    for entry in sorted(patterns_dir.iterdir()):
        if entry.is_dir():
            pattern_hash = hash_pattern(entry)
            if pattern_hash:
                patterns[entry.name] = pattern_hash
    return patterns


def find_changes(
    current_patterns: dict[str, str],
    state: SyncState
) -> tuple[list[str], list[str], list[str]]:
    """Find new, updated, and removed patterns.

    Returns:
        Tuple of (new_patterns, updated_patterns, removed_patterns)
    """
    new_patterns = []
    updated_patterns = []
    removed_patterns = []

    # Check for new and updated
    for name, current_hash in current_patterns.items():
        if name not in state.patterns:
            new_patterns.append(name)
        elif state.patterns[name].get("hash") != current_hash:
            updated_patterns.append(name)

    # Check for removed
    for name in state.patterns:
        if name not in current_patterns:
            removed_patterns.append(name)

    return new_patterns, updated_patterns, removed_patterns
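
# A hedged self-check, not called anywhere: a worked example of find_changes()
# semantics. The pattern names and hashes are made up; the point is that the
# three return lists partition the diff between a fresh scan and tracked state.
def _demo_find_changes():
    state = SyncState(patterns={
        "summarize": {"hash": "aaaa", "status": "synced"},
        "retired_pattern": {"hash": "bbbb", "status": "synced"},
    })
    current = {"summarize": "cccc", "brand_new": "dddd"}
    new, updated, removed = find_changes(current, state)
    assert new == ["brand_new"]             # scanned, never tracked
    assert updated == ["summarize"]         # tracked, but hash changed
    assert removed == ["retired_pattern"]   # tracked, but gone from the repo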

def vet_pattern(pattern_dir: Path, provider: str = DEFAULT_PROVIDER) -> tuple[bool, str]:
    """Run the vetting pipeline on a pattern.

    Returns:
        Tuple of (passed, reason)
    """
    try:
        # Make sibling scripts importable
        script_dir = Path(__file__).parent
        sys.path.insert(0, str(script_dir))
        from import_fabric import create_tool_config

        # Read pattern
        system_md = pattern_dir / "system.md"
        system_prompt = system_md.read_text()

        # Create config
        config = create_tool_config(pattern_dir.name, system_prompt, provider)

        # Run scrutiny
        try:
            from scrutiny import vet_tool, VetResult
            report = vet_tool(config, str(pattern_dir))

            if report.result == VetResult.REJECT:
                return False, f"Rejected: {report.suggestions[0] if report.suggestions else 'quality too low'}"
            elif report.result == VetResult.REVIEW:
                return True, f"Approved (needs review): score {report.overall_score:.2f}"
            else:
                return True, f"Approved: score {report.overall_score:.2f}"
        except ImportError:
            # Scrutiny not available - fall back to basic validation
            if len(system_prompt.strip()) < 50:
                return False, "Pattern too short"
            return True, "Basic validation passed"

    except Exception as e:
        return False, f"Vetting error: {e}"


def sync_pattern(
    pattern_dir: Path,
    output_dir: Path,
    provider: str,
    state: SyncState,
    dry_run: bool = False
) -> bool:
    """Sync a single pattern.

    Returns:
        True if successful
    """
    name = pattern_dir.name
    pattern_hash = hash_pattern(pattern_dir)

    # Vet the pattern
    passed, reason = vet_pattern(pattern_dir, provider)
    if not passed:
        logger.warning(f"  ✗ {name}: {reason}")
        state.patterns[name] = {
            "name": name,
            "hash": pattern_hash,
            "status": "failed",
            "reason": reason,
            "synced_at": datetime.now(timezone.utc).isoformat(),
        }
        return False

    if dry_run:
        logger.info(f"  [DRY RUN] Would sync: {name} ({reason})")
        return True

    # Import the pattern
    try:
        script_dir = Path(__file__).parent
        sys.path.insert(0, str(script_dir))
        from import_fabric import import_pattern

        success = import_pattern(
            name,
            pattern_dir.parent,
            output_dir,
            provider,
            dry_run=False,
            registry_format=False,
        )

        if success:
            logger.info(f"  ✓ {name}: {reason}")
            state.patterns[name] = {
                "name": name,
                "hash": pattern_hash,
                "status": "synced",
                "synced_at": datetime.now(timezone.utc).isoformat(),
            }
            return True
        else:
            logger.error(f"  ✗ {name}: Import failed")
            state.patterns[name] = {
                "name": name,
                "hash": pattern_hash,
                "status": "failed",
                "reason": "Import failed",
                "synced_at": datetime.now(timezone.utc).isoformat(),
            }
            return False

    except Exception as e:
        logger.error(f"  ✗ {name}: {e}")
        state.patterns[name] = {
            "name": name,
            "hash": pattern_hash,
            "status": "failed",
            "reason": str(e),
            "synced_at": datetime.now(timezone.utc).isoformat(),
        }
        return False


def run_sync(
    sync_dir: Path,
    output_dir: Path,
    state_file: Path,
    provider: str,
    dry_run: bool = False,
    force_patterns: Optional[list[str]] = None
) -> dict:
    """Run the sync process.

    Returns:
        Summary dict with counts
    """
    # Load state
    state = load_state(state_file)

    # Clone/update repo
    patterns_dir, commit_hash = clone_or_update_repo(sync_dir)

    # Scan patterns
    current_patterns = scan_patterns(patterns_dir)
    logger.info(f"Found {len(current_patterns)} patterns in Fabric repo (commit {commit_hash})")

    # Find changes
    if force_patterns:
        new_patterns = [p for p in force_patterns if p in current_patterns]
        missing = [p for p in force_patterns if p not in current_patterns]
        if missing:
            logger.warning(f"Forced patterns not found in repo: {', '.join(missing)}")
        updated_patterns = []
        removed_patterns = []
    else:
        new_patterns, updated_patterns, removed_patterns = find_changes(current_patterns, state)

    logger.info(f"Changes: {len(new_patterns)} new, {len(updated_patterns)} updated, {len(removed_patterns)} removed")

    # Update state timestamp
    state.last_check = datetime.now(timezone.utc).isoformat()
    state.repo_commit = commit_hash

    # Process new and updated patterns
    to_sync = new_patterns + updated_patterns
    synced = 0
    failed = 0

    if to_sync:
        logger.info(f"\nSyncing {len(to_sync)} patterns...")
        for name in to_sync:
            pattern_dir = patterns_dir / name
            if sync_pattern(pattern_dir, output_dir, provider, state, dry_run):
                synced += 1
            else:
                failed += 1

    # Mark removed patterns
    for name in removed_patterns:
        if name in state.patterns:
            state.patterns[name]["status"] = "removed"

    # Save state
    if not dry_run:
        state.last_sync = datetime.now(timezone.utc).isoformat()
        save_state(state, state_file)
        logger.info(f"\nState saved to {state_file}")

    # Summary
    summary = {
        "total_patterns": len(current_patterns),
        "new": len(new_patterns),
        "updated": len(updated_patterns),
        "removed": len(removed_patterns),
        "synced": synced,
        "failed": failed,
        "commit": commit_hash,
    }

    logger.info(f"\nSync complete: {synced} synced, {failed} failed")
    return summary
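
# A hedged sketch, not used by daemon_loop() below: if transient failures
# (GitHub outages, DNS hiccups) are common, the daemon could back off
# exponentially instead of retrying at the full interval. The helper name
# and its defaults are hypothetical.
def _backoff_delay(consecutive_failures: int, base: int = 60, cap: int = 3600) -> int:
    """Sketch: delay in seconds after the Nth consecutive failure, capped at `cap`."""
    return min(cap, base * (2 ** consecutive_failures))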

def print_status(state_file: Path):
    """Print current sync status."""
    state = load_state(state_file)

    print("\nFabric Sync Status")
    print("=" * 50)
    print(f"Last check:  {state.last_check or 'Never'}")
    print(f"Last sync:   {state.last_sync or 'Never'}")
    print(f"Repo commit: {state.repo_commit or 'Unknown'}")

    if state.patterns:
        # Count by status
        by_status = {}
        for p in state.patterns.values():
            status = p.get("status", "unknown")
            by_status[status] = by_status.get(status, 0) + 1

        print(f"\nPatterns: {len(state.patterns)} total")
        for status, count in sorted(by_status.items()):
            print(f"  {status}: {count}")

        # Show failed patterns
        failed = [p for p in state.patterns.values() if p.get("status") == "failed"]
        if failed:
            print("\nFailed patterns:")
            for p in failed[:10]:
                print(f"  - {p['name']}: {p.get('reason', 'Unknown error')}")
            if len(failed) > 10:
                print(f"  ... and {len(failed) - 10} more")
    else:
        print("\nNo patterns tracked yet. Run --sync to start.")


def daemon_loop(
    sync_dir: Path,
    output_dir: Path,
    state_file: Path,
    provider: str,
    interval: int
):
    """Run sync in a loop."""
    logger.info(f"Starting daemon mode with {interval}s interval")

    while True:
        try:
            run_sync(sync_dir, output_dir, state_file, provider)
        except Exception as e:
            logger.error(f"Sync failed: {e}")

        logger.info(f"Sleeping for {interval}s...")
        time.sleep(interval)


def main():
    parser = argparse.ArgumentParser(
        description="Scheduled Fabric pattern sync for CmdForge",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        "--sync", action="store_true",
        help="Run sync process"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show what would be synced without making changes"
    )
    parser.add_argument(
        "--status", action="store_true",
        help="Show current sync status"
    )
    parser.add_argument(
        "--force", nargs="+", metavar="PATTERN",
        help="Force resync of specific patterns"
    )
    parser.add_argument(
        "--daemon", action="store_true",
        help="Run in daemon mode"
    )
    parser.add_argument(
        "--interval", type=int, default=3600,
        help="Sync interval in seconds for daemon mode (default: 3600)"
    )
    parser.add_argument(
        "--sync-dir", type=Path, default=DEFAULT_SYNC_DIR,
        help=f"Directory for sync data (default: {DEFAULT_SYNC_DIR})"
    )
    parser.add_argument(
        "--output", type=Path, default=Path.home() / ".cmdforge",
        help="Output directory for synced tools (default: ~/.cmdforge)"
    )
    parser.add_argument(
        "--state-file", type=Path,
        help="State file path (default: <sync-dir>/sync_state.json)"
    )
    parser.add_argument(
        "--provider", default=DEFAULT_PROVIDER,
        help=f"Default provider for tools (default: {DEFAULT_PROVIDER})"
    )

    args = parser.parse_args()

    # Set state file default
    state_file = args.state_file or (args.sync_dir / "sync_state.json")

    if args.status:
        print_status(state_file)
        return 0

    if args.daemon:
        daemon_loop(
            args.sync_dir,
            args.output,
            state_file,
            args.provider,
            args.interval
        )
        return 0

    if args.sync or args.dry_run or args.force:
        summary = run_sync(
            args.sync_dir,
            args.output,
            state_file,
            args.provider,
            dry_run=args.dry_run,
            force_patterns=args.force
        )
        if summary["failed"] > 0:
            return 1
        return 0

    parser.print_help()
    return 1


if __name__ == "__main__":
    sys.exit(main())
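
# The systemd units referenced in the module docstring live in scripts/.
# A hedged sketch of their likely shape (assumed, not the actual files;
# adjust paths to your install):
#
#   # scripts/fabric-sync.service
#   [Unit]
#   Description=CmdForge Fabric pattern sync
#
#   [Service]
#   Type=oneshot
#   ExecStart=/path/to/venv/bin/python /path/to/scripts/fabric_sync.py --sync
#
#   # scripts/fabric-sync.timer
#   [Unit]
#   Description=Run CmdForge Fabric sync daily
#
#   [Timer]
#   OnCalendar=daily
#   Persistent=true
#
#   [Install]
#   WantedBy=timers.target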