#!/usr/bin/env python3
"""
Scheduled Fabric pattern sync for CmdForge.

Monitors the Fabric repository for new/updated patterns and syncs them to
the CmdForge registry through the vetting pipeline.

Usage:
    # Check for new patterns (dry run)
    python scripts/fabric_sync.py --dry-run

    # Sync new patterns to registry
    python scripts/fabric_sync.py --sync

    # Check status of tracked patterns
    python scripts/fabric_sync.py --status

    # Force resync of specific patterns
    python scripts/fabric_sync.py --force summarize extract_wisdom

    # Run as daemon with interval
    python scripts/fabric_sync.py --daemon --interval 3600

Setup for cron (daily sync):
    0 3 * * * /path/to/venv/bin/python /path/to/scripts/fabric_sync.py --sync >> /var/log/fabric_sync.log 2>&1

Setup for systemd timer:
    See scripts/fabric-sync.service and scripts/fabric-sync.timer
"""

import argparse
import hashlib
import json
import logging
import os
import subprocess
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import yaml

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Constants
FABRIC_REPO = "https://github.com/danielmiessler/fabric.git"
DEFAULT_SYNC_DIR = Path("/var/lib/cmdforge/fabric-sync")
DEFAULT_STATE_FILE = DEFAULT_SYNC_DIR / "sync_state.json"
DEFAULT_PROVIDER = "opencode-pickle"


def run_ai_scrutiny_review(scrutiny_report: dict, config: dict, tool_name: str, description: str):
    """Run AI-powered secondary review of scrutiny warnings.

    Uses the scrutiny-ai-review tool to analyze warnings and identify
    false positives.

    Args:
        scrutiny_report: The scrutiny report with findings
        config: The tool configuration dict
        tool_name: Name of the tool being reviewed
        description: Tool description

    Returns:
        AI review result dict, or None if review fails
    """
    # Check if the AI review tool exists
    tool_path = Path.home() / ".cmdforge" / "scrutiny-ai-review" / "config.yaml"
    if not tool_path.exists():
        logger.debug(f" AI review skipped for {tool_name}: scrutiny-ai-review tool not found at {tool_path}")
        return None

    # Extract warnings from scrutiny report
    warnings = [
        f for f in scrutiny_report.get("findings", [])
        if f.get("result") == "warning"
    ]
    if not warnings:
        logger.debug(f" AI review skipped for {tool_name}: no warnings to review")
        return None

    # Prepare tool config for review
    tool_config = {
        "name": tool_name,
        "description": description or "",
        "steps": config.get("steps", []),
        "arguments": config.get("arguments", []),
    }

    # Find cmdforge executable - try venv first, then PATH.
    # Absolute candidates must exist on disk; the bare "cmdforge" name is
    # accepted as-is and resolved via PATH by subprocess (a missing binary
    # surfaces as FileNotFoundError, which is handled below).
    cmdforge_paths = [
        Path(sys.executable).parent / "cmdforge",  # Same venv as current Python
        Path("/srv/mergerfs/data_pool/home/rob/cmdforge-registry/venv/bin/cmdforge"),  # Server venv
        "cmdforge",  # PATH
    ]
    cmdforge_exe = None
    for candidate in cmdforge_paths:
        candidate = Path(candidate)
        if not candidate.is_absolute() or candidate.exists():
            cmdforge_exe = str(candidate)
            break
    if not cmdforge_exe:
        logger.warning(f" AI review skipped for {tool_name}: cmdforge executable not found")
        return None

    # Run the tool (use -- to separate cmdforge args from tool args)
    logger.debug(f" Running AI review for {tool_name} ({len(warnings)} warnings)...")
    start_time = time.time()
    try:
        result = subprocess.run(
            [
                cmdforge_exe, "run", "scrutiny-ai-review", "--",
                "--warnings", json.dumps(warnings),
                "--tool-config", json.dumps(tool_config),
            ],
            capture_output=True,
            text=True,
            timeout=60,  # 60 second timeout
        )
        elapsed = time.time() - start_time
        if result.returncode == 0 and result.stdout.strip():
            logger.debug(f" AI review completed for {tool_name} in {elapsed:.1f}s")
            return json.loads(result.stdout.strip())
        else:
            logger.warning(f" AI review failed for {tool_name}: returncode={result.returncode}, elapsed={elapsed:.1f}s")
            if result.stderr:
                logger.warning(f" stderr: {result.stderr[:200]}")
            return None
    except subprocess.TimeoutExpired:
        elapsed = time.time() - start_time
        logger.warning(f" AI review TIMEOUT for {tool_name} after {elapsed:.1f}s")
        return None
    except json.JSONDecodeError as e:
        elapsed = time.time() - start_time
        logger.warning(f" AI review JSON parse error for {tool_name} after {elapsed:.1f}s: {e}")
        return None
    except FileNotFoundError as e:
        # cmdforge binary not actually runnable (e.g. bare name not on PATH)
        logger.warning(f" AI review failed for {tool_name}: {e}")
        return None
    except Exception as e:
        elapsed = time.time() - start_time
        logger.warning(f" AI review unexpected error for {tool_name} after {elapsed:.1f}s: {e}")
        return None


