"""Tool scrutiny system for automated review of published tools. This module analyzes tools on publish to verify: 1. Honesty - Does the tool do what its description claims? 2. Transparency - Is behavior visible or hidden/obfuscated? 3. Scope - Does the code stay within expected bounds? 4. Efficiency - Are AI calls necessary or wasteful? """ from __future__ import annotations import base64 import re from dataclasses import dataclass, field from enum import Enum from typing import Any, Dict, List, Optional, Tuple import yaml class CheckResult(Enum): PASS = "pass" WARNING = "warning" FAIL = "fail" @dataclass class Finding: """A single finding from scrutiny analysis.""" check: str result: CheckResult message: str suggestion: Optional[str] = None location: Optional[str] = None # e.g., "step 2", "code block" @dataclass class ScrutinyReport: """Complete scrutiny report for a tool.""" tool_name: str findings: List[Finding] = field(default_factory=list) optimizations: List[Any] = field(default_factory=list) # OptimizationSuggestion list @property def passed(self) -> bool: """Tool passes if no FAIL results.""" return not any(f.result == CheckResult.FAIL for f in self.findings) @property def has_warnings(self) -> bool: return any(f.result == CheckResult.WARNING for f in self.findings) @property def decision(self) -> str: """Auto-approve, flag for review, or reject.""" if any(f.result == CheckResult.FAIL for f in self.findings): return "reject" if any(f.result == CheckResult.WARNING for f in self.findings): return "review" return "approve" def to_dict(self) -> Dict[str, Any]: result = { "tool_name": self.tool_name, "decision": self.decision, "passed": self.passed, "has_warnings": self.has_warnings, "findings": [ { "check": f.check, "result": f.result.value, "message": f.message, "suggestion": f.suggestion, "location": f.location, } for f in self.findings ], } # Add optimization suggestions if any if self.optimizations: result["optimizations"] = [ { "operation": opt.operation, 
"current_prompt": opt.current_prompt, "optimized_code": opt.optimized_code, "tradeoffs": opt.tradeoffs, "location": opt.location, "action": "optional", } for opt in self.optimizations ] return result # Keywords that indicate specific behaviors BEHAVIOR_KEYWORDS = { "summarize": ["summary", "summarize", "condense", "brief", "shorten", "tldr"], "translate": ["translate", "translation", "language", "convert to"], "explain": ["explain", "clarify", "describe", "what is", "how does"], "fix": ["fix", "correct", "repair", "grammar", "spelling"], "generate": ["generate", "create", "write", "produce", "make"], "extract": ["extract", "pull", "find", "identify", "parse"], "analyze": ["analyze", "review", "examine", "check", "audit"], "convert": ["convert", "transform", "format", "change to"], } # Patterns that suggest obfuscation or suspicious behavior SUSPICIOUS_PATTERNS = [ (r'base64\.(b64decode|decodebytes)', "Base64 decoding detected"), (r'exec\s*\(', "Dynamic code execution with exec()"), (r'eval\s*\(', "Dynamic code execution with eval()"), (r'__import__\s*\(', "Dynamic import detected"), (r'subprocess\.(run|call|Popen)', "Subprocess execution detected"), (r'os\.(system|popen|exec)', "OS command execution detected"), (r'requests\.(get|post|put|delete)', "Network request detected"), (r'urllib', "URL library usage detected"), (r'socket\.', "Raw socket usage detected"), (r'\\x[0-9a-fA-F]{2}', "Hex-encoded strings detected"), ] # File/system access patterns SCOPE_PATTERNS = [ (r'open\s*\([^)]*["\']/', "Absolute path file access"), (r'open\s*\([^)]*~', "Home directory file access"), (r'\.ssh', "SSH directory access"), (r'\.aws', "AWS credentials access"), (r'\.env', "Environment file access"), (r'/etc/', "System config access"), (r'os\.environ', "Environment variable access"), (r'keyring|password|credential', "Credential-related access"), ] # Operations that could be done with code (with tradeoffs) # Format: (prompt_pattern, operation_name, code_template, code_benefit, 
CODE_OPTIMIZATIONS = [
    # (prompt_pattern, operation_name, code_template, code_benefit, ai_benefit)
    (
        r'count (the |how many )?words',
        "Word counting",
        'word_count = len(input.split())',
        "Faster, no API cost, deterministic",
        "Handles edge cases (hyphenated words, contractions, numbers as words)",
    ),
    (
        r'count (the |how many )?(lines|line)',
        "Line counting",
        'line_count = len(input.strip().split("\\n"))',
        "Instant, no API cost",
        "Could interpret 'meaningful lines' vs blank lines",
    ),
    (
        r'count (the |how many )?(characters?|chars?)',
        "Character counting",
        'char_count = len(input)',
        "Instant, no API cost",
        "Could exclude whitespace or count 'visible' characters",
    ),
    (
        r'convert.*(to|into) json',
        "JSON conversion",
        'import json\nresult = json.dumps(data, indent=2)',
        "Reliable, no API cost, handles valid input perfectly",
        "Can interpret messy/partial data and fix formatting issues",
    ),
    (
        r'parse.*(json|the json)',
        "JSON parsing",
        'import json\nresult = json.loads(input)',
        "Fast, no API cost, strict validation",
        "Can fix malformed JSON, handle comments, trailing commas",
    ),
    (
        r'convert.*(to|into) csv',
        "CSV conversion",
        'import csv\nimport io\n# ... csv.writer conversion',
        "Reliable for structured data",
        "Can infer structure from unstructured text",
    ),
    (
        r'(extract|find|get) (all )?(emails?|email addresses)',
        "Email extraction",
        'import re\nemails = re.findall(r"[\\w.+-]+@[\\w.-]+\\.[a-zA-Z]{2,}", input)',
        "Fast, consistent pattern matching",
        "Handles obfuscated emails (user [at] domain), context-aware",
    ),
    (
        r'(extract|find|get) (all )?(urls?|links?)',
        "URL extraction",
        'import re\nurls = re.findall(r"https?://[^\\s<>\"]+", input)',
        "Fast, consistent pattern matching",
        "Handles partial URLs, context-aware link detection",
    ),
    (
        r'(uppercase|to upper|make upper)',
        "Uppercase conversion",
        'result = input.upper()',
        "Instant, no API cost",
        "Language-aware (Turkish İ/i, Greek σ/ς edge cases)",
    ),
    (
        r'(lowercase|to lower|make lower)',
        "Lowercase conversion",
        'result = input.lower()',
        "Instant, no API cost",
        "Language-aware case handling",
    ),
    (
        r'(capitalize|title case)',
        "Title case conversion",
        'result = input.title()',
        "Instant, no API cost",
        "Knows style rules (articles, prepositions)",
    ),
    (
        r'(reverse|reversed?) (the |this )?(text|string|order)',
        "Text reversal",
        'result = input[::-1]',
        "Instant, no API cost",
        "Could reverse by word, sentence, or paragraph intelligently",
    ),
    (
        r'(sort|sorted?) (the |these |this )?(lines?|items?|list)',
        "Sorting",
        'result = "\\n".join(sorted(input.strip().split("\\n")))',
        "Fast, deterministic alphabetical sort",
        "Can sort by meaning ('sort by importance', 'by date mentioned')",
    ),
    (
        r'remove (duplicate|duplicated?) (lines?|items?)',
        "Deduplication",
        'seen = set()\nresult = "\\n".join(x for x in input.split("\\n") if not (x in seen or seen.add(x)))',
        "Fast, exact matching",
        "Can detect semantic duplicates ('USA' vs 'United States')",
    ),
    (
        r'(remove|strip|trim) (whitespace|spaces|blank)',
        "Whitespace removal",
        'result = " ".join(input.split())',
        "Instant, predictable",
        "Could preserve meaningful spacing, format-aware",
    ),
    (
        r'split (by|on|into)',
        "Text splitting",
        '# result = input.split(delimiter)',
        "Fast, exact delimiter matching",
        "Can split by meaning ('split into paragraphs about each topic')",
    ),
]


