#!/usr/bin/env python3 """ Duplicate detection for CmdForge tools using text similarity. Finds tools that may be duplicates or very similar to existing tools. Uses TF-IDF vectorization and cosine similarity for comparison. Usage: # Check a tool against all existing tools python scripts/similarity.py path/to/tool/config.yaml # Check against tools in a specific directory python scripts/similarity.py path/to/tool/config.yaml --against ~/.cmdforge/ # Find all similar pairs in a directory python scripts/similarity.py --scan ~/.cmdforge/ # Set similarity threshold (default: 0.7) python scripts/similarity.py --threshold 0.8 path/to/tool/config.yaml # Output as JSON python scripts/similarity.py --json path/to/tool/config.yaml """ import argparse import json import math import re import sys from collections import Counter from dataclasses import dataclass, field from pathlib import Path from typing import Optional import yaml @dataclass class SimilarityMatch: """A similarity match between two tools.""" tool1_name: str tool1_path: str tool2_name: str tool2_path: str similarity: float # 0.0 to 1.0 match_type: str # "duplicate", "similar", "related" def to_dict(self) -> dict: return { "tool1": {"name": self.tool1_name, "path": self.tool1_path}, "tool2": {"name": self.tool2_name, "path": self.tool2_path}, "similarity": self.similarity, "match_type": self.match_type, } @dataclass class ToolText: """Extracted text from a tool for similarity comparison.""" name: str path: str text: str tokens: list[str] = field(default_factory=list) tfidf: dict = field(default_factory=dict) # Similarity thresholds DUPLICATE_THRESHOLD = 0.9 # >= 0.9 is likely a duplicate SIMILAR_THRESHOLD = 0.7 # >= 0.7 is very similar RELATED_THRESHOLD = 0.5 # >= 0.5 is related def load_tool_config(path: Path) -> Optional[dict]: """Load tool configuration from YAML file.""" if path.is_dir(): config_file = path / "config.yaml" else: config_file = path if not config_file.exists(): return None with open(config_file) as f: return yaml.safe_load(f) def extract_tool_text(config: dict, path: str) -> ToolText: """Extract all meaningful text from a tool config.""" texts = [] # Name and description name = config.get("name", "") texts.append(name) texts.append(config.get("description", "")) # Category and tags texts.append(config.get("category", "")) tags = config.get("tags", []) if isinstance(tags, list): texts.extend(tags) # Steps - extract prompts and code for step in config.get("steps", []): if step.get("type") == "prompt": texts.append(step.get("prompt", "")) elif step.get("type") == "code": # Extract meaningful parts from code (comments, strings) code = step.get("code", "") # Add variable names texts.append(code) # Arguments for arg in config.get("arguments", []): texts.append(arg.get("flag", "")) texts.append(arg.get("description", "")) texts.append(arg.get("variable", "")) # Combine and clean combined = " ".join(texts) return ToolText(name=name, path=path, text=combined) def tokenize(text: str) -> list[str]: """Tokenize text into words, removing stopwords.""" # Convert to lowercase and extract words words = re.findall(r'\b[a-z]{2,}\b', text.lower()) # Remove common stopwords stopwords = { "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "from", "as", "is", "was", "are", "were", "been", "be", "have", "has", "had", "do", "does", "did", "will", "would", "could", "should", "may", "might", "must", "shall", "can", "this", "that", "these", "those", "it", "its", "you", "your", "we", "our", "they", "their", "he", "his", "she", "her", "if", "then", "else", "when", "where", "which", "who", "what", "how", "all", "each", "every", "both", "few", "more", "most", "other", "some", "such", "no", "not", "only", "same", "so", "than", "too", "very", "just", "also", "now", "here", "there", "any", "into", "out", "up", "down", } return [w for w in words if w not in stopwords] def compute_tfidf(documents: list[ToolText]) -> None: """Compute TF-IDF vectors for all documents (modifies in place).""" # Tokenize all documents for doc in documents: doc.tokens = tokenize(doc.text) # Compute document frequencies doc_freq = Counter() for doc in documents: unique_tokens = set(doc.tokens) doc_freq.update(unique_tokens) num_docs = len(documents) # Compute TF-IDF for each document for doc in documents: term_freq = Counter(doc.tokens) total_terms = len(doc.tokens) or 1 tfidf = {} for term, count in term_freq.items(): tf = count / total_terms # Add 1 to avoid division by zero idf = math.log((num_docs + 1) / (doc_freq[term] + 1)) + 1 tfidf[term] = tf * idf doc.tfidf = tfidf def cosine_similarity(vec1: dict, vec2: dict) -> float: """Compute cosine similarity between two TF-IDF vectors.""" # Get all terms all_terms = set(vec1.keys()) | set(vec2.keys()) if not all_terms: return 0.0 # Compute dot product and magnitudes dot_product = 0.0 mag1 = 0.0 mag2 = 0.0 for term in all_terms: v1 = vec1.get(term, 0.0) v2 = vec2.get(term, 0.0) dot_product += v1 * v2 mag1 += v1 * v1 mag2 += v2 * v2 mag1 = math.sqrt(mag1) mag2 = math.sqrt(mag2) if mag1 == 0 or mag2 == 0: return 0.0 return dot_product / (mag1 * mag2) def classify_similarity(score: float) -> str: """Classify similarity score into a match type.""" if score >= DUPLICATE_THRESHOLD: return "duplicate" elif score >= SIMILAR_THRESHOLD: return "similar" elif score >= RELATED_THRESHOLD: return "related" return "different" def find_similar(tool: ToolText, corpus: list[ToolText], threshold: float = RELATED_THRESHOLD) -> list[SimilarityMatch]: """Find tools in corpus similar to the given tool.""" matches = [] for other in corpus: if other.path == tool.path: continue similarity = cosine_similarity(tool.tfidf, other.tfidf) if similarity >= threshold: match_type = classify_similarity(similarity) matches.append(SimilarityMatch( tool1_name=tool.name, tool1_path=tool.path, tool2_name=other.name, tool2_path=other.path, similarity=similarity, match_type=match_type, )) # Sort by similarity descending matches.sort(key=lambda m: m.similarity, reverse=True) return matches def scan_directory(directory: Path, threshold: float = RELATED_THRESHOLD) -> list[SimilarityMatch]: """Scan a directory for similar tool pairs.""" # Load all tools tools = [] for entry in directory.iterdir(): config_file = None if entry.is_dir(): config_file = entry / "config.yaml" elif entry.suffix in [".yaml", ".yml"]: config_file = entry if config_file and config_file.exists(): config = load_tool_config(config_file) if config: tool_text = extract_tool_text(config, str(entry)) tools.append(tool_text) if len(tools) < 2: return [] # Compute TF-IDF compute_tfidf(tools) # Find all similar pairs matches = [] seen_pairs = set() for tool in tools: for other in tools: if tool.path >= other.path: # Avoid duplicates continue pair_key = (tool.path, other.path) if pair_key in seen_pairs: continue seen_pairs.add(pair_key) similarity = cosine_similarity(tool.tfidf, other.tfidf) if similarity >= threshold: match_type = classify_similarity(similarity) matches.append(SimilarityMatch( tool1_name=tool.name, tool1_path=tool.path, tool2_name=other.name, tool2_path=other.path, similarity=similarity, match_type=match_type, )) # Sort by similarity descending matches.sort(key=lambda m: m.similarity, reverse=True) return matches def load_corpus(directory: Path) -> list[ToolText]: """Load all tools from a directory as ToolText objects.""" tools = [] for entry in directory.iterdir(): config_file = None if entry.is_dir(): config_file = entry / "config.yaml" elif entry.suffix in [".yaml", ".yml"]: config_file = entry if config_file and config_file.exists(): config = load_tool_config(config_file) if config: tool_text = extract_tool_text(config, str(entry)) tools.append(tool_text) return tools def print_match(match: SimilarityMatch, verbose: bool = False): """Print a similarity match to console.""" # Match type emoji type_emoji = { "duplicate": "šŸ”“", "similar": "🟠", "related": "🟔", } emoji = type_emoji.get(match.match_type, "⚪") bar = "ā–ˆ" * int(match.similarity * 20) + "ā–‘" * (20 - int(match.similarity * 20)) print(f" {emoji} {match.match_type.upper()} [{bar}] {match.similarity:.2%}") print(f" {match.tool1_name} <-> {match.tool2_name}") if verbose: print(f" Paths: {match.tool1_path}") print(f" {match.tool2_path}") def main(): parser = argparse.ArgumentParser( description="Find similar/duplicate CmdForge tools", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=__doc__ ) parser.add_argument( "path", type=Path, nargs="?", help="Tool config file to check" ) parser.add_argument( "--against", type=Path, default=Path.home() / ".cmdforge", help="Directory to compare against (default: ~/.cmdforge)" ) parser.add_argument( "--scan", type=Path, metavar="DIR", help="Scan directory for all similar pairs" ) parser.add_argument( "--threshold", "-t", type=float, default=RELATED_THRESHOLD, help=f"Similarity threshold (default: {RELATED_THRESHOLD})" ) parser.add_argument( "--json", action="store_true", help="Output as JSON" ) parser.add_argument( "--verbose", "-v", action="store_true", help="Show detailed output" ) args = parser.parse_args() matches = [] if args.scan: # Scan mode - find all similar pairs print(f"Scanning {args.scan} for similar tools...") matches = scan_directory(args.scan, args.threshold) elif args.path: # Check single tool against corpus config = load_tool_config(args.path) if not config: print(f"Error: Could not load tool config from {args.path}", file=sys.stderr) return 1 tool_text = extract_tool_text(config, str(args.path)) # Load corpus corpus = load_corpus(args.against) if not corpus: print(f"No tools found in {args.against}", file=sys.stderr) return 1 # Add the new tool to corpus for TF-IDF computation all_tools = corpus + [tool_text] compute_tfidf(all_tools) print(f"Checking {tool_text.name} against {len(corpus)} existing tools...") matches = find_similar(tool_text, corpus, args.threshold) else: parser.error("Specify a tool path or use --scan") # Output if args.json: output = [m.to_dict() for m in matches] print(json.dumps(output, indent=2)) else: if not matches: print("\nāœ… No similar tools found above threshold") else: print(f"\nFound {len(matches)} match(es):\n") for match in matches: print_match(match, args.verbose) # Summary by type duplicates = sum(1 for m in matches if m.match_type == "duplicate") similar = sum(1 for m in matches if m.match_type == "similar") related = sum(1 for m in matches if m.match_type == "related") print(f"\n{'─' * 40}") print(f"Summary: {duplicates} duplicates, {similar} similar, {related} related") # Return code has_duplicates = any(m.match_type == "duplicate" for m in matches) has_similar = any(m.match_type == "similar" for m in matches) if has_duplicates: return 2 elif has_similar: return 1 return 0 if __name__ == "__main__": sys.exit(main())