#!/usr/bin/env python3
"""
Auto-vetting pipeline for CmdForge tool submissions.

Combines scrutiny (quality checks) and similarity (duplicate detection)
to automatically triage incoming tools:
- Auto-approve: High quality, no duplicates
- Auto-reject: Low quality or exact duplicates
- Review queue: Needs human review

Usage:
    # Vet a single tool
    python scripts/vet_pipeline.py path/to/tool/config.yaml

    # Vet with custom thresholds
    python scripts/vet_pipeline.py --approve-threshold 0.85 path/to/tool/config.yaml

    # Process all tools in import directory
    python scripts/vet_pipeline.py --batch /tmp/fabric-import/

    # Output detailed JSON report
    python scripts/vet_pipeline.py --json path/to/tool/config.yaml
"""

import argparse
import json
import sys
from dataclasses import dataclass, asdict
from enum import Enum
from pathlib import Path
from typing import Optional

# Import our vetting modules
from scrutiny import (
    VetResult as ScrutinyResult,
    VetReport,
    load_tool_config,
    vet_tool,
)
from similarity import (
    ToolText,
    SimilarityMatch,
    extract_tool_text,
    load_corpus,
    compute_tfidf,
    find_similar,
    DUPLICATE_THRESHOLD,
    SIMILAR_THRESHOLD,
)


class PipelineDecision(Enum):
    """Final pipeline decision."""
    AUTO_APPROVE = "auto_approve"
    AUTO_REJECT = "auto_reject"
    NEEDS_REVIEW = "needs_review"
    ERROR = "error"


@dataclass
class PipelineResult:
    """Complete pipeline result for a tool."""
    tool_name: str
    tool_path: str
    decision: PipelineDecision
    reason: str
    scrutiny_report: Optional[VetReport] = None
    similarity_matches: Optional[list] = None
    suggestions: Optional[list] = None

    def __post_init__(self):
        if self.similarity_matches is None:
            self.similarity_matches = []
        if self.suggestions is None:
            self.suggestions = []

    def to_dict(self) -> dict:
        d = {
            "tool_name": self.tool_name,
            "tool_path": self.tool_path,
            "decision": self.decision.value,
            "reason": self.reason,
            "suggestions": self.suggestions,
        }
        if self.scrutiny_report:
            d["scrutiny"] = self.scrutiny_report.to_dict()
        if self.similarity_matches:
            d["similar_tools"] = [m.to_dict() for m in self.similarity_matches]
        return d
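
# Shape of one entry in the --json output (the nested "scrutiny" and
# "similar_tools" objects come from VetReport.to_dict() and
# SimilarityMatch.to_dict() in the scrutiny/similarity modules):
#
#   {
#     "tool_name": "...",
#     "tool_path": "...",
#     "decision": "auto_approve" | "auto_reject" | "needs_review" | "error",
#     "reason": "...",
#     "suggestions": ["..."],
#     "scrutiny": { ... },          # only when a scrutiny report was produced
#     "similar_tools": [ { ... } ]  # only when similarity matches were found
#   }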


# Pipeline thresholds
DEFAULT_APPROVE_THRESHOLD = 0.8
DEFAULT_REJECT_THRESHOLD = 0.3
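
# How quality scores and similarity results combine (see make_decision below):
#   exact duplicate of an existing tool              -> AUTO_REJECT
#   near-match ("similar") to an existing tool       -> NEEDS_REVIEW
#   scrutiny REJECT or score < reject threshold      -> AUTO_REJECT
#   scrutiny APPROVE and score >= approve threshold  -> AUTO_APPROVE
#   anything else                                    -> NEEDS_REVIEW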


def run_pipeline(
    tool_path: Path,
    corpus_dir: Path,
    approve_threshold: float = DEFAULT_APPROVE_THRESHOLD,
    reject_threshold: float = DEFAULT_REJECT_THRESHOLD,
) -> PipelineResult:
    """Run the complete vetting pipeline on a tool."""

    # Load tool config
    config = load_tool_config(tool_path)
    if not config:
        return PipelineResult(
            tool_name="unknown",
            tool_path=str(tool_path),
            decision=PipelineDecision.ERROR,
            reason=f"Could not load tool config from {tool_path}",
        )

    tool_name = config.get("name", "unknown")

    # Phase 1: Scrutiny (quality checks)
    scrutiny_report = vet_tool(config, str(tool_path))

    # Phase 2: Similarity check
    tool_text = extract_tool_text(config, str(tool_path))
    corpus = load_corpus(corpus_dir) if corpus_dir.exists() else []

    similarity_matches = []
    if corpus:
        all_tools = corpus + [tool_text]
        compute_tfidf(all_tools)
        similarity_matches = find_similar(tool_text, corpus, threshold=0.5)

    # Phase 3: Decision logic
    decision, reason, suggestions = make_decision(
        scrutiny_report,
        similarity_matches,
        approve_threshold,
        reject_threshold,
    )

    return PipelineResult(
        tool_name=tool_name,
        tool_path=str(tool_path),
        decision=decision,
        reason=reason,
        scrutiny_report=scrutiny_report,
        similarity_matches=similarity_matches,
        suggestions=suggestions,
    )
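
# Sketch of programmatic use from another script (assumes this file is
# importable as `vet_pipeline`; the config path below is only an example):
#
#   from pathlib import Path
#   from vet_pipeline import run_pipeline, PipelineDecision
#
#   result = run_pipeline(Path("tools/example/config.yaml"), Path.home() / ".cmdforge")
#   if result.decision is PipelineDecision.AUTO_APPROVE:
#       print("approved:", result.tool_name)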


def make_decision(
    scrutiny: VetReport,
    similarity_matches: list[SimilarityMatch],
    approve_threshold: float,
    reject_threshold: float,
) -> tuple[PipelineDecision, str, list[str]]:
    """Make final decision based on scrutiny and similarity results."""

    suggestions = list(scrutiny.suggestions) if scrutiny.suggestions else []
    score = scrutiny.overall_score

    # Check for exact duplicates first
    duplicates = [m for m in similarity_matches if m.match_type == "duplicate"]
    if duplicates:
        dupe = duplicates[0]
        return (
            PipelineDecision.AUTO_REJECT,
            f"Duplicate of existing tool '{dupe.tool2_name}' ({dupe.similarity:.0%} match)",
            suggestions + [f"Consider updating {dupe.tool2_name} instead of creating a new tool"],
        )

    # Check for very similar tools
    similar = [m for m in similarity_matches if m.match_type == "similar"]
    if similar:
        # If high quality + similar, might be an improved version - needs review
        if score >= approve_threshold:
            sim = similar[0]
            return (
                PipelineDecision.NEEDS_REVIEW,
                f"High quality but similar to '{sim.tool2_name}' ({sim.similarity:.0%})",
                suggestions + [f"Review whether this improves on {sim.tool2_name}"],
            )
        else:
            sim = similar[0]
            return (
                PipelineDecision.NEEDS_REVIEW,
                f"Similar to existing tool '{sim.tool2_name}' ({sim.similarity:.0%})",
                suggestions + [f"Consider if this duplicates {sim.tool2_name}"],
            )

    # No duplicates or very similar tools - decide based on quality
    if scrutiny.result == ScrutinyResult.REJECT or score < reject_threshold:
        return (
            PipelineDecision.AUTO_REJECT,
            f"Quality score too low ({score:.2f} < {reject_threshold})",
            suggestions,
        )

    if scrutiny.result == ScrutinyResult.APPROVE and score >= approve_threshold:
        return (
            PipelineDecision.AUTO_APPROVE,
            f"High quality score ({score:.2f}) with no similar tools",
            suggestions,
        )

    # Middle ground - needs review
    return (
        PipelineDecision.NEEDS_REVIEW,
        f"Quality score {score:.2f} - needs human review",
        suggestions,
    )


def print_result(result: PipelineResult, verbose: bool = False):
    """Print pipeline result to console."""
    decision_emoji = {
        PipelineDecision.AUTO_APPROVE: "✅",
        PipelineDecision.AUTO_REJECT: "❌",
        PipelineDecision.NEEDS_REVIEW: "⚠️",
        PipelineDecision.ERROR: "💥",
    }

    decision_color = {
        PipelineDecision.AUTO_APPROVE: "\033[92m",  # Green
        PipelineDecision.AUTO_REJECT: "\033[91m",  # Red
        PipelineDecision.NEEDS_REVIEW: "\033[93m",  # Yellow
        PipelineDecision.ERROR: "\033[91m",  # Red
    }

    reset = "\033[0m"
    emoji = decision_emoji.get(result.decision, "❓")
    color = decision_color.get(result.decision, "")

    print(f"\n{emoji} {color}{result.tool_name}: {result.decision.value.upper()}{reset}")
    print(f" {result.reason}")

    if verbose:
        print(f" Path: {result.tool_path}")

    if result.scrutiny_report:
        print("\n Quality Scores:")
        for score in result.scrutiny_report.scores:
            bar = "█" * int(score.score * 10) + "░" * (10 - int(score.score * 10))
            print(f" {score.criterion:12} [{bar}] {score.score:.2f}")

    if result.similarity_matches:
        print("\n Similar Tools:")
        for match in result.similarity_matches[:3]:
            print(f" • {match.tool2_name} ({match.similarity:.0%} {match.match_type})")

    if result.suggestions:
        print("\n Suggestions:")
        for suggestion in result.suggestions[:5]:
            print(f" • {suggestion}")
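
# The rendered output looks roughly like this (tool name and reason invented
# for illustration; ANSI colors omitted):
#
#   ✅ json-pretty: AUTO_APPROVE
#      High quality score (0.91) with no similar tools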


def main():
    parser = argparse.ArgumentParser(
        description="Run auto-vetting pipeline on CmdForge tools",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )

    parser.add_argument(
        "path",
        type=Path,
        nargs="?",
        help="Tool config file or directory to vet"
    )
    parser.add_argument(
        "--batch",
        type=Path,
        metavar="DIR",
        help="Process all tools in directory"
    )
    parser.add_argument(
        "--corpus",
        type=Path,
        default=Path.home() / ".cmdforge",
        help="Directory of existing tools for comparison (default: ~/.cmdforge)"
    )
    parser.add_argument(
        "--approve-threshold",
        type=float,
        default=DEFAULT_APPROVE_THRESHOLD,
        help=f"Score threshold for auto-approve (default: {DEFAULT_APPROVE_THRESHOLD})"
    )
    parser.add_argument(
        "--reject-threshold",
        type=float,
        default=DEFAULT_REJECT_THRESHOLD,
        help=f"Score threshold for auto-reject (default: {DEFAULT_REJECT_THRESHOLD})"
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON"
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed output"
    )

    args = parser.parse_args()

    if not args.path and not args.batch:
        parser.error("Specify a tool path or use --batch")

    results = []

    if args.batch:
        # Batch mode
        batch_dir = args.batch
        if not batch_dir.exists():
            print(f"Error: Directory {batch_dir} does not exist", file=sys.stderr)
            return 1

        print(f"Processing tools in {batch_dir}...")

        for entry in sorted(batch_dir.iterdir()):
            config_file = None
            if entry.is_dir():
                config_file = entry / "config.yaml"
            elif entry.suffix in [".yaml", ".yml"]:
                config_file = entry

            if config_file and config_file.exists():
                result = run_pipeline(
                    config_file,
                    args.corpus,
                    args.approve_threshold,
                    args.reject_threshold,
                )
                results.append(result)
    else:
        # Single tool mode
        result = run_pipeline(
            args.path,
            args.corpus,
            args.approve_threshold,
            args.reject_threshold,
        )
        results.append(result)

    # Output
    if args.json:
        output = [r.to_dict() for r in results]
        print(json.dumps(output, indent=2))
    else:
        for result in results:
            print_result(result, args.verbose)

        # Summary
        if len(results) > 1:
            approved = sum(1 for r in results if r.decision == PipelineDecision.AUTO_APPROVE)
            rejected = sum(1 for r in results if r.decision == PipelineDecision.AUTO_REJECT)
            review = sum(1 for r in results if r.decision == PipelineDecision.NEEDS_REVIEW)
            errors = sum(1 for r in results if r.decision == PipelineDecision.ERROR)

            print(f"\n{'═' * 50}")
            print(f"Pipeline Summary: {len(results)} tools processed")
            print(f" ✅ Auto-approved: {approved}")
            print(f" ⚠️ Needs review: {review}")
            print(f" ❌ Auto-rejected: {rejected}")
            if errors:
                print(f" 💥 Errors: {errors}")

    # Return codes: 0 = nothing to flag, 1 = at least one tool needs review,
    # 2 = at least one tool was auto-rejected (ERROR results do not affect the exit code)
    has_rejected = any(r.decision == PipelineDecision.AUTO_REJECT for r in results)
    has_review = any(r.decision == PipelineDecision.NEEDS_REVIEW for r in results)

    if has_rejected:
        return 2
    elif has_review:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())