def analyze_tool(config_yaml: str, description: str) -> ScrutinyReport:
    """Run full scrutiny analysis on a tool.

    Args:
        config_yaml: The tool's YAML configuration
        description: The tool's description

    Returns:
        ScrutinyReport with all findings
    """

    def _invalid_config() -> ScrutinyReport:
        # Shared rejection report for any unusable configuration.
        return ScrutinyReport(
            tool_name="unknown",
            findings=[Finding(
                check="parse",
                result=CheckResult.FAIL,
                message="Invalid YAML configuration",
            )],
        )

    try:
        config = yaml.safe_load(config_yaml) or {}
    except yaml.YAMLError:
        return _invalid_config()

    # safe_load can return a scalar or list (e.g. a bare string); the checks
    # below call .get(), so anything that isn't a mapping is unusable.
    if not isinstance(config, dict):
        return _invalid_config()

    # `or "unknown"` also covers an explicit `name: null` / empty name.
    tool_name = config.get("name") or "unknown"
    report = ScrutinyReport(tool_name=tool_name)

    # Run all checks
    report.findings.extend(check_honesty(config, description))
    report.findings.extend(check_transparency(config))
    report.findings.extend(check_scope(config))

    # Efficiency check returns findings and optimizations
    efficiency_findings, optimizations = check_efficiency(config)
    report.findings.extend(efficiency_findings)
    report.optimizations = optimizations

    # If no findings, add a pass
    if not report.findings:
        report.findings.append(Finding(
            check="overall",
            result=CheckResult.PASS,
            message="All checks passed",
        ))

    return report


