CmdForge/scripts/vet_pipeline.py

#!/usr/bin/env python3
"""
Auto-vetting pipeline for CmdForge tool submissions.

Combines scrutiny (quality checks) and similarity (duplicate detection)
to automatically triage incoming tools:

- Auto-approve: High quality, no duplicates
- Auto-reject:  Low quality or exact duplicates
- Review queue: Needs human review

Usage:
    # Vet a single tool
    python scripts/vet_pipeline.py path/to/tool/config.yaml

    # Vet with custom thresholds
    python scripts/vet_pipeline.py --approve-threshold 0.85 path/to/tool/config.yaml

    # Process all tools in import directory
    python scripts/vet_pipeline.py --batch /tmp/fabric-import/

    # Output detailed JSON report
    python scripts/vet_pipeline.py --json path/to/tool/config.yaml
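
Exit status: 2 if any tool was auto-rejected, 1 if any tool needs review
(and none were rejected), 0 otherwise.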
"""

import argparse
import json
import sys
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Optional

# Import our vetting modules
from scrutiny import (
    VetResult as ScrutinyResult,
    VetReport,
    load_tool_config,
    vet_tool,
)
from similarity import (
    ToolText,
    SimilarityMatch,
    extract_tool_text,
    load_corpus,
    compute_tfidf,
    find_similar,
    DUPLICATE_THRESHOLD,
    SIMILAR_THRESHOLD,
)


class PipelineDecision(Enum):
    """Final pipeline decision."""

    AUTO_APPROVE = "auto_approve"
    AUTO_REJECT = "auto_reject"
    NEEDS_REVIEW = "needs_review"
    ERROR = "error"


@dataclass
class PipelineResult:
    """Complete pipeline result for a tool."""

    tool_name: str
    tool_path: str
    decision: PipelineDecision
    reason: str
    scrutiny_report: Optional[VetReport] = None
    similarity_matches: Optional[list] = None
    suggestions: Optional[list] = None
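
    # Dataclasses disallow mutable defaults, so the list fields default to None
    # and are replaced with fresh lists here.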
    def __post_init__(self):
        if self.similarity_matches is None:
            self.similarity_matches = []
        if self.suggestions is None:
            self.suggestions = []

    def to_dict(self) -> dict:
        d = {
            "tool_name": self.tool_name,
            "tool_path": self.tool_path,
            "decision": self.decision.value,
            "reason": self.reason,
            "suggestions": self.suggestions,
        }
        if self.scrutiny_report:
            d["scrutiny"] = self.scrutiny_report.to_dict()
        if self.similarity_matches:
            d["similar_tools"] = [m.to_dict() for m in self.similarity_matches]
        return d


# Pipeline thresholds
DEFAULT_APPROVE_THRESHOLD = 0.8
DEFAULT_REJECT_THRESHOLD = 0.3
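# Scrutiny scores are treated as a 0-1 scale: at or above the approve threshold
# (with no similar tools) a tool is auto-approved, below the reject threshold it
# is auto-rejected, and anything in between goes to human review.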


def run_pipeline(
    tool_path: Path,
    corpus_dir: Path,
    approve_threshold: float = DEFAULT_APPROVE_THRESHOLD,
    reject_threshold: float = DEFAULT_REJECT_THRESHOLD,
) -> PipelineResult:
    """Run the complete vetting pipeline on a tool."""
    # Load tool config
    config = load_tool_config(tool_path)
    if not config:
        return PipelineResult(
            tool_name="unknown",
            tool_path=str(tool_path),
            decision=PipelineDecision.ERROR,
            reason=f"Could not load tool config from {tool_path}",
        )
    tool_name = config.get("name", "unknown")

    # Phase 1: Scrutiny (quality checks)
    scrutiny_report = vet_tool(config, str(tool_path))

    # Phase 2: Similarity check
    tool_text = extract_tool_text(config, str(tool_path))
    corpus = load_corpus(corpus_dir) if corpus_dir.exists() else []
    similarity_matches = []
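
    # Fit TF-IDF over the existing corpus plus the incoming tool so both are
    # weighted against the same vocabulary, then keep existing tools scoring
    # above the 0.5 similarity floor. With no corpus yet (first run), the
    # decision falls back to quality alone.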
    if corpus:
        all_tools = corpus + [tool_text]
        compute_tfidf(all_tools)
        similarity_matches = find_similar(tool_text, corpus, threshold=0.5)

    # Phase 3: Decision logic
    decision, reason, suggestions = make_decision(
        scrutiny_report,
        similarity_matches,
        approve_threshold,
        reject_threshold,
    )

    return PipelineResult(
        tool_name=tool_name,
        tool_path=str(tool_path),
        decision=decision,
        reason=reason,
        scrutiny_report=scrutiny_report,
        similarity_matches=similarity_matches,
        suggestions=suggestions,
    )


def make_decision(
    scrutiny: VetReport,
    similarity_matches: list[SimilarityMatch],
    approve_threshold: float,
    reject_threshold: float,
) -> tuple[PipelineDecision, str, list[str]]:
    """Make final decision based on scrutiny and similarity results."""
    suggestions = list(scrutiny.suggestions) if scrutiny.suggestions else []
    score = scrutiny.overall_score
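
    # Precedence: exact duplicates are rejected outright, near-duplicates always
    # go to review, and only then does the quality score decide the outcome.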

    # Check for exact duplicates first
    duplicates = [m for m in similarity_matches if m.match_type == "duplicate"]
    if duplicates:
        dupe = duplicates[0]
        return (
            PipelineDecision.AUTO_REJECT,
            f"Duplicate of existing tool '{dupe.tool2_name}' ({dupe.similarity:.0%} match)",
            suggestions + [f"Consider updating {dupe.tool2_name} instead of creating a new tool"],
        )

    # Check for very similar tools
    similar = [m for m in similarity_matches if m.match_type == "similar"]
    if similar:
        sim = similar[0]
        # If high quality + similar, might be an improved version - needs review
        if score >= approve_threshold:
            return (
                PipelineDecision.NEEDS_REVIEW,
                f"High quality but similar to '{sim.tool2_name}' ({sim.similarity:.0%})",
                suggestions + [f"Review whether this improves on {sim.tool2_name}"],
            )
        else:
            return (
                PipelineDecision.NEEDS_REVIEW,
                f"Similar to existing tool '{sim.tool2_name}' ({sim.similarity:.0%})",
                suggestions + [f"Consider if this duplicates {sim.tool2_name}"],
            )

    # No duplicates or very similar tools - decide based on quality
    if scrutiny.result == ScrutinyResult.REJECT or score < reject_threshold:
        return (
            PipelineDecision.AUTO_REJECT,
            f"Quality score too low ({score:.2f} < {reject_threshold})",
            suggestions,
        )
    if scrutiny.result == ScrutinyResult.APPROVE and score >= approve_threshold:
        return (
            PipelineDecision.AUTO_APPROVE,
            f"High quality score ({score:.2f}) with no similar tools",
            suggestions,
        )

    # Middle ground - needs review
    return (
        PipelineDecision.NEEDS_REVIEW,
        f"Quality score {score:.2f} - needs human review",
        suggestions,
    )


def print_result(result: PipelineResult, verbose: bool = False):
    """Print pipeline result to console."""
    decision_emoji = {
        PipelineDecision.AUTO_APPROVE: "✅",
        PipelineDecision.AUTO_REJECT: "❌",
        PipelineDecision.NEEDS_REVIEW: "⚠️",
        PipelineDecision.ERROR: "💥",
    }
    decision_color = {
        PipelineDecision.AUTO_APPROVE: "\033[92m",   # Green
        PipelineDecision.AUTO_REJECT: "\033[91m",    # Red
        PipelineDecision.NEEDS_REVIEW: "\033[93m",   # Yellow
        PipelineDecision.ERROR: "\033[91m",          # Red
    }
    reset = "\033[0m"
    emoji = decision_emoji.get(result.decision, "")
    color = decision_color.get(result.decision, "")

    print(f"\n{emoji} {color}{result.tool_name}: {result.decision.value.upper()}{reset}")
    print(f"  {result.reason}")

    if verbose:
        print(f"  Path: {result.tool_path}")

        if result.scrutiny_report:
            print(f"\n  Quality Scores:")
            for score in result.scrutiny_report.scores:
                bar = "█" * int(score.score * 10) + "░" * (10 - int(score.score * 10))
                print(f"    {score.criterion:12} [{bar}] {score.score:.2f}")

        if result.similarity_matches:
            print(f"\n  Similar Tools:")
            for match in result.similarity_matches[:3]:
                print(f"    • {match.tool2_name} ({match.similarity:.0%} {match.match_type})")

        if result.suggestions:
            print(f"\n  Suggestions:")
            for suggestion in result.suggestions[:5]:
                print(f"    • {suggestion}")


def main():
    parser = argparse.ArgumentParser(
        description="Run auto-vetting pipeline on CmdForge tools",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        "path",
        type=Path,
        nargs="?",
        help="Tool config file or directory to vet"
    )
    parser.add_argument(
        "--batch",
        type=Path,
        metavar="DIR",
        help="Process all tools in directory"
    )
    parser.add_argument(
        "--corpus",
        type=Path,
        default=Path.home() / ".cmdforge",
        help="Directory of existing tools for comparison (default: ~/.cmdforge)"
    )
    parser.add_argument(
        "--approve-threshold",
        type=float,
        default=DEFAULT_APPROVE_THRESHOLD,
        help=f"Score threshold for auto-approve (default: {DEFAULT_APPROVE_THRESHOLD})"
    )
    parser.add_argument(
        "--reject-threshold",
        type=float,
        default=DEFAULT_REJECT_THRESHOLD,
        help=f"Score threshold for auto-reject (default: {DEFAULT_REJECT_THRESHOLD})"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed output"
    )
    args = parser.parse_args()

    if not args.path and not args.batch:
        parser.error("Specify a tool path or use --batch")

    results = []

    if args.batch:
        # Batch mode
        batch_dir = args.batch
        if not batch_dir.exists():
            print(f"Error: Directory {batch_dir} does not exist", file=sys.stderr)
            return 1

        print(f"Processing tools in {batch_dir}...")
        for entry in sorted(batch_dir.iterdir()):
            config_file = None
            if entry.is_dir():
                config_file = entry / "config.yaml"
            elif entry.suffix in [".yaml", ".yml"]:
                config_file = entry
            if config_file and config_file.exists():
                result = run_pipeline(
                    config_file,
                    args.corpus,
                    args.approve_threshold,
                    args.reject_threshold,
                )
                results.append(result)
    else:
        # Single tool mode
        result = run_pipeline(
            args.path,
            args.corpus,
            args.approve_threshold,
            args.reject_threshold,
        )
        results.append(result)

    # Output
    if args.json:
        output = [r.to_dict() for r in results]
        print(json.dumps(output, indent=2))
    else:
        for result in results:
            print_result(result, args.verbose)

        # Summary
        if len(results) > 1:
            approved = sum(1 for r in results if r.decision == PipelineDecision.AUTO_APPROVE)
            rejected = sum(1 for r in results if r.decision == PipelineDecision.AUTO_REJECT)
            review = sum(1 for r in results if r.decision == PipelineDecision.NEEDS_REVIEW)
            errors = sum(1 for r in results if r.decision == PipelineDecision.ERROR)
            print(f"\n{'─' * 50}")
            print(f"Pipeline Summary: {len(results)} tools processed")
            print(f"  ✅ Auto-approved: {approved}")
            print(f"  ⚠️ Needs review: {review}")
            print(f"  ❌ Auto-rejected: {rejected}")
            if errors:
                print(f"  💥 Errors: {errors}")

    # Return codes
    has_rejected = any(r.decision == PipelineDecision.AUTO_REJECT for r in results)
    has_review = any(r.decision == PipelineDecision.NEEDS_REVIEW for r in results)
    if has_rejected:
        return 2
    elif has_review:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())