CmdForge/scripts/similarity.py

#!/usr/bin/env python3
"""
Duplicate detection for CmdForge tools using text similarity.

Finds tools that may be duplicates of, or very similar to, existing tools.
Uses TF-IDF vectorization and cosine similarity for comparison.

Usage:
    # Check a tool against all existing tools
    python scripts/similarity.py path/to/tool/config.yaml

    # Check against tools in a specific directory
    python scripts/similarity.py path/to/tool/config.yaml --against ~/.cmdforge/

    # Find all similar pairs in a directory
    python scripts/similarity.py --scan ~/.cmdforge/

    # Set the similarity threshold (default: 0.5)
    python scripts/similarity.py --threshold 0.8 path/to/tool/config.yaml

    # Output as JSON
    python scripts/similarity.py --json path/to/tool/config.yaml
"""
import argparse
import json
import math
import re
import sys
from collections import Counter
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import yaml
@dataclass
class SimilarityMatch:
    """A similarity match between two tools."""

    tool1_name: str
    tool1_path: str
    tool2_name: str
    tool2_path: str
    similarity: float  # 0.0 to 1.0
    match_type: str    # "duplicate", "similar", or "related"

    def to_dict(self) -> dict:
        return {
            "tool1": {"name": self.tool1_name, "path": self.tool1_path},
            "tool2": {"name": self.tool2_name, "path": self.tool2_path},
            "similarity": self.similarity,
            "match_type": self.match_type,
        }


@dataclass
class ToolText:
    """Extracted text from a tool for similarity comparison."""

    name: str
    path: str
    text: str
    tokens: list[str] = field(default_factory=list)
    tfidf: dict = field(default_factory=dict)


# Similarity thresholds
DUPLICATE_THRESHOLD = 0.9  # >= 0.9 is likely a duplicate
SIMILAR_THRESHOLD = 0.7    # >= 0.7 is very similar
RELATED_THRESHOLD = 0.5    # >= 0.5 is related
def load_tool_config(path: Path) -> Optional[dict]:
    """Load a tool configuration from a YAML file (or a tool directory)."""
    if path.is_dir():
        config_file = path / "config.yaml"
    else:
        config_file = path
    if not config_file.exists():
        return None
    with open(config_file) as f:
        return yaml.safe_load(f)


def extract_tool_text(config: dict, path: str) -> ToolText:
    """Extract all meaningful text from a tool config."""
    texts = []

    # Name and description
    name = config.get("name", "")
    texts.append(name)
    texts.append(config.get("description", ""))

    # Category and tags
    texts.append(config.get("category", ""))
    tags = config.get("tags", [])
    if isinstance(tags, list):
        texts.extend(tags)

    # Steps: prompts are used verbatim; code is included whole, since its
    # comments, strings, and identifiers all yield useful tokens later
    for step in config.get("steps", []):
        if step.get("type") == "prompt":
            texts.append(step.get("prompt", ""))
        elif step.get("type") == "code":
            texts.append(step.get("code", ""))

    # Arguments
    for arg in config.get("arguments", []):
        texts.append(arg.get("flag", ""))
        texts.append(arg.get("description", ""))
        texts.append(arg.get("variable", ""))

    # Combine into a single document string
    combined = " ".join(texts)
    return ToolText(name=name, path=path, text=combined)
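# Example with a hypothetical config: {"name": "git-summary",
# "description": "Summarize a git diff", "tags": ["git", "diff"]}
# combines (modulo extra spaces from empty fields) into the document
# "git-summary Summarize a git diff git diff".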
def tokenize(text: str) -> list[str]:
    """Tokenize text into lowercase words, removing stopwords."""
    # Lowercase, then extract alphabetic words of two or more letters
    words = re.findall(r"\b[a-z]{2,}\b", text.lower())
    # Remove common English stopwords
    stopwords = {
        "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for",
        "of", "with", "by", "from", "as", "is", "was", "are", "were", "been",
        "be", "have", "has", "had", "do", "does", "did", "will", "would",
        "could", "should", "may", "might", "must", "shall", "can", "this",
        "that", "these", "those", "it", "its", "you", "your", "we", "our",
        "they", "their", "he", "his", "she", "her", "if", "then", "else",
        "when", "where", "which", "who", "what", "how", "all", "each",
        "every", "both", "few", "more", "most", "other", "some", "such",
        "no", "not", "only", "same", "so", "than", "too", "very", "just",
        "also", "now", "here", "there", "any", "into", "out", "up", "down",
    }
    return [w for w in words if w not in stopwords]
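# Example: tokenize("Summarize the Git diff!") -> ["summarize", "git", "diff"]
# ("the" is a stopword; single-letter and non-alphabetic tokens never match).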
def compute_tfidf(documents: list[ToolText]) -> None:
    """Compute TF-IDF vectors for all documents (modifies them in place)."""
    # Tokenize all documents
    for doc in documents:
        doc.tokens = tokenize(doc.text)

    # Compute document frequencies (how many documents contain each term)
    doc_freq = Counter()
    for doc in documents:
        doc_freq.update(set(doc.tokens))

    num_docs = len(documents)

    # Compute TF-IDF for each document
    for doc in documents:
        term_freq = Counter(doc.tokens)
        total_terms = len(doc.tokens) or 1
        tfidf = {}
        for term, count in term_freq.items():
            tf = count / total_terms
            # Smoothed IDF: the +1 inside the log avoids division by zero,
            # and the +1 outside keeps terms that appear in every document
            # from being zeroed out entirely
            idf = math.log((num_docs + 1) / (doc_freq[term] + 1)) + 1
            tfidf[term] = tf * idf
        doc.tfidf = tfidf
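# Worked example: with num_docs = 3, a term found in 2 documents gets
# idf = log(4 / 3) + 1 ≈ 1.29, while a term found in all 3 gets
# idf = log(4 / 4) + 1 = 1.0, so rarer terms weigh more, as intended.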
def cosine_similarity(vec1: dict, vec2: dict) -> float:
    """Compute the cosine similarity between two TF-IDF vectors."""
    # Union of all terms in either vector
    all_terms = set(vec1.keys()) | set(vec2.keys())
    if not all_terms:
        return 0.0
    # Accumulate the dot product and squared magnitudes in one pass
    dot_product = 0.0
    mag1 = 0.0
    mag2 = 0.0
    for term in all_terms:
        v1 = vec1.get(term, 0.0)
        v2 = vec2.get(term, 0.0)
        dot_product += v1 * v2
        mag1 += v1 * v1
        mag2 += v2 * v2
    mag1 = math.sqrt(mag1)
    mag2 = math.sqrt(mag2)
    if mag1 == 0 or mag2 == 0:
        return 0.0
    return dot_product / (mag1 * mag2)
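# Example: cosine_similarity({"git": 1.0, "diff": 1.0}, {"git": 1.0})
# = 1.0 / (sqrt(2) * 1.0) ≈ 0.71; identical vectors score 1.0 and
# vectors with no shared terms score 0.0.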
def classify_similarity(score: float) -> str:
    """Classify a similarity score into a match type."""
    if score >= DUPLICATE_THRESHOLD:
        return "duplicate"
    elif score >= SIMILAR_THRESHOLD:
        return "similar"
    elif score >= RELATED_THRESHOLD:
        return "related"
    return "different"


def find_similar(
    tool: ToolText,
    corpus: list[ToolText],
    threshold: float = RELATED_THRESHOLD,
) -> list[SimilarityMatch]:
    """Find tools in the corpus similar to the given tool."""
    matches = []
    for other in corpus:
        if other.path == tool.path:
            continue
        similarity = cosine_similarity(tool.tfidf, other.tfidf)
        if similarity >= threshold:
            match_type = classify_similarity(similarity)
            matches.append(SimilarityMatch(
                tool1_name=tool.name,
                tool1_path=tool.path,
                tool2_name=other.name,
                tool2_path=other.path,
                similarity=similarity,
                match_type=match_type,
            ))
    # Sort by similarity, highest first
    matches.sort(key=lambda m: m.similarity, reverse=True)
    return matches
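# Note: find_similar assumes the candidate and the corpus were vectorized
# together (see main()); TF-IDF vectors built against different document
# sets use different IDF weights and are not directly comparable.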
def scan_directory(directory: Path, threshold: float = RELATED_THRESHOLD) -> list[SimilarityMatch]:
    """Scan a directory for similar tool pairs."""
    # Load all tools (same layout rules as load_corpus below)
    tools = load_corpus(directory)
    if len(tools) < 2:
        return []

    # Compute TF-IDF over the whole corpus
    compute_tfidf(tools)

    # Compare each unordered pair exactly once
    matches = []
    for i, tool in enumerate(tools):
        for other in tools[i + 1:]:
            similarity = cosine_similarity(tool.tfidf, other.tfidf)
            if similarity >= threshold:
                match_type = classify_similarity(similarity)
                matches.append(SimilarityMatch(
                    tool1_name=tool.name,
                    tool1_path=tool.path,
                    tool2_name=other.name,
                    tool2_path=other.path,
                    similarity=similarity,
                    match_type=match_type,
                ))

    # Sort by similarity, highest first
    matches.sort(key=lambda m: m.similarity, reverse=True)
    return matches
def load_corpus(directory: Path) -> list[ToolText]:
    """Load all tools from a directory as ToolText objects."""
    tools = []
    for entry in directory.iterdir():
        config_file = None
        if entry.is_dir():
            config_file = entry / "config.yaml"
        elif entry.suffix in (".yaml", ".yml"):
            config_file = entry
        if config_file and config_file.exists():
            config = load_tool_config(config_file)
            if config:
                tool_text = extract_tool_text(config, str(entry))
                tools.append(tool_text)
    return tools
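# Expected corpus layout: each tool is either a subdirectory containing a
# config.yaml, or a bare .yaml/.yml file at the top level of the directory.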
def print_match(match: SimilarityMatch, verbose: bool = False):
    """Print a similarity match to the console."""
    # Match-type indicator
    type_emoji = {
        "duplicate": "🔴",
        "similar": "🟠",
        "related": "🟡",
    }
    emoji = type_emoji.get(match.match_type, "")
    # 20-character similarity bar (filled/empty block characters)
    filled = int(match.similarity * 20)
    bar = "█" * filled + "░" * (20 - filled)
    print(f" {emoji} {match.match_type.upper()} [{bar}] {match.similarity:.2%}")
    print(f"    {match.tool1_name} <-> {match.tool2_name}")
    if verbose:
        print(f"    Paths: {match.tool1_path}")
        print(f"           {match.tool2_path}")
def main():
    parser = argparse.ArgumentParser(
        description="Find similar/duplicate CmdForge tools",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument(
        "path",
        type=Path,
        nargs="?",
        help="Tool config file to check",
    )
    parser.add_argument(
        "--against",
        type=Path,
        default=Path.home() / ".cmdforge",
        help="Directory to compare against (default: ~/.cmdforge)",
    )
    parser.add_argument(
        "--scan",
        type=Path,
        metavar="DIR",
        help="Scan a directory for all similar pairs",
    )
    parser.add_argument(
        "--threshold", "-t",
        type=float,
        default=RELATED_THRESHOLD,
        help=f"Similarity threshold (default: {RELATED_THRESHOLD})",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output as JSON",
    )
    parser.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Show detailed output",
    )
    args = parser.parse_args()
    matches = []
    if args.scan:
        # Scan mode: find all similar pairs in one directory.
        # Status goes to stderr so --json output stays parseable.
        print(f"Scanning {args.scan} for similar tools...", file=sys.stderr)
        matches = scan_directory(args.scan, args.threshold)
    elif args.path:
        # Check a single tool against the corpus
        config = load_tool_config(args.path)
        if not config:
            print(f"Error: Could not load tool config from {args.path}", file=sys.stderr)
            return 1
        tool_text = extract_tool_text(config, str(args.path))

        # Load the corpus to compare against
        corpus = load_corpus(args.against)
        if not corpus:
            print(f"No tools found in {args.against}", file=sys.stderr)
            return 1

        # Vectorize the new tool together with the corpus so all TF-IDF
        # weights come from the same document set
        all_tools = corpus + [tool_text]
        compute_tfidf(all_tools)
        print(f"Checking {tool_text.name} against {len(corpus)} existing tools...", file=sys.stderr)
        matches = find_similar(tool_text, corpus, args.threshold)
    else:
        parser.error("Specify a tool path or use --scan")
    # Output
    if args.json:
        output = [m.to_dict() for m in matches]
        print(json.dumps(output, indent=2))
    else:
        if not matches:
            print("\n✅ No similar tools found above threshold")
        else:
            print(f"\nFound {len(matches)} match(es):\n")
            for match in matches:
                print_match(match, args.verbose)
            # Summary by match type
            duplicates = sum(1 for m in matches if m.match_type == "duplicate")
            similar = sum(1 for m in matches if m.match_type == "similar")
            related = sum(1 for m in matches if m.match_type == "related")
            print(f"\n{'=' * 40}")
            print(f"Summary: {duplicates} duplicates, {similar} similar, {related} related")
    # Exit code reflects the strongest match type found
    has_duplicates = any(m.match_type == "duplicate" for m in matches)
    has_similar = any(m.match_type == "similar" for m in matches)
    if has_duplicates:
        return 2
    elif has_similar:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())