def check_honesty(config: Dict[str, Any], description: str) -> List[Finding]:
    """Check if tool behavior matches its description.

    Compares behavior keywords found in the description against keywords
    found in the tool's prompt steps and flags mismatches in both
    directions; also flags suspiciously short prompts.

    Args:
        config: Parsed tool configuration (mapping with a "steps" list).
        description: The tool's published description.

    Returns:
        List of honesty findings; a single PASS finding when nothing is
        flagged.
    """
    findings: List[Finding] = []
    # `or []` guards against an explicit `steps: null` in the YAML.
    steps = config.get("steps") or []
    description_lower = description.lower()

    # Concatenate all prompt text for keyword matching.
    all_prompts = ""
    for step in steps:
        if step.get("type") == "prompt":
            all_prompts += " " + (step.get("prompt") or "").lower()

    # Behaviors the description claims.
    claimed_behaviors = set()
    for behavior, keywords in BEHAVIOR_KEYWORDS.items():
        if any(kw in description_lower for kw in keywords):
            claimed_behaviors.add(behavior)

    # Behaviors the prompts actually exhibit.
    actual_behaviors = set()
    for behavior, keywords in BEHAVIOR_KEYWORDS.items():
        if any(kw in all_prompts for kw in keywords):
            actual_behaviors.add(behavior)

    # Mismatches in either direction; sorted so report text is deterministic.
    claimed_not_done = claimed_behaviors - actual_behaviors
    done_not_claimed = actual_behaviors - claimed_behaviors

    if claimed_not_done:
        findings.append(Finding(
            check="honesty",
            result=CheckResult.WARNING,
            message=f"Description claims '{', '.join(sorted(claimed_not_done))}' but prompts don't reflect this",
            suggestion="Update description to accurately reflect tool behavior",
        ))

    if done_not_claimed and claimed_behaviors:
        # Only warn if tool claims to do something specific but also does other things
        findings.append(Finding(
            check="honesty",
            result=CheckResult.WARNING,
            message=f"Prompts include '{', '.join(sorted(done_not_claimed))}' not mentioned in description",
            suggestion="Update description to include all tool capabilities",
        ))

    # Check for empty or minimal prompts.
    for i, step in enumerate(steps):
        if step.get("type") == "prompt":
            # `or ""` covers `prompt: null`, which .get("prompt", "") would
            # pass through as None and crash on .strip().
            prompt = (step.get("prompt") or "").strip()
            if len(prompt) < 10:
                findings.append(Finding(
                    check="honesty",
                    result=CheckResult.WARNING,
                    message="Prompt is unusually short",
                    location=f"step {i + 1}",
                ))

    if not findings:
        findings.append(Finding(
            check="honesty",
            result=CheckResult.PASS,
            message="Description matches observed behavior",
        ))

    return findings


