#!/usr/bin/env python3
"""Auto-vetting pipeline for CmdForge tool submissions.

Combines scrutiny (quality checks) and similarity (duplicate detection)
to automatically triage incoming tools:
- Auto-approve: High quality, no duplicates
- Auto-reject: Low quality or exact duplicates
- Review queue: Needs human review

Usage:
    # Vet a single tool
    python scripts/vet_pipeline.py path/to/tool/config.yaml

    # Vet with custom thresholds
    python scripts/vet_pipeline.py --approve-threshold 0.85 path/to/tool/config.yaml

    # Process all tools in an import directory
    python scripts/vet_pipeline.py --batch /tmp/fabric-import/

    # Output a detailed JSON report
    python scripts/vet_pipeline.py --json path/to/tool/config.yaml

Exit codes:
    0: nothing rejected or flagged for review
    1: at least one tool needs human review
    2: at least one tool was auto-rejected
"""

import argparse
import json
import sys
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Optional

# Import our vetting modules
from scrutiny import (
    VetResult as ScrutinyResult,
    VetReport,
    load_tool_config,
    vet_tool,
)
from similarity import (
    ToolText,
    SimilarityMatch,
    extract_tool_text,
    load_corpus,
    compute_tfidf,
    find_similar,
    DUPLICATE_THRESHOLD,
    SIMILAR_THRESHOLD,
)


class PipelineDecision(Enum):
    """Final pipeline decision."""
    AUTO_APPROVE = "auto_approve"
    AUTO_REJECT = "auto_reject"
    NEEDS_REVIEW = "needs_review"
    ERROR = "error"


@dataclass
class PipelineResult:
    """Complete pipeline result for a tool."""
    tool_name: str
    tool_path: str
    decision: PipelineDecision
    reason: str
    scrutiny_report: Optional[VetReport] = None
    similarity_matches: Optional[list] = None
    suggestions: Optional[list] = None

    def __post_init__(self):
        if self.similarity_matches is None:
            self.similarity_matches = []
        if self.suggestions is None:
            self.suggestions = []

    def to_dict(self) -> dict:
        d = {
            "tool_name": self.tool_name,
            "tool_path": self.tool_path,
            "decision": self.decision.value,
            "reason": self.reason,
            "suggestions": self.suggestions,
        }
        if self.scrutiny_report:
            d["scrutiny"] = self.scrutiny_report.to_dict()
        if self.similarity_matches:
            d["similar_tools"] = [m.to_dict() for m in self.similarity_matches]
        return d


# Pipeline thresholds
DEFAULT_APPROVE_THRESHOLD = 0.8
DEFAULT_REJECT_THRESHOLD = 0.3


def run_pipeline(
    tool_path: Path,
    corpus_dir: Path,
    approve_threshold: float = DEFAULT_APPROVE_THRESHOLD,
    reject_threshold: float = DEFAULT_REJECT_THRESHOLD,
) -> PipelineResult:
    """Run the complete vetting pipeline on a tool."""
    # Load tool config
    config = load_tool_config(tool_path)
    if not config:
        return PipelineResult(
            tool_name="unknown",
            tool_path=str(tool_path),
            decision=PipelineDecision.ERROR,
            reason=f"Could not load tool config from {tool_path}",
        )

    tool_name = config.get("name", "unknown")

    # Phase 1: Scrutiny (quality checks)
    scrutiny_report = vet_tool(config, str(tool_path))

    # Phase 2: Similarity check against the existing corpus
    tool_text = extract_tool_text(config, str(tool_path))
    corpus = load_corpus(corpus_dir) if corpus_dir.exists() else []

    similarity_matches = []
    if corpus:
        all_tools = corpus + [tool_text]
        compute_tfidf(all_tools)
        similarity_matches = find_similar(tool_text, corpus, threshold=0.5)

    # Phase 3: Decision logic
    decision, reason, suggestions = make_decision(
        scrutiny_report,
        similarity_matches,
        approve_threshold,
        reject_threshold,
    )

    return PipelineResult(
        tool_name=tool_name,
        tool_path=str(tool_path),
        decision=decision,
        reason=reason,
        scrutiny_report=scrutiny_report,
        similarity_matches=similarity_matches,
        suggestions=suggestions,
    )

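
# Decision matrix implemented by make_decision() below (summary, in evaluation order):
#
#   duplicate match found                         -> AUTO_REJECT
#   similar match found                           -> NEEDS_REVIEW (regardless of score)
#   REJECT verdict or score < reject_threshold    -> AUTO_REJECT
#   APPROVE verdict and score >= approve_threshold -> AUTO_APPROVE
#   anything else                                 -> NEEDS_REVIEW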
def make_decision(
    scrutiny: VetReport,
    similarity_matches: list[SimilarityMatch],
    approve_threshold: float,
    reject_threshold: float,
) -> tuple[PipelineDecision, str, list[str]]:
    """Make final decision based on scrutiny and similarity results."""
    suggestions = list(scrutiny.suggestions) if scrutiny.suggestions else []
    score = scrutiny.overall_score

    # Check for exact duplicates first
    duplicates = [m for m in similarity_matches if m.match_type == "duplicate"]
    if duplicates:
        dupe = duplicates[0]
        return (
            PipelineDecision.AUTO_REJECT,
            f"Duplicate of existing tool '{dupe.tool2_name}' ({dupe.similarity:.0%} match)",
            suggestions + [f"Consider updating {dupe.tool2_name} instead of creating a new tool"],
        )

    # Check for very similar tools
    similar = [m for m in similarity_matches if m.match_type == "similar"]
    if similar:
        sim = similar[0]
        if score >= approve_threshold:
            # If high quality + similar, might be an improved version - needs review
            return (
                PipelineDecision.NEEDS_REVIEW,
                f"High quality but similar to '{sim.tool2_name}' ({sim.similarity:.0%})",
                suggestions + [f"Review whether this improves on {sim.tool2_name}"],
            )
        return (
            PipelineDecision.NEEDS_REVIEW,
            f"Similar to existing tool '{sim.tool2_name}' ({sim.similarity:.0%})",
            suggestions + [f"Consider if this duplicates {sim.tool2_name}"],
        )

    # No duplicates or very similar tools - decide based on quality
    if scrutiny.result == ScrutinyResult.REJECT or score < reject_threshold:
        return (
            PipelineDecision.AUTO_REJECT,
            f"Quality score too low ({score:.2f} < {reject_threshold})",
            suggestions,
        )

    if scrutiny.result == ScrutinyResult.APPROVE and score >= approve_threshold:
        return (
            PipelineDecision.AUTO_APPROVE,
            f"High quality score ({score:.2f}) with no similar tools",
            suggestions,
        )

    # Middle ground - needs review
    return (
        PipelineDecision.NEEDS_REVIEW,
        f"Quality score {score:.2f} - needs human review",
        suggestions,
    )


def print_result(result: PipelineResult, verbose: bool = False):
    """Print pipeline result to console."""
    decision_emoji = {
        PipelineDecision.AUTO_APPROVE: "✅",
        PipelineDecision.AUTO_REJECT: "❌",
        PipelineDecision.NEEDS_REVIEW: "⚠️",
        PipelineDecision.ERROR: "💥",
    }
    decision_color = {
        PipelineDecision.AUTO_APPROVE: "\033[92m",  # Green
        PipelineDecision.AUTO_REJECT: "\033[91m",   # Red
        PipelineDecision.NEEDS_REVIEW: "\033[93m",  # Yellow
        PipelineDecision.ERROR: "\033[91m",         # Red
    }
    reset = "\033[0m"

    emoji = decision_emoji.get(result.decision, "❓")
    color = decision_color.get(result.decision, "")

    print(f"\n{emoji} {color}{result.tool_name}: {result.decision.value.upper()}{reset}")
    print(f"   {result.reason}")

    if verbose:
        print(f"   Path: {result.tool_path}")

        if result.scrutiny_report:
            print("\n   Quality Scores:")
            for score in result.scrutiny_report.scores:
                bar = "█" * int(score.score * 10) + "░" * (10 - int(score.score * 10))
                print(f"     {score.criterion:12} [{bar}] {score.score:.2f}")

        if result.similarity_matches:
            print("\n   Similar Tools:")
            for match in result.similarity_matches[:3]:
                print(f"     • {match.tool2_name} ({match.similarity:.0%} {match.match_type})")

        if result.suggestions:
            print("\n   Suggestions:")
            for suggestion in result.suggestions[:5]:
                print(f"     • {suggestion}")

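
# Illustrative, non-executed example of driving the pipeline from another script
# instead of the CLI (the config path and corpus directory below are hypothetical):
#
#   from pathlib import Path
#   from vet_pipeline import run_pipeline, PipelineDecision
#
#   result = run_pipeline(Path("tools/example/config.yaml"), Path.home() / ".cmdforge")
#   if result.decision is PipelineDecision.AUTO_APPROVE:
#       print(f"{result.tool_name} approved")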

def main():
    """Parse arguments, run the pipeline, and report results."""
    parser = argparse.ArgumentParser(
        description="Run auto-vetting pipeline on CmdForge tools",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "path",
        type=Path,
        nargs="?",
        help="Tool config file or directory to vet",
    )
    parser.add_argument(
        "--batch",
        type=Path,
        metavar="DIR",
        help="Process all tools in directory",
    )
    parser.add_argument(
        "--corpus",
        type=Path,
        default=Path.home() / ".cmdforge",
        help="Directory of existing tools for comparison (default: ~/.cmdforge)",
    )
    parser.add_argument(
        "--approve-threshold",
        type=float,
        default=DEFAULT_APPROVE_THRESHOLD,
        help=f"Score threshold for auto-approve (default: {DEFAULT_APPROVE_THRESHOLD})",
    )
    parser.add_argument(
        "--reject-threshold",
        type=float,
        default=DEFAULT_REJECT_THRESHOLD,
        help=f"Score threshold for auto-reject (default: {DEFAULT_REJECT_THRESHOLD})",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed output",
    )

    args = parser.parse_args()

    if not args.path and not args.batch:
        parser.error("Specify a tool path or use --batch")

    results = []

    if args.batch:
        # Batch mode: vet every tool directory or YAML config in the directory
        batch_dir = args.batch
        if not batch_dir.exists():
            print(f"Error: Directory {batch_dir} does not exist", file=sys.stderr)
            return 1

        print(f"Processing tools in {batch_dir}...")
        for entry in sorted(batch_dir.iterdir()):
            config_file = None
            if entry.is_dir():
                config_file = entry / "config.yaml"
            elif entry.suffix in [".yaml", ".yml"]:
                config_file = entry

            if config_file and config_file.exists():
                result = run_pipeline(
                    config_file,
                    args.corpus,
                    args.approve_threshold,
                    args.reject_threshold,
                )
                results.append(result)
    else:
        # Single tool mode
        result = run_pipeline(
            args.path,
            args.corpus,
            args.approve_threshold,
            args.reject_threshold,
        )
        results.append(result)

    # Output
    if args.json:
        output = [r.to_dict() for r in results]
        print(json.dumps(output, indent=2))
    else:
        for result in results:
            print_result(result, args.verbose)

        # Summary
        if len(results) > 1:
            approved = sum(1 for r in results if r.decision == PipelineDecision.AUTO_APPROVE)
            rejected = sum(1 for r in results if r.decision == PipelineDecision.AUTO_REJECT)
            review = sum(1 for r in results if r.decision == PipelineDecision.NEEDS_REVIEW)
            errors = sum(1 for r in results if r.decision == PipelineDecision.ERROR)

            print(f"\n{'═' * 50}")
            print(f"Pipeline Summary: {len(results)} tools processed")
            print(f"  ✅ Auto-approved: {approved}")
            print(f"  ⚠️  Needs review: {review}")
            print(f"  ❌ Auto-rejected: {rejected}")
            if errors:
                print(f"  💥 Errors: {errors}")

    # Return codes: 2 if anything was rejected, 1 if anything needs review, else 0
    has_rejected = any(r.decision == PipelineDecision.AUTO_REJECT for r in results)
    has_review = any(r.decision == PipelineDecision.NEEDS_REVIEW for r in results)
    if has_rejected:
        return 2
    elif has_review:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())