#!/usr/bin/env python3
"""
Scheduled Fabric pattern sync for CmdForge.
Monitors the Fabric repository for new/updated patterns and syncs them
to the CmdForge registry through the vetting pipeline.
Usage:
# Check for new patterns (dry run)
python scripts/fabric_sync.py --dry-run
# Sync new patterns to registry
python scripts/fabric_sync.py --sync
# Check status of tracked patterns
python scripts/fabric_sync.py --status
# Force resync of specific patterns
python scripts/fabric_sync.py --force summarize extract_wisdom
# Run as daemon with interval
python scripts/fabric_sync.py --daemon --interval 3600
Setup for cron (daily sync):
0 3 * * * /path/to/venv/bin/python /path/to/scripts/fabric_sync.py --sync >> /var/log/fabric_sync.log 2>&1
Setup for systemd timer:
See scripts/fabric-sync.service and scripts/fabric-sync.timer
"""
import argparse
import hashlib
import json
import logging
import subprocess
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import yaml
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Constants
FABRIC_REPO = "https://github.com/danielmiessler/fabric.git"
DEFAULT_SYNC_DIR = Path("/var/lib/cmdforge/fabric-sync")
DEFAULT_STATE_FILE = DEFAULT_SYNC_DIR / "sync_state.json"
DEFAULT_PROVIDER = "opencode-pickle"
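# These are defaults only: the sync directory, state file, and provider can be
# overridden at run time with --sync-dir, --state-file, and --provider (see main()).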
def run_ai_scrutiny_review(scrutiny_report: dict, config: dict, tool_name: str, description: str) -> Optional[dict]:
"""Run AI-powered secondary review of scrutiny warnings.
Uses the scrutiny-ai-review tool to analyze warnings and identify false positives.
Args:
scrutiny_report: The scrutiny report with findings
config: The tool configuration dict
tool_name: Name of the tool being reviewed
description: Tool description
Returns:
        AI review result dict, or None if the review tool is unavailable,
        there are no warnings to review, or the review fails.
"""
# Check if the AI review tool exists
tool_path = Path.home() / ".cmdforge" / "scrutiny-ai-review" / "config.yaml"
if not tool_path.exists():
return None
# Extract warnings from scrutiny report
warnings = [
f for f in scrutiny_report.get("findings", [])
if f.get("result") == "warning"
]
if not warnings:
return None
# Prepare tool config for review
tool_config = {
"name": tool_name,
"description": description or "",
"steps": config.get("steps", []),
"arguments": config.get("arguments", []),
}
# Run the tool
try:
result = subprocess.run(
[
"cmdforge", "run", "scrutiny-ai-review",
"--warnings", json.dumps(warnings),
"--tool-config", json.dumps(tool_config),
],
capture_output=True,
text=True,
timeout=60, # 60 second timeout
)
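        # Illustrative shape of the JSON the tool is expected to print (inferred from
        # how publish_to_registry consumes the result; not a documented schema):
        #   {"overall_verdict": "APPROVE", "confidence": 0.92, ...}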
if result.returncode == 0 and result.stdout.strip():
return json.loads(result.stdout.strip())
return None
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
return None
@dataclass
class PatternState:
"""State of a single pattern."""
name: str
    hash: str  # truncated SHA-256 of system.md content
synced_at: Optional[str] = None
version: str = "1.0.0"
status: str = "pending" # pending, synced, failed, skipped
@dataclass
class SyncState:
"""Overall sync state."""
last_check: Optional[str] = None
last_sync: Optional[str] = None
repo_commit: Optional[str] = None
patterns: dict = field(default_factory=dict) # name -> PatternState as dict
def to_dict(self) -> dict:
return {
"last_check": self.last_check,
"last_sync": self.last_sync,
"repo_commit": self.repo_commit,
"patterns": self.patterns,
}
@classmethod
def from_dict(cls, data: dict) -> "SyncState":
return cls(
last_check=data.get("last_check"),
last_sync=data.get("last_sync"),
repo_commit=data.get("repo_commit"),
patterns=data.get("patterns", {}),
)
def load_state(state_file: Path) -> SyncState:
"""Load sync state from file."""
if state_file.exists():
with open(state_file) as f:
data = json.load(f)
return SyncState.from_dict(data)
return SyncState()
def save_state(state: SyncState, state_file: Path):
"""Save sync state to file."""
state_file.parent.mkdir(parents=True, exist_ok=True)
with open(state_file, "w") as f:
json.dump(state.to_dict(), f, indent=2)
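# Illustrative layout of the persisted sync_state.json (placeholder values).
# Pattern records are stored as plain dicts written by sync_pattern, mirroring the
# PatternState fields plus an optional "message" or "reason":
#   {
#     "last_check": "2024-01-01T03:00:00+00:00",
#     "last_sync": "2024-01-01T03:05:00+00:00",
#     "repo_commit": "abc123def456",
#     "patterns": {
#       "summarize": {
#         "name": "summarize",
#         "hash": "0123456789abcdef",
#         "status": "synced",
#         "synced_at": "2024-01-01T03:05:00+00:00"
#       }
#     }
#   }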
def clone_or_update_repo(sync_dir: Path) -> tuple[Path, str]:
"""Clone or update the Fabric repository.
Returns:
Tuple of (patterns_dir, commit_hash)
"""
fabric_dir = sync_dir / "fabric"
patterns_dir = fabric_dir / "data" / "patterns"
if fabric_dir.exists():
logger.info("Updating existing Fabric clone...")
subprocess.run(
["git", "-C", str(fabric_dir), "fetch", "--quiet"],
check=True,
capture_output=True
)
subprocess.run(
["git", "-C", str(fabric_dir), "reset", "--hard", "origin/main", "--quiet"],
check=True,
capture_output=True
)
else:
logger.info("Cloning Fabric repository...")
sync_dir.mkdir(parents=True, exist_ok=True)
subprocess.run(
["git", "clone", "--depth", "1", FABRIC_REPO, str(fabric_dir)],
check=True,
capture_output=True
)
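    # Note: the clone is shallow (--depth 1) and updates hard-reset to origin/main,
    # so any local edits under the sync directory are discarded on each run.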
# Get current commit hash
    result = subprocess.run(
        ["git", "-C", str(fabric_dir), "rev-parse", "HEAD"],
        check=True,
        capture_output=True,
        text=True
    )
commit_hash = result.stdout.strip()[:12]
return patterns_dir, commit_hash
def hash_pattern(pattern_dir: Path) -> Optional[str]:
"""Calculate hash of pattern content."""
system_md = pattern_dir / "system.md"
if not system_md.exists():
return None
content = system_md.read_bytes()
return hashlib.sha256(content).hexdigest()[:16]
def scan_patterns(patterns_dir: Path) -> dict[str, str]:
"""Scan all patterns and return name -> hash mapping."""
patterns = {}
for entry in sorted(patterns_dir.iterdir()):
if entry.is_dir():
pattern_hash = hash_pattern(entry)
if pattern_hash:
patterns[entry.name] = pattern_hash
return patterns
def find_changes(
current_patterns: dict[str, str],
state: SyncState
) -> tuple[list[str], list[str], list[str]]:
"""Find new, updated, and removed patterns.
Returns:
Tuple of (new_patterns, updated_patterns, removed_patterns)
"""
new_patterns = []
updated_patterns = []
removed_patterns = []
# Check for new and updated
for name, current_hash in current_patterns.items():
if name not in state.patterns:
new_patterns.append(name)
elif state.patterns[name].get("hash") != current_hash:
updated_patterns.append(name)
# Check for removed
for name in state.patterns:
if name not in current_patterns:
removed_patterns.append(name)
return new_patterns, updated_patterns, removed_patterns
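# Example (illustrative): if the saved state tracks only {"summarize": {"hash": "aaaa"}}
# and the repo now contains summarize with a different hash plus a brand-new
# extract_wisdom pattern, find_changes returns (["extract_wisdom"], ["summarize"], []).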
def vet_pattern(pattern_dir: Path, provider: str = DEFAULT_PROVIDER) -> tuple[bool, str]:
"""Run vetting pipeline on a pattern.
Returns:
Tuple of (passed, reason)
"""
try:
# Try to import the vetting pipeline
script_dir = Path(__file__).parent
sys.path.insert(0, str(script_dir))
from import_fabric import create_tool_config, clean_prompt, get_category, pattern_to_display_name
# Read pattern
system_md = pattern_dir / "system.md"
system_prompt = system_md.read_text()
# Create config
config = create_tool_config(pattern_dir.name, system_prompt, provider)
# Run scrutiny
try:
from scrutiny import vet_tool, VetResult
report = vet_tool(config, str(pattern_dir))
if report.result == VetResult.REJECT:
return False, f"Rejected: {report.suggestions[0] if report.suggestions else 'quality too low'}"
elif report.result == VetResult.REVIEW:
return True, f"Approved (needs review): score {report.overall_score:.2f}"
else:
return True, f"Approved: score {report.overall_score:.2f}"
except ImportError:
# Scrutiny not available - basic validation
if len(system_prompt.strip()) < 50:
return False, "Pattern too short"
return True, "Basic validation passed"
except Exception as e:
return False, f"Vetting error: {e}"
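# NOTE: when the scrutiny module cannot be imported, vet_pattern falls back to a
# minimal length check, so a pattern can pass with only basic validation in that case.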
def publish_to_registry(
name: str,
config_yaml: str,
readme: str,
provider: str,
auto_approve: bool = False
) -> tuple[bool, str, dict]:
"""Publish a tool directly to the registry database with vetting.
Returns:
Tuple of (success, message, scrutiny_report)
"""
try:
# Add src to path for registry imports
src_dir = Path(__file__).parent.parent / "src"
if str(src_dir) not in sys.path:
sys.path.insert(0, str(src_dir))
from cmdforge.registry.db import connect_db, query_one
from cmdforge.registry.scrutiny import scrutinize_tool
from cmdforge.hash_utils import compute_yaml_hash
# Parse config
config = yaml.safe_load(config_yaml)
version = config.get("version", "1.0.0")
description = config.get("description", "")
category = config.get("category")
tags = config.get("tags", [])
conn = connect_db()
# Check if already exists
existing = query_one(
conn,
"SELECT id, version FROM tools WHERE owner = ? AND name = ?",
["official", name],
)
if existing:
# Check if same version
if existing["version"] == version:
conn.close()
return True, "Already exists (same version)", {}
# Run scrutiny
scrutiny_report = {}
try:
scrutiny_report = scrutinize_tool(config_yaml, description, readme)
except Exception as e:
logger.warning(f"Scrutiny failed for {name}: {e}")
        # Run AI secondary review when scrutiny requests manual review (no-op if there are no warnings)
scrutiny_decision = scrutiny_report.get("decision", "review")
if scrutiny_decision == "review":
try:
ai_review = run_ai_scrutiny_review(scrutiny_report, config, name, description)
if ai_review:
scrutiny_report["ai_review"] = ai_review
# Update decision based on AI review
if ai_review.get("overall_verdict") == "APPROVE" and ai_review.get("confidence", 0) >= 0.8:
scrutiny_report["decision"] = "approve"
scrutiny_report["ai_approved"] = True
scrutiny_decision = "approve"
logger.info(f" AI review approved {name} (confidence: {ai_review.get('confidence', 0):.2f})")
except Exception as e:
scrutiny_report["ai_review_error"] = str(e)
logger.warning(f"AI review failed for {name}: {e}")
# Check scrutiny decision
if scrutiny_decision == "reject":
fail_findings = [f for f in scrutiny_report.get("findings", []) if f.get("result") == "fail"]
fail_msg = fail_findings[0]["message"] if fail_findings else "quality too low"
conn.close()
return False, f"Rejected by scrutiny: {fail_msg}", scrutiny_report
# Determine statuses
if scrutiny_decision == "approve":
scrutiny_status = "approved"
elif scrutiny_decision == "review":
scrutiny_status = "pending_review"
else:
scrutiny_status = "pending"
# Moderation status based on auto_approve setting
if auto_approve and scrutiny_status == "approved":
moderation_status = "approved"
else:
moderation_status = "pending"
# Compute hash
config_hash = compute_yaml_hash(config_yaml)
# Source attribution
source_json = json.dumps({
"type": "imported",
"original_tool": f"fabric/patterns/{name}",
"url": "https://github.com/danielmiessler/fabric",
"license": "MIT",
"author": "Daniel Miessler"
})
tags_json = json.dumps(tags) if tags else "[]"
scrutiny_json = json.dumps(scrutiny_report) if scrutiny_report else None
# Ensure official publisher exists
publisher = query_one(conn, "SELECT id FROM publishers WHERE slug = ?", ["official"])
if not publisher:
conn.execute(
"INSERT INTO publishers (email, password_hash, slug, display_name, verified) VALUES (?, ?, ?, ?, ?)",
["official@cmdforge.local", "", "official", "Official", True]
)
publisher_id = conn.execute("SELECT last_insert_rowid()").fetchone()[0]
else:
publisher_id = publisher["id"]
if existing:
# Update existing tool
conn.execute(
"""
UPDATE tools SET
version = ?, description = ?, category = ?, tags = ?,
config_yaml = ?, readme = ?, scrutiny_status = ?, scrutiny_report = ?,
source_json = ?, config_hash = ?, moderation_status = ?,
published_at = ?
WHERE id = ?
""",
[
version, description, category, tags_json,
config_yaml, readme, scrutiny_status, scrutiny_json,
source_json, config_hash, moderation_status,
datetime.now(timezone.utc).isoformat(),
existing["id"]
]
)
else:
# Insert new tool
conn.execute(
"""
INSERT INTO tools (
owner, name, version, description, category, tags, config_yaml, readme,
publisher_id, scrutiny_status, scrutiny_report, source_json,
config_hash, visibility, moderation_status, published_at, downloads
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
"official", name, version, description, category, tags_json,
config_yaml, readme, publisher_id, scrutiny_status, scrutiny_json,
source_json, config_hash, "public", moderation_status,
datetime.now(timezone.utc).isoformat(), 0
]
)
conn.commit()
conn.close()
decision = scrutiny_report.get("decision", "unknown")
return True, f"Published (scrutiny: {decision}, moderation: {moderation_status})", scrutiny_report
except Exception as e:
logger.error(f"Registry publish failed: {e}")
import traceback
traceback.print_exc()
return False, str(e), {}
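# publish_to_registry maps scrutiny decisions to statuses as follows: "approve" ->
# scrutiny_status "approved", "review" -> "pending_review", anything else -> "pending";
# moderation_status is "approved" only when auto_approve is set and scrutiny approved.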
def sync_pattern(
pattern_dir: Path,
output_dir: Path,
provider: str,
state: SyncState,
dry_run: bool = False,
publish_to_db: bool = True,
auto_approve: bool = False
) -> bool:
"""Sync a single pattern.
Args:
pattern_dir: Path to the pattern directory
output_dir: Output directory for local tools (if not publishing to DB)
provider: AI provider name
state: Sync state object
dry_run: If True, don't make changes
publish_to_db: If True, publish to registry database; if False, create local files
auto_approve: If True, auto-approve tools that pass scrutiny
Returns:
True if successful
"""
name = pattern_dir.name
pattern_hash = hash_pattern(pattern_dir)
# Read pattern content
system_md = pattern_dir / "system.md"
if not system_md.exists():
logger.warning(f"{name}: No system.md found")
return False
system_prompt = system_md.read_text()
# Import helper functions
script_dir = Path(__file__).parent
if str(script_dir) not in sys.path:
sys.path.insert(0, str(script_dir))
from import_fabric import create_tool_config, pattern_to_display_name
# Create tool config
config = create_tool_config(name, system_prompt, provider)
config_yaml = yaml.dump(config, default_flow_style=False, sort_keys=False)
# Create README with usage and attribution
display_name = pattern_to_display_name(name)
readme = f"""# {display_name}
{config.get('description', '')}
## Usage
```bash
cat input.txt | {name}
```
## Source
This tool was imported from [Fabric](https://github.com/danielmiessler/fabric) patterns.
- **Original pattern**: `{name}`
- **Author**: Daniel Miessler
- **License**: MIT
"""
if dry_run:
# Quick vet check for dry run
passed, reason = vet_pattern(pattern_dir, provider)
if passed:
logger.info(f" [DRY RUN] Would sync: {name} ({reason})")
else:
logger.info(f" [DRY RUN] Would skip: {name} ({reason})")
return passed
if publish_to_db:
# Publish directly to registry database
success, message, report = publish_to_registry(name, config_yaml, readme, provider, auto_approve)
if success:
logger.info(f"{name} -> registry ({message})")
state.patterns[name] = {
"name": name,
"hash": pattern_hash,
"status": "synced",
"message": message,
"synced_at": datetime.now(timezone.utc).isoformat(),
}
return True
else:
logger.warning(f"{name}: {message}")
state.patterns[name] = {
"name": name,
"hash": pattern_hash,
"status": "failed",
"reason": message,
"synced_at": datetime.now(timezone.utc).isoformat(),
}
return False
else:
# Original behavior: create local files
passed, reason = vet_pattern(pattern_dir, provider)
if not passed:
logger.warning(f"{name}: {reason}")
state.patterns[name] = {
"name": name,
"hash": pattern_hash,
"status": "failed",
"reason": reason,
"synced_at": datetime.now(timezone.utc).isoformat(),
}
return False
try:
from import_fabric import import_pattern
success = import_pattern(
name,
pattern_dir.parent,
output_dir,
provider,
dry_run=False,
registry_format=False,
)
if success:
logger.info(f"{name} -> {output_dir}/{name}")
state.patterns[name] = {
"name": name,
"hash": pattern_hash,
"status": "synced",
"synced_at": datetime.now(timezone.utc).isoformat(),
}
return True
else:
logger.error(f"{name}: Import failed")
state.patterns[name] = {
"name": name,
"hash": pattern_hash,
"status": "failed",
"reason": "Import failed",
"synced_at": datetime.now(timezone.utc).isoformat(),
}
return False
except Exception as e:
logger.error(f"{name}: {e}")
state.patterns[name] = {
"name": name,
"hash": pattern_hash,
"status": "failed",
"reason": str(e),
"synced_at": datetime.now(timezone.utc).isoformat(),
}
return False
def run_sync(
sync_dir: Path,
output_dir: Path,
state_file: Path,
provider: str,
dry_run: bool = False,
    force_patterns: Optional[list[str]] = None,
publish_to_db: bool = True,
auto_approve: bool = False
) -> dict:
"""Run the sync process.
Args:
sync_dir: Directory for sync data (Fabric clone)
output_dir: Output directory for local tools (if not publishing to DB)
state_file: Path to state file
provider: AI provider name
dry_run: If True, don't make changes
force_patterns: List of pattern names to force resync
publish_to_db: If True, publish to registry database
auto_approve: If True, auto-approve tools that pass scrutiny
Returns:
Summary dict with counts
"""
# Load state
state = load_state(state_file)
# Clone/update repo
patterns_dir, commit_hash = clone_or_update_repo(sync_dir)
# Scan patterns
current_patterns = scan_patterns(patterns_dir)
logger.info(f"Found {len(current_patterns)} patterns in Fabric repo (commit {commit_hash})")
# Find changes
if force_patterns:
new_patterns = [p for p in force_patterns if p in current_patterns]
updated_patterns = []
removed_patterns = []
else:
new_patterns, updated_patterns, removed_patterns = find_changes(current_patterns, state)
logger.info(f"Changes: {len(new_patterns)} new, {len(updated_patterns)} updated, {len(removed_patterns)} removed")
# Update state timestamp
state.last_check = datetime.now(timezone.utc).isoformat()
state.repo_commit = commit_hash
# Process new and updated patterns
to_sync = new_patterns + updated_patterns
synced = 0
failed = 0
if to_sync:
dest = "registry database" if publish_to_db else output_dir
logger.info(f"\nSyncing {len(to_sync)} patterns to {dest}...")
for name in to_sync:
pattern_dir = patterns_dir / name
if sync_pattern(pattern_dir, output_dir, provider, state, dry_run, publish_to_db, auto_approve):
synced += 1
else:
failed += 1
# Mark removed patterns
for name in removed_patterns:
if name in state.patterns:
state.patterns[name]["status"] = "removed"
# Save state
if not dry_run:
state.last_sync = datetime.now(timezone.utc).isoformat()
save_state(state, state_file)
logger.info(f"\nState saved to {state_file}")
# Summary
summary = {
"total_patterns": len(current_patterns),
"new": len(new_patterns),
"updated": len(updated_patterns),
"removed": len(removed_patterns),
"synced": synced,
"failed": failed,
"commit": commit_hash,
}
logger.info(f"\nSync complete: {synced} synced, {failed} failed")
return summary
def print_status(state_file: Path):
"""Print current sync status."""
state = load_state(state_file)
    print("\nFabric Sync Status")
    print("=" * 50)
print(f"Last check: {state.last_check or 'Never'}")
print(f"Last sync: {state.last_sync or 'Never'}")
print(f"Repo commit: {state.repo_commit or 'Unknown'}")
if state.patterns:
# Count by status
by_status = {}
for p in state.patterns.values():
status = p.get("status", "unknown")
by_status[status] = by_status.get(status, 0) + 1
print(f"\nPatterns: {len(state.patterns)} total")
for status, count in sorted(by_status.items()):
print(f" {status}: {count}")
# Show failed patterns
failed = [p for p in state.patterns.values() if p.get("status") == "failed"]
if failed:
            print("\nFailed patterns:")
for p in failed[:10]:
print(f" - {p['name']}: {p.get('reason', 'Unknown error')}")
if len(failed) > 10:
print(f" ... and {len(failed) - 10} more")
else:
print("\nNo patterns tracked yet. Run --sync to start.")
def daemon_loop(
sync_dir: Path,
output_dir: Path,
state_file: Path,
provider: str,
interval: int,
publish_to_db: bool = True,
auto_approve: bool = False
):
"""Run sync in a loop."""
logger.info(f"Starting daemon mode with {interval}s interval")
while True:
try:
run_sync(sync_dir, output_dir, state_file, provider,
publish_to_db=publish_to_db, auto_approve=auto_approve)
except Exception as e:
logger.error(f"Sync failed: {e}")
logger.info(f"Sleeping for {interval}s...")
time.sleep(interval)
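# The cron and systemd-timer setups in the module docstring invoke --sync on a schedule;
# they are presumably the intended alternative to keeping a long-lived --daemon process.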
def main():
parser = argparse.ArgumentParser(
description="Scheduled Fabric pattern sync for CmdForge",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__
)
parser.add_argument(
"--sync",
action="store_true",
help="Run sync process"
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be synced without making changes"
)
parser.add_argument(
"--status",
action="store_true",
help="Show current sync status"
)
parser.add_argument(
"--force",
nargs="+",
metavar="PATTERN",
help="Force resync of specific patterns"
)
parser.add_argument(
"--daemon",
action="store_true",
help="Run in daemon mode"
)
parser.add_argument(
"--interval",
type=int,
default=3600,
help="Sync interval in seconds for daemon mode (default: 3600)"
)
parser.add_argument(
"--sync-dir",
type=Path,
default=DEFAULT_SYNC_DIR,
help=f"Directory for sync data (default: {DEFAULT_SYNC_DIR})"
)
parser.add_argument(
"--output",
type=Path,
default=Path.home() / ".cmdforge",
help="Output directory for synced tools (default: ~/.cmdforge)"
)
parser.add_argument(
"--state-file",
type=Path,
help="State file path (default: <sync-dir>/sync_state.json)"
)
parser.add_argument(
"--provider",
default=DEFAULT_PROVIDER,
help=f"Default provider for tools (default: {DEFAULT_PROVIDER})"
)
parser.add_argument(
"--local-only",
action="store_true",
help="Create local tools only (don't publish to registry database)"
)
parser.add_argument(
"--auto-approve",
action="store_true",
help="Auto-approve tools that pass scrutiny (skip moderation queue)"
)
args = parser.parse_args()
# Set state file default
state_file = args.state_file or (args.sync_dir / "sync_state.json")
if args.status:
print_status(state_file)
return 0
if args.daemon:
daemon_loop(
args.sync_dir,
args.output,
state_file,
args.provider,
args.interval,
publish_to_db=not args.local_only,
auto_approve=args.auto_approve
)
return 0
if args.sync or args.dry_run or args.force:
summary = run_sync(
args.sync_dir,
args.output,
state_file,
args.provider,
dry_run=args.dry_run,
force_patterns=args.force,
publish_to_db=not args.local_only,
auto_approve=args.auto_approve
)
if summary["failed"] > 0:
return 1
return 0
parser.print_help()
return 1
if __name__ == "__main__":
sys.exit(main())