def check_transparency(config: Dict[str, Any]) -> List[Finding]:
    """Check for obfuscated or hidden behavior.

    Scans code steps for suspicious patterns (dynamic execution, encoded
    strings, network/process access), obfuscated variable names, and
    unusually long lines.

    Args:
        config: Parsed tool configuration (mapping with a "steps" list).

    Returns:
        List of transparency findings; a single PASS finding when nothing
        is flagged.
    """
    findings: List[Finding] = []
    steps = config.get("steps") or []

    for i, step in enumerate(steps):
        if step.get("type") == "code":
            code = step.get("code") or ""
            location = f"step {i + 1}"

            # Check for suspicious patterns.
            for pattern, description in SUSPICIOUS_PATTERNS:
                if re.search(pattern, code, re.IGNORECASE):
                    # Severity: dynamic execution is an outright FAIL;
                    # process/socket use is flagged for manual review.
                    # NOTE: the os.(system|popen|exec) pattern string also
                    # contains "exec", so OS command execution is FAIL too.
                    if "exec" in pattern or "eval" in pattern:
                        result = CheckResult.FAIL
                        msg = f"{description} - potential code injection risk"
                    elif "socket" in pattern or "subprocess" in pattern:
                        result = CheckResult.WARNING
                        msg = f"{description} - flagged for manual review"
                    else:
                        result = CheckResult.WARNING
                        msg = description

                    findings.append(Finding(
                        check="transparency",
                        result=result,
                        message=msg,
                        location=location,
                    ))

            # Check for obfuscated variable names (e.g. "xxx", "a1"):
            # more than three such names in one step is suspicious.
            obfuscated = re.findall(r'\b([a-z])\1{2,}\b|\b[a-z]{1,2}\d+\b', code)
            if len(obfuscated) > 3:
                findings.append(Finding(
                    check="transparency",
                    result=CheckResult.WARNING,
                    message="Multiple obfuscated variable names detected",
                    location=location,
                    suggestion="Use descriptive variable names",
                ))

            # Check for very long single lines (possible obfuscation).
            for line_num, line in enumerate(code.split('\n'), 1):
                if len(line) > 500:
                    findings.append(Finding(
                        check="transparency",
                        result=CheckResult.WARNING,
                        message=f"Unusually long code line ({len(line)} chars)",
                        location=f"{location}, line {line_num}",
                    ))

    if not findings:
        findings.append(Finding(
            check="transparency",
            result=CheckResult.PASS,
            message="No obfuscation detected",
        ))

    return findings


def check_scope(config: Dict[str, Any]) -> List[Finding]:
    """Check if code accesses unexpected resources.

    Flags file-system, network, or credential access in code steps unless
    the tool's own name or config description suggests the access is
    expected (e.g. an "ssh" tool touching ~/.ssh).

    Args:
        config: Parsed tool configuration (mapping with "name",
            "description", and "steps").

    Returns:
        List of scope findings; a single PASS finding when nothing is
        flagged. Credential-related access is always a FAIL.
    """
    findings: List[Finding] = []
    steps = config.get("steps") or []
    # `or ""` guards against explicit nulls in the YAML config.
    # NOTE(review): this uses the description embedded in the config, not
    # the separately published description passed to analyze_tool — confirm
    # the two are kept in sync by the publish pipeline.
    tool_name = (config.get("name") or "").lower()
    description = (config.get("description") or "").lower()

    for i, step in enumerate(steps):
        if step.get("type") == "code":
            code = step.get("code") or ""
            location = f"step {i + 1}"

            for pattern, description_text in SCOPE_PATTERNS:
                if re.search(pattern, code, re.IGNORECASE):
                    # Check if this access seems expected based on tool
                    # name/description.
                    is_expected = False
                    if "ssh" in pattern and ("ssh" in tool_name or "ssh" in description):
                        is_expected = True
                    if "env" in pattern and ("env" in tool_name or "environment" in description):
                        is_expected = True
                    if "file" in description_text.lower() and ("file" in tool_name or "file" in description):
                        is_expected = True

                    if not is_expected:
                        # Credential access is always flagged as a failure.
                        if "credential" in pattern or ".ssh" in pattern or ".aws" in pattern:
                            result = CheckResult.FAIL
                            suggestion = "Remove credential access or justify in description"
                        else:
                            result = CheckResult.WARNING
                            suggestion = "Document this access in tool description"

                        findings.append(Finding(
                            check="scope",
                            result=result,
                            message=description_text,
                            location=location,
                            suggestion=suggestion,
                        ))

    if not findings:
        findings.append(Finding(
            check="scope",
            result=CheckResult.PASS,
            message="No unexpected resource access",
        ))

    return findings