@dataclass
class PatternState:
    """State of a single pattern."""
    name: str
    hash: str  # SHA256 of system.md content (truncated, see hash_pattern)
    synced_at: Optional[str] = None
    version: str = "1.0.0"
    status: str = "pending"  # pending, synced, failed, skipped


@dataclass
class SyncState:
    """Overall sync state."""
    last_check: Optional[str] = None
    last_sync: Optional[str] = None
    repo_commit: Optional[str] = None
    patterns: dict = field(default_factory=dict)  # name -> PatternState as dict

    def to_dict(self) -> dict:
        """Serialize to a plain dict for JSON storage."""
        return {
            "last_check": self.last_check,
            "last_sync": self.last_sync,
            "repo_commit": self.repo_commit,
            "patterns": self.patterns,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "SyncState":
        """Build a SyncState from a dict loaded from JSON."""
        return cls(
            last_check=data.get("last_check"),
            last_sync=data.get("last_sync"),
            repo_commit=data.get("repo_commit"),
            patterns=data.get("patterns", {}),
        )


def load_state(state_file: Path) -> SyncState:
    """Load sync state from file, returning a fresh state if it doesn't exist."""
    if state_file.exists():
        with open(state_file) as f:
            data = json.load(f)
        return SyncState.from_dict(data)
    return SyncState()


def save_state(state: SyncState, state_file: Path):
    """Save sync state to file, creating parent directories as needed."""
    state_file.parent.mkdir(parents=True, exist_ok=True)
    with open(state_file, "w") as f:
        json.dump(state.to_dict(), f, indent=2)


def clone_or_update_repo(sync_dir: Path) -> tuple[Path, str]:
    """Clone or update the Fabric repository.

    Args:
        sync_dir: Directory under which the Fabric clone lives

    Returns:
        Tuple of (patterns_dir, commit_hash)
    """
    fabric_dir = sync_dir / "fabric"
    patterns_dir = fabric_dir / "data" / "patterns"

    if fabric_dir.exists():
        logger.info("Updating existing Fabric clone...")
        subprocess.run(
            ["git", "-C", str(fabric_dir), "fetch", "--quiet"],
            check=True, capture_output=True
        )
        # Hard reset so local state always mirrors upstream exactly
        subprocess.run(
            ["git", "-C", str(fabric_dir), "reset", "--hard", "origin/main", "--quiet"],
            check=True, capture_output=True
        )
    else:
        logger.info("Cloning Fabric repository...")
        sync_dir.mkdir(parents=True, exist_ok=True)
        subprocess.run(
            ["git", "clone", "--depth", "1", FABRIC_REPO, str(fabric_dir)],
            check=True, capture_output=True
        )

    # Get current commit hash (check=True: fail loudly instead of recording
    # an empty commit hash in the sync state)
    result = subprocess.run(
        ["git", "-C", str(fabric_dir), "rev-parse", "HEAD"],
        check=True, capture_output=True, text=True
    )
    commit_hash = result.stdout.strip()[:12]

    return patterns_dir, commit_hash


def hash_pattern(pattern_dir: Path) -> Optional[str]:
    """Calculate a short content hash of a pattern's system.md, or None if absent."""
    system_md = pattern_dir / "system.md"
    if not system_md.exists():
        return None
    content = system_md.read_bytes()
    return hashlib.sha256(content).hexdigest()[:16]


def scan_patterns(patterns_dir: Path) -> dict[str, str]:
    """Scan all patterns and return a name -> hash mapping.

    Directories without a system.md are skipped.
    """
    patterns = {}
    for entry in sorted(patterns_dir.iterdir()):
        if entry.is_dir():
            pattern_hash = hash_pattern(entry)
            if pattern_hash:
                patterns[entry.name] = pattern_hash
    return patterns


def find_changes(
    current_patterns: dict[str, str],
    state: SyncState
) -> tuple[list[str], list[str], list[str]]:
    """Find new, updated, and removed patterns relative to the saved state.

    Args:
        current_patterns: name -> hash mapping from the current repo scan
        state: Previously saved sync state

    Returns:
        Tuple of (new_patterns, updated_patterns, removed_patterns)
    """
    new_patterns = []
    updated_patterns = []
    removed_patterns = []

    # Check for new and updated
    for name, current_hash in current_patterns.items():
        if name not in state.patterns:
            new_patterns.append(name)
        elif state.patterns[name].get("hash") != current_hash:
            updated_patterns.append(name)

    # Check for removed
    for name in state.patterns:
        if name not in current_patterns:
            removed_patterns.append(name)

    return new_patterns, updated_patterns, removed_patterns


def vet_pattern(pattern_dir: Path, provider: str = DEFAULT_PROVIDER) -> tuple[bool, str]:
    """Run vetting pipeline on a pattern.

    Falls back to a basic length check when the scrutiny module is not
    available.

    Args:
        pattern_dir: Path to the pattern directory
        provider: AI provider name for the generated tool config

    Returns:
        Tuple of (passed, reason)
    """
    try:
        # Make sibling scripts importable (guarded to avoid duplicate entries)
        script_dir = Path(__file__).parent
        if str(script_dir) not in sys.path:
            sys.path.insert(0, str(script_dir))
        from import_fabric import create_tool_config

        # Read pattern
        system_md = pattern_dir / "system.md"
        system_prompt = system_md.read_text()

        # Create config
        config = create_tool_config(pattern_dir.name, system_prompt, provider)

        # Run scrutiny
        try:
            from scrutiny import vet_tool, VetResult
            report = vet_tool(config, str(pattern_dir))
            if report.result == VetResult.REJECT:
                return False, f"Rejected: {report.suggestions[0] if report.suggestions else 'quality too low'}"
            elif report.result == VetResult.REVIEW:
                return True, f"Approved (needs review): score {report.overall_score:.2f}"
            else:
                return True, f"Approved: score {report.overall_score:.2f}"
        except ImportError:
            # Scrutiny not available - basic validation
            if len(system_prompt.strip()) < 50:
                return False, "Pattern too short"
            return True, "Basic validation passed"
    except Exception as e:
        return False, f"Vetting error: {e}"


def publish_to_registry(
    name: str,
    config_yaml: str,
    readme: str,
    provider: str,
    auto_approve: bool = False,
    skip_ai_review: bool = False
) -> tuple[bool, str, dict]:
    """Publish a tool directly to the registry database with vetting.

    Args:
        name: Tool name
        config_yaml: Tool configuration as YAML string
        readme: README content
        provider: AI provider name
        auto_approve: If True, auto-approve tools that pass scrutiny
        skip_ai_review: If True, skip AI secondary review (for large bulk imports)

    Returns:
        Tuple of (success, message, scrutiny_report)
    """
    try:
        # Add src to path for registry imports
        src_dir = Path(__file__).parent.parent / "src"
        if str(src_dir) not in sys.path:
            sys.path.insert(0, str(src_dir))

        from cmdforge.registry.db import connect_db, query_one
        from cmdforge.registry.scrutiny import scrutinize_tool
        from cmdforge.hash_utils import compute_yaml_hash

        # Parse config
        config = yaml.safe_load(config_yaml)
        version = config.get("version", "1.0.0")
        description = config.get("description", "")
        category = config.get("category")
        tags = config.get("tags", [])

        conn = connect_db()

        # Check if already exists
        existing = query_one(
            conn,
            "SELECT id, version FROM tools WHERE owner = ? AND name = ?",
            ["official", name],
        )

        if existing:
            # Check if same version
            if existing["version"] == version:
                conn.close()
                return True, "Already exists (same version)", {}

        # Run scrutiny
        scrutiny_report = {}
        try:
            scrutiny_report = scrutinize_tool(config_yaml, description, readme)
        except Exception as e:
            logger.warning(f"Scrutiny failed for {name}: {e}")

        # Run AI secondary review if there are warnings (unless skipped)
        scrutiny_decision = scrutiny_report.get("decision", "review")
        if scrutiny_decision == "review" and skip_ai_review:
            logger.info(f" AI review skipped for {name} (--skip-ai-review flag)")
            scrutiny_report["ai_review_skipped"] = True
        elif scrutiny_decision == "review":
            try:
                ai_review = run_ai_scrutiny_review(scrutiny_report, config, name, description)
                if ai_review:
                    scrutiny_report["ai_review"] = ai_review
                    # Update decision based on AI review
                    if ai_review.get("overall_verdict") == "APPROVE" and ai_review.get("confidence", 0) >= 0.8:
                        scrutiny_report["decision"] = "approve"
                        scrutiny_report["ai_approved"] = True
                        scrutiny_decision = "approve"
                        logger.info(f" AI review approved {name} (confidence: {ai_review.get('confidence', 0):.2f})")
            except Exception as e:
                # AI review is best-effort; record the error but don't block publishing
                scrutiny_report["ai_review_error"] = str(e)
                logger.warning(f"AI review failed for {name}: {e}")

        # Check scrutiny decision
        if scrutiny_decision == "reject":
            fail_findings = [f for f in scrutiny_report.get("findings", []) if f.get("result") == "fail"]
            fail_msg = fail_findings[0]["message"] if fail_findings else "quality too low"
            conn.close()
            return False, f"Rejected by scrutiny: {fail_msg}", scrutiny_report

        # Determine statuses
        if scrutiny_decision == "approve":
            scrutiny_status = "approved"
        elif scrutiny_decision == "review":
            scrutiny_status = "pending_review"
        else:
            scrutiny_status = "pending"

        # Moderation status based on auto_approve setting
        if auto_approve and scrutiny_status == "approved":
            moderation_status = "approved"
        else:
            moderation_status = "pending"

        # Compute hash
        config_hash = compute_yaml_hash(config_yaml)

        # Source attribution
        source_json = json.dumps({
            "type": "imported",
            "original_tool": f"fabric/patterns/{name}",
            "url": "https://github.com/danielmiessler/fabric",
            "license": "MIT",
            "author": "Daniel Miessler"
        })

        tags_json = json.dumps(tags) if tags else "[]"
        scrutiny_json = json.dumps(scrutiny_report) if scrutiny_report else None

        # Ensure official publisher exists
        publisher = query_one(conn, "SELECT id FROM publishers WHERE slug = ?", ["official"])
        if not publisher:
            conn.execute(
                "INSERT INTO publishers (email, password_hash, slug, display_name, verified) VALUES (?, ?, ?, ?, ?)",
                ["official@cmdforge.local", "", "official", "Official", True]
            )
            publisher_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
        else:
            publisher_id = publisher["id"]

        if existing:
            # Update existing tool
            conn.execute(
                """
                UPDATE tools SET
                    version = ?, description = ?, category = ?, tags = ?,
                    config_yaml = ?, readme = ?, scrutiny_status = ?,
                    scrutiny_report = ?, source_json = ?, config_hash = ?,
                    moderation_status = ?, published_at = ?
                WHERE id = ?
                """,
                [
                    version, description, category, tags_json,
                    config_yaml, readme, scrutiny_status,
                    scrutiny_json, source_json, config_hash,
                    moderation_status, datetime.now(timezone.utc).isoformat(),
                    existing["id"]
                ]
            )
        else:
            # Insert new tool
            conn.execute(
                """
                INSERT INTO tools (
                    owner, name, version, description, category, tags,
                    config_yaml, readme, publisher_id, scrutiny_status,
                    scrutiny_report, source_json, config_hash, visibility,
                    moderation_status, published_at, downloads
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                [
                    "official", name, version, description, category, tags_json,
                    config_yaml, readme, publisher_id, scrutiny_status,
                    scrutiny_json, source_json, config_hash, "public",
                    moderation_status, datetime.now(timezone.utc).isoformat(), 0
                ]
            )

        conn.commit()
        conn.close()

        decision = scrutiny_report.get("decision", "unknown")
        return True, f"Published (scrutiny: {decision}, moderation: {moderation_status})", scrutiny_report

    except Exception as e:
        logger.error(f"Registry publish failed: {e}")
        import traceback
        traceback.print_exc()
        return False, str(e), {}


def sync_pattern(
    pattern_dir: Path,
    output_dir: Path,
    provider: str,
    state: SyncState,
    dry_run: bool = False,
    publish_to_db: bool = True,
    auto_approve: bool = False,
    skip_ai_review: bool = False
) -> bool:
    """Sync a single pattern.

    Args:
        pattern_dir: Path to the pattern directory
        output_dir: Output directory for local tools (if not publishing to DB)
        provider: AI provider name
        state: Sync state object
        dry_run: If True, don't make changes
        publish_to_db: If True, publish to registry database; if False, create local files
        auto_approve: If True, auto-approve tools that pass scrutiny
        skip_ai_review: If True, skip AI secondary review (for large bulk imports)

    Returns:
        True if successful
    """
    name = pattern_dir.name
    pattern_hash = hash_pattern(pattern_dir)

    # Read pattern content
    system_md = pattern_dir / "system.md"
    if not system_md.exists():
        logger.warning(f" ✗ {name}: No system.md found")
        return False
    system_prompt = system_md.read_text()

    # Import helper functions
    script_dir = Path(__file__).parent
    if str(script_dir) not in sys.path:
        sys.path.insert(0, str(script_dir))
    from import_fabric import create_tool_config, pattern_to_display_name

    # Create tool config
    config = create_tool_config(name, system_prompt, provider)
    config_yaml = yaml.dump(config, default_flow_style=False, sort_keys=False)

    # Create README with usage and attribution
    display_name = pattern_to_display_name(name)
    readme = f"""# {display_name}

{config.get('description', '')}

## Usage

```bash
cat input.txt | {name}
```

## Source

This tool was imported from [Fabric](https://github.com/danielmiessler/fabric) patterns.

- **Original pattern**: `{name}`
- **Author**: Daniel Miessler
- **License**: MIT
"""

    if dry_run:
        # Quick vet check for dry run
        passed, reason = vet_pattern(pattern_dir, provider)
        if passed:
            logger.info(f" [DRY RUN] Would sync: {name} ({reason})")
        else:
            logger.info(f" [DRY RUN] Would skip: {name} ({reason})")
        return passed

    if publish_to_db:
        # Publish directly to registry database
        success, message, report = publish_to_registry(name, config_yaml, readme, provider, auto_approve, skip_ai_review)
        if success:
            logger.info(f" ✓ {name} -> registry ({message})")
            state.patterns[name] = {
                "name": name,
                "hash": pattern_hash,
                "status": "synced",
                "message": message,
                "synced_at": datetime.now(timezone.utc).isoformat(),
            }
            return True
        else:
            logger.warning(f" ✗ {name}: {message}")
            state.patterns[name] = {
                "name": name,
                "hash": pattern_hash,
                "status": "failed",
                "reason": message,
                "synced_at": datetime.now(timezone.utc).isoformat(),
            }
            return False
    else:
        # Original behavior: create local files
        passed, reason = vet_pattern(pattern_dir, provider)
        if not passed:
            logger.warning(f" ✗ {name}: {reason}")
            state.patterns[name] = {
                "name": name,
                "hash": pattern_hash,
                "status": "failed",
                "reason": reason,
                "synced_at": datetime.now(timezone.utc).isoformat(),
            }
            return False

        try:
            from import_fabric import import_pattern
            success = import_pattern(
                name,
                pattern_dir.parent,
                output_dir,
                provider,
                dry_run=False,
                registry_format=False,
            )
            if success:
                logger.info(f" ✓ {name} -> {output_dir}/{name}")
                state.patterns[name] = {
                    "name": name,
                    "hash": pattern_hash,
                    "status": "synced",
                    "synced_at": datetime.now(timezone.utc).isoformat(),
                }
                return True
            else:
                logger.error(f" ✗ {name}: Import failed")
                state.patterns[name] = {
                    "name": name,
                    "hash": pattern_hash,
                    "status": "failed",
                    "reason": "Import failed",
                    "synced_at": datetime.now(timezone.utc).isoformat(),
                }
                return False
        except Exception as e:
            logger.error(f" ✗ {name}: {e}")
            state.patterns[name] = {
                "name": name,
                "hash": pattern_hash,
                "status": "failed",
                "reason": str(e),
                "synced_at": datetime.now(timezone.utc).isoformat(),
            }
            return False


def run_sync(
    sync_dir: Path,
    output_dir: Path,
    state_file: Path,
    provider: str,
    dry_run: bool = False,
    force_patterns: Optional[list[str]] = None,
    publish_to_db: bool = True,
    auto_approve: bool = False,
    skip_ai_review: bool = False
) -> dict:
    """Run the sync process.

    Args:
        sync_dir: Directory for sync data (Fabric clone)
        output_dir: Output directory for local tools (if not publishing to DB)
        state_file: Path to state file
        provider: AI provider name
        dry_run: If True, don't make changes
        force_patterns: List of pattern names to force resync
        publish_to_db: If True, publish to registry database
        auto_approve: If True, auto-approve tools that pass scrutiny
        skip_ai_review: If True, skip AI secondary review (for large bulk imports)

    Returns:
        Summary dict with counts
    """
    # Load state
    state = load_state(state_file)

    # Clone/update repo
    patterns_dir, commit_hash = clone_or_update_repo(sync_dir)

    # Scan patterns
    current_patterns = scan_patterns(patterns_dir)
    logger.info(f"Found {len(current_patterns)} patterns in Fabric repo (commit {commit_hash})")

    # Find changes
    if force_patterns:
        new_patterns = [p for p in force_patterns if p in current_patterns]
        # Tell the user about forced names that don't exist, instead of
        # silently dropping them
        unknown = [p for p in force_patterns if p not in current_patterns]
        if unknown:
            logger.warning(f"Forced patterns not found in repo: {', '.join(unknown)}")
        updated_patterns = []
        removed_patterns = []
    else:
        new_patterns, updated_patterns, removed_patterns = find_changes(current_patterns, state)

    logger.info(f"Changes: {len(new_patterns)} new, {len(updated_patterns)} updated, {len(removed_patterns)} removed")

    # Update state timestamp
    state.last_check = datetime.now(timezone.utc).isoformat()
    state.repo_commit = commit_hash

    # Process new and updated patterns
    to_sync = new_patterns + updated_patterns
    synced = 0
    failed = 0

    if to_sync:
        dest = "registry database" if publish_to_db else output_dir
        logger.info(f"\nSyncing {len(to_sync)} patterns to {dest}...")
        for name in to_sync:
            pattern_dir = patterns_dir / name
            if sync_pattern(pattern_dir, output_dir, provider, state, dry_run, publish_to_db, auto_approve, skip_ai_review):
                synced += 1
            else:
                failed += 1

    # Mark removed patterns
    for name in removed_patterns:
        if name in state.patterns:
            state.patterns[name]["status"] = "removed"

    # Save state
    if not dry_run:
        state.last_sync = datetime.now(timezone.utc).isoformat()
        save_state(state, state_file)
        logger.info(f"\nState saved to {state_file}")

    # Summary
    summary = {
        "total_patterns": len(current_patterns),
        "new": len(new_patterns),
        "updated": len(updated_patterns),
        "removed": len(removed_patterns),
        "synced": synced,
        "failed": failed,
        "commit": commit_hash,
    }

    logger.info(f"\nSync complete: {synced} synced, {failed} failed")
    return summary


def print_status(state_file: Path):
    """Print current sync status."""
    state = load_state(state_file)

    print(f"\nFabric Sync Status")
    print(f"{'=' * 50}")
    print(f"Last check: {state.last_check or 'Never'}")
    print(f"Last sync: {state.last_sync or 'Never'}")
    print(f"Repo commit: {state.repo_commit or 'Unknown'}")

    if state.patterns:
        # Count by status
        by_status = {}
        for p in state.patterns.values():
            status = p.get("status", "unknown")
            by_status[status] = by_status.get(status, 0) + 1

        print(f"\nPatterns: {len(state.patterns)} total")
        for status, count in sorted(by_status.items()):
            print(f" {status}: {count}")

        # Show failed patterns
        failed = [p for p in state.patterns.values() if p.get("status") == "failed"]
        if failed:
            print(f"\nFailed patterns:")
            for p in failed[:10]:
                print(f" - {p['name']}: {p.get('reason', 'Unknown error')}")
            if len(failed) > 10:
                print(f" ... and {len(failed) - 10} more")
    else:
        print("\nNo patterns tracked yet. Run --sync to start.")


def daemon_loop(
    sync_dir: Path,
    output_dir: Path,
    state_file: Path,
    provider: str,
    interval: int,
    publish_to_db: bool = True,
    auto_approve: bool = False,
    skip_ai_review: bool = False
):
    """Run sync in a loop, sleeping `interval` seconds between runs.

    Sync errors are logged and the loop continues, so one bad run doesn't
    kill the daemon.
    """
    logger.info(f"Starting daemon mode with {interval}s interval")
    while True:
        try:
            run_sync(sync_dir, output_dir, state_file, provider,
                     publish_to_db=publish_to_db, auto_approve=auto_approve,
                     skip_ai_review=skip_ai_review)
        except Exception as e:
            logger.error(f"Sync failed: {e}")
        logger.info(f"Sleeping for {interval}s...")
        time.sleep(interval)


def main():
    """CLI entry point. Returns a process exit code."""
    parser = argparse.ArgumentParser(
        description="Scheduled Fabric pattern sync for CmdForge",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        "--sync", action="store_true",
        help="Run sync process"
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Show what would be synced without making changes"
    )
    parser.add_argument(
        "--status", action="store_true",
        help="Show current sync status"
    )
    parser.add_argument(
        "--force", nargs="+", metavar="PATTERN",
        help="Force resync of specific patterns"
    )
    parser.add_argument(
        "--daemon", action="store_true",
        help="Run in daemon mode"
    )
    parser.add_argument(
        "--interval", type=int, default=3600,
        help="Sync interval in seconds for daemon mode (default: 3600)"
    )
    parser.add_argument(
        "--sync-dir", type=Path, default=DEFAULT_SYNC_DIR,
        help=f"Directory for sync data (default: {DEFAULT_SYNC_DIR})"
    )
    parser.add_argument(
        "--output", type=Path, default=Path.home() / ".cmdforge",
        help="Output directory for synced tools (default: ~/.cmdforge)"
    )
    parser.add_argument(
        "--state-file", type=Path,
        help="State file path (default: <sync-dir>/sync_state.json)"
    )
    parser.add_argument(
        "--provider", default=DEFAULT_PROVIDER,
        help=f"Default provider for tools (default: {DEFAULT_PROVIDER})"
    )
    parser.add_argument(
        "--local-only", action="store_true",
        help="Create local tools only (don't publish to registry database)"
    )
    parser.add_argument(
        "--auto-approve", action="store_true",
        help="Auto-approve tools that pass scrutiny (skip moderation queue)"
    )
    parser.add_argument(
        "--skip-ai-review", action="store_true",
        help="Skip AI secondary review (for large bulk imports to avoid rate limiting)"
    )

    args = parser.parse_args()

    # Set state file default
    state_file = args.state_file or (args.sync_dir / "sync_state.json")

    if args.status:
        print_status(state_file)
        return 0

    if args.daemon:
        daemon_loop(
            args.sync_dir, args.output, state_file, args.provider,
            args.interval, publish_to_db=not args.local_only,
            auto_approve=args.auto_approve, skip_ai_review=args.skip_ai_review
        )
        return 0

    if args.sync or args.dry_run or args.force:
        summary = run_sync(
            args.sync_dir,
            args.output,
            state_file,
            args.provider,
            dry_run=args.dry_run,
            force_patterns=args.force,
            publish_to_db=not args.local_only,
            auto_approve=args.auto_approve,
            skip_ai_review=args.skip_ai_review
        )
        if summary["failed"] > 0:
            return 1
        return 0

    parser.print_help()
    return 1


if __name__ == "__main__":
    sys.exit(main())