@dataclass
class OptimizationSuggestion:
    """A code optimization suggestion for an AI prompt."""

    operation: str  # human-readable operation name, e.g. "Word counting"
    current_prompt: str  # the (truncated) prompt that triggered the match
    optimized_code: str  # equivalent pure-code snippet
    tradeoffs: Dict[str, str]  # {"code": benefit, "ai": benefit}
    location: str  # e.g. "step 2"


def check_efficiency(config: Dict[str, Any]) -> Tuple[List[Finding], List[OptimizationSuggestion]]:
    """Check for AI calls that could optionally be pure code.

    Generates optimization suggestions with tradeoffs - these are
    informational, not penalties. Authors can choose to keep AI for valid
    reasons, so every finding here is a PASS.

    Args:
        config: Parsed tool configuration (mapping with a "steps" list).

    Returns:
        Tuple of (findings, optimization suggestions).
    """
    findings: List[Finding] = []
    steps = config.get("steps") or []
    optimizations: List[OptimizationSuggestion] = []

    for i, step in enumerate(steps):
        if step.get("type") == "prompt":
            prompt = step.get("prompt") or ""
            prompt_lower = prompt.lower()
            location = f"step {i + 1}"

            # Check against known optimizable patterns.
            for pattern, operation, code, code_benefit, ai_benefit in CODE_OPTIMIZATIONS:
                if re.search(pattern, prompt_lower):
                    optimizations.append(OptimizationSuggestion(
                        operation=operation,
                        current_prompt=prompt.strip()[:200],  # Truncate for display
                        optimized_code=code,
                        tradeoffs={
                            "code": code_benefit,
                            "ai": ai_benefit,
                        },
                        location=location,
                    ))
                    break  # One suggestion per step

    # Surface optimization suggestions as findings (not warnings - optional).
    for opt in optimizations:
        findings.append(Finding(
            check="efficiency",
            result=CheckResult.PASS,  # Not a warning - it's a suggestion
            message=f"Optional optimization: {opt.operation} could use code",
            location=opt.location,
            suggestion=f"Code: {opt.optimized_code}",
        ))

    # Check for all-AI tools (informational, not a warning).
    code_steps = [s for s in steps if s.get("type") == "code"]
    prompt_steps = [s for s in steps if s.get("type") == "prompt"]
    if prompt_steps and not code_steps and len(prompt_steps) > 3:
        findings.append(Finding(
            check="efficiency",
            result=CheckResult.PASS,  # Informational
            message=f"Tool has {len(prompt_steps)} AI steps - consider if any could be code",
            suggestion="Mixing AI and code steps can reduce API costs",
        ))

    if not findings:
        findings.append(Finding(
            check="efficiency",
            result=CheckResult.PASS,
            message="AI usage appears appropriate",
        ))

    return findings, optimizations


def get_optimization_details(optimizations: List[OptimizationSuggestion]) -> List[Dict[str, Any]]:
    """Convert optimization suggestions to serializable format."""
    return [
        {
            "operation": opt.operation,
            "current_prompt": opt.current_prompt,
            "optimized_code": opt.optimized_code,
            "tradeoffs": opt.tradeoffs,
            "location": opt.location,
            "action": "optional",  # Author chooses
        }
        for opt in optimizations
    ]


def scrutinize_tool(config_yaml: str, description: str, readme: Optional[str] = None) -> Dict[str, Any]:
    """Main entry point for tool scrutiny.

    Args:
        config_yaml: The tool's YAML configuration
        description: The tool's description
        readme: Optional README content (for additional context)

    Returns:
        Dictionary with scrutiny results
    """
    # NOTE(review): `readme` is accepted for API compatibility but is not
    # currently used by any check.
    report = analyze_tool(config_yaml, description)
    return report.to_dict()