# orchestrated-discussions/.venv/lib/python3.12/site-packages/cmdforge/registry/scrutiny.py

"""Tool scrutiny system for automated review of published tools.
This module analyzes tools on publish to verify:
1. Honesty - Does the tool do what its description claims?
2. Transparency - Is behavior visible or hidden/obfuscated?
3. Scope - Does the code stay within expected bounds?
4. Efficiency - Are AI calls necessary or wasteful?
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
import yaml
class CheckResult(Enum):
PASS = "pass"
WARNING = "warning"
FAIL = "fail"
@dataclass
class Finding:
"""A single finding from scrutiny analysis."""
check: str
result: CheckResult
message: str
suggestion: Optional[str] = None
location: Optional[str] = None # e.g., "step 2", "code block"
@dataclass
class ScrutinyReport:
"""Complete scrutiny report for a tool."""
tool_name: str
findings: List[Finding] = field(default_factory=list)
    optimizations: List["OptimizationSuggestion"] = field(default_factory=list)
@property
def passed(self) -> bool:
"""Tool passes if no FAIL results."""
return not any(f.result == CheckResult.FAIL for f in self.findings)
@property
def has_warnings(self) -> bool:
return any(f.result == CheckResult.WARNING for f in self.findings)
@property
def decision(self) -> str:
"""Auto-approve, flag for review, or reject."""
if any(f.result == CheckResult.FAIL for f in self.findings):
return "reject"
if any(f.result == CheckResult.WARNING for f in self.findings):
return "review"
return "approve"
def to_dict(self) -> Dict[str, Any]:
result = {
"tool_name": self.tool_name,
"decision": self.decision,
"passed": self.passed,
"has_warnings": self.has_warnings,
"findings": [
{
"check": f.check,
"result": f.result.value,
"message": f.message,
"suggestion": f.suggestion,
"location": f.location,
}
for f in self.findings
],
}
# Add optimization suggestions if any
if self.optimizations:
result["optimizations"] = [
{
"operation": opt.operation,
"current_prompt": opt.current_prompt,
"optimized_code": opt.optimized_code,
"tradeoffs": opt.tradeoffs,
"location": opt.location,
"action": "optional",
}
for opt in self.optimizations
]
return result
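
# Illustrative example (not executed): how the decision rolls up from findings.
#
#     report = ScrutinyReport(tool_name="example-tool")
#     report.findings.append(Finding("honesty", CheckResult.PASS, "Description matches behavior"))
#     report.decision        # -> "approve"
#     report.findings.append(Finding("scope", CheckResult.WARNING, "Environment variable access"))
#     report.decision        # -> "review"
#     report.findings.append(Finding("transparency", CheckResult.FAIL, "eval() detected"))
#     report.decision        # -> "reject"
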
# Keywords that indicate specific behaviors
BEHAVIOR_KEYWORDS = {
"summarize": ["summary", "summarize", "condense", "brief", "shorten", "tldr"],
"translate": ["translate", "translation", "language", "convert to"],
"explain": ["explain", "clarify", "describe", "what is", "how does"],
"fix": ["fix", "correct", "repair", "grammar", "spelling"],
"generate": ["generate", "create", "write", "produce", "make"],
"extract": ["extract", "pull", "find", "identify", "parse"],
"analyze": ["analyze", "review", "examine", "check", "audit"],
"convert": ["convert", "transform", "format", "change to"],
}
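
# Illustrative example: behavior detection is plain substring containment, so a
# description like "Summarize the article into a brief overview" registers the
# "summarize" behavior via both the "summarize" and "brief" keywords:
#
#     any(kw in "summarize the article into a brief overview"
#         for kw in BEHAVIOR_KEYWORDS["summarize"])   # -> True
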
# Patterns that suggest obfuscation or suspicious behavior
SUSPICIOUS_PATTERNS = [
(r'base64\.(b64decode|decodebytes)', "Base64 decoding detected"),
(r'exec\s*\(', "Dynamic code execution with exec()"),
(r'eval\s*\(', "Dynamic code execution with eval()"),
(r'__import__\s*\(', "Dynamic import detected"),
(r'subprocess\.(run|call|Popen)', "Subprocess execution detected"),
(r'os\.(system|popen|exec)', "OS command execution detected"),
(r'requests\.(get|post|put|delete)', "Network request detected"),
(r'urllib', "URL library usage detected"),
(r'socket\.', "Raw socket usage detected"),
(r'\\x[0-9a-fA-F]{2}', "Hex-encoded strings detected"),
]
# File/system access patterns
SCOPE_PATTERNS = [
(r'open\s*\([^)]*["\']/', "Absolute path file access"),
(r'open\s*\([^)]*~', "Home directory file access"),
(r'\.ssh', "SSH directory access"),
(r'\.aws', "AWS credentials access"),
(r'\.env', "Environment file access"),
(r'/etc/', "System config access"),
(r'os\.environ', "Environment variable access"),
(r'keyring|password|credential', "Credential-related access"),
]
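
# Illustrative example: both pattern tables above are applied with re.search in
# IGNORECASE mode against the raw code of each "code" step, e.g.:
#
#     re.search(r'os\.environ', 'token = os.environ["API_KEY"]', re.IGNORECASE)   # -> match (scope WARNING)
#     re.search(r'\.ssh', 'open("/home/user/.ssh/id_rsa")', re.IGNORECASE)        # -> match (scope FAIL)
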
# Operations that could be done with code (with tradeoffs)
# Format: (prompt_pattern, operation_name, code_template, code_benefit, ai_benefit)
CODE_OPTIMIZATIONS = [
(
r'count (the |how many )?words',
"Word counting",
'word_count = len(input.split())',
"Faster, no API cost, deterministic",
"Handles edge cases (hyphenated words, contractions, numbers as words)",
),
(
r'count (the |how many )?(lines|line)',
"Line counting",
'line_count = len(input.strip().split("\\n"))',
"Instant, no API cost",
"Could interpret 'meaningful lines' vs blank lines",
),
(
r'count (the |how many )?(characters?|chars?)',
"Character counting",
'char_count = len(input)',
"Instant, no API cost",
"Could exclude whitespace or count 'visible' characters",
),
(
r'convert.*(to|into) json',
"JSON conversion",
'import json\nresult = json.dumps(data, indent=2)',
"Reliable, no API cost, handles valid input perfectly",
"Can interpret messy/partial data and fix formatting issues",
),
(
r'parse.*(json|the json)',
"JSON parsing",
'import json\nresult = json.loads(input)',
"Fast, no API cost, strict validation",
"Can fix malformed JSON, handle comments, trailing commas",
),
(
r'convert.*(to|into) csv',
"CSV conversion",
'import csv\nimport io\n# ... csv.writer conversion',
"Reliable for structured data",
"Can infer structure from unstructured text",
),
(
r'(extract|find|get) (all )?(emails?|email addresses)',
"Email extraction",
'import re\nemails = re.findall(r"[\\w.+-]+@[\\w.-]+\\.[a-zA-Z]{2,}", input)',
"Fast, consistent pattern matching",
"Handles obfuscated emails (user [at] domain), context-aware",
),
(
r'(extract|find|get) (all )?(urls?|links?)',
"URL extraction",
'import re\nurls = re.findall(r"https?://[^\\s<>\"]+", input)',
"Fast, consistent pattern matching",
"Handles partial URLs, context-aware link detection",
),
(
r'(uppercase|to upper|make upper)',
"Uppercase conversion",
'result = input.upper()',
"Instant, no API cost",
"Language-aware (Turkish İ/i, Greek σ/ς edge cases)",
),
(
r'(lowercase|to lower|make lower)',
"Lowercase conversion",
'result = input.lower()',
"Instant, no API cost",
"Language-aware case handling",
),
(
r'(capitalize|title case)',
"Title case conversion",
'result = input.title()',
"Instant, no API cost",
"Knows style rules (articles, prepositions)",
),
(
r'(reverse|reversed?) (the |this )?(text|string|order)',
"Text reversal",
'result = input[::-1]',
"Instant, no API cost",
"Could reverse by word, sentence, or paragraph intelligently",
),
(
r'(sort|sorted?) (the |these |this )?(lines?|items?|list)',
"Sorting",
'result = "\\n".join(sorted(input.strip().split("\\n")))',
"Fast, deterministic alphabetical sort",
"Can sort by meaning ('sort by importance', 'by date mentioned')",
),
(
r'remove (duplicate|duplicated?) (lines?|items?)',
"Deduplication",
'seen = set()\nresult = "\\n".join(x for x in input.split("\\n") if not (x in seen or seen.add(x)))',
"Fast, exact matching",
"Can detect semantic duplicates ('USA' vs 'United States')",
),
(
r'(remove|strip|trim) (whitespace|spaces|blank)',
"Whitespace removal",
'result = " ".join(input.split())',
"Instant, predictable",
"Could preserve meaningful spacing, format-aware",
),
(
r'split (by|on|into)',
"Text splitting",
'# result = input.split(delimiter)',
"Fast, exact delimiter matching",
"Can split by meaning ('split into paragraphs about each topic')",
),
]
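
# Illustrative example: each tuple above is matched against the lowercased prompt
# text of a "prompt" step, so a prompt like "Count the words in the input" triggers
# the "Word counting" entry and suggests replacing the AI call with:
#
#     word_count = len(input.split())
#
# The tradeoff fields are surfaced verbatim so the author can decide whether the
# AI version's edge-case handling is worth the API cost.
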
def analyze_tool(config_yaml: str, description: str) -> ScrutinyReport:
"""Run full scrutiny analysis on a tool.
Args:
config_yaml: The tool's YAML configuration
description: The tool's description
Returns:
ScrutinyReport with all findings
"""
try:
config = yaml.safe_load(config_yaml) or {}
except yaml.YAMLError:
return ScrutinyReport(
tool_name="unknown",
findings=[Finding(
check="parse",
result=CheckResult.FAIL,
message="Invalid YAML configuration",
)]
)
tool_name = config.get("name", "unknown")
report = ScrutinyReport(tool_name=tool_name)
# Run all checks
report.findings.extend(check_honesty(config, description))
report.findings.extend(check_transparency(config))
report.findings.extend(check_scope(config))
# Efficiency check returns findings and optimizations
efficiency_findings, optimizations = check_efficiency(config)
report.findings.extend(efficiency_findings)
report.optimizations = optimizations
# If no findings, add a pass
if not report.findings:
report.findings.append(Finding(
check="overall",
result=CheckResult.PASS,
message="All checks passed",
))
return report
def check_honesty(config: Dict[str, Any], description: str) -> List[Finding]:
"""Check if tool behavior matches its description.
Analyzes prompts and compares against description claims.
"""
findings = []
steps = config.get("steps", [])
description_lower = description.lower()
# Extract all prompt text
all_prompts = ""
for step in steps:
if step.get("type") == "prompt":
all_prompts += " " + (step.get("prompt") or "").lower()
# Detect what the description claims
claimed_behaviors = set()
for behavior, keywords in BEHAVIOR_KEYWORDS.items():
if any(kw in description_lower for kw in keywords):
claimed_behaviors.add(behavior)
# Detect what prompts actually do
actual_behaviors = set()
for behavior, keywords in BEHAVIOR_KEYWORDS.items():
if any(kw in all_prompts for kw in keywords):
actual_behaviors.add(behavior)
# Check for mismatches
claimed_not_done = claimed_behaviors - actual_behaviors
done_not_claimed = actual_behaviors - claimed_behaviors
if claimed_not_done:
findings.append(Finding(
check="honesty",
result=CheckResult.WARNING,
message=f"Description claims '{', '.join(claimed_not_done)}' but prompts don't reflect this",
suggestion="Update description to accurately reflect tool behavior",
))
if done_not_claimed and claimed_behaviors:
# Only warn if tool claims to do something specific but also does other things
findings.append(Finding(
check="honesty",
result=CheckResult.WARNING,
message=f"Prompts include '{', '.join(done_not_claimed)}' not mentioned in description",
suggestion="Update description to include all tool capabilities",
))
# Check for empty or minimal prompts
for i, step in enumerate(steps):
if step.get("type") == "prompt":
prompt = step.get("prompt", "").strip()
if len(prompt) < 10:
findings.append(Finding(
check="honesty",
result=CheckResult.WARNING,
message="Prompt is unusually short",
location=f"step {i + 1}",
))
if not findings:
findings.append(Finding(
check="honesty",
result=CheckResult.PASS,
message="Description matches observed behavior",
))
return findings
def check_transparency(config: Dict[str, Any]) -> List[Finding]:
"""Check for obfuscated or hidden behavior.
Looks for encoded strings, dynamic execution, etc.
"""
findings = []
steps = config.get("steps", [])
for i, step in enumerate(steps):
if step.get("type") == "code":
code = step.get("code", "")
location = f"step {i + 1}"
# Check for suspicious patterns
for pattern, description in SUSPICIOUS_PATTERNS:
if re.search(pattern, code, re.IGNORECASE):
# Determine severity
if "exec" in pattern or "eval" in pattern:
result = CheckResult.FAIL
msg = f"{description} - potential code injection risk"
elif "socket" in pattern or "subprocess" in pattern:
result = CheckResult.WARNING
msg = f"{description} - flagged for manual review"
else:
result = CheckResult.WARNING
msg = description
findings.append(Finding(
check="transparency",
result=result,
message=msg,
location=location,
))
            # Check for obfuscated variable names (e.g. "xxx", "a1", "zz9")
            obfuscated = re.findall(r'\b([a-z])\1{2,}\b|\b[a-z]{1,2}\d+\b', code)
if len(obfuscated) > 3:
findings.append(Finding(
check="transparency",
result=CheckResult.WARNING,
message="Multiple obfuscated variable names detected",
location=location,
suggestion="Use descriptive variable names",
))
# Check for very long single lines (possible obfuscation)
for line_num, line in enumerate(code.split('\n'), 1):
if len(line) > 500:
findings.append(Finding(
check="transparency",
result=CheckResult.WARNING,
message=f"Unusually long code line ({len(line)} chars)",
location=f"{location}, line {line_num}",
))
if not findings:
findings.append(Finding(
check="transparency",
result=CheckResult.PASS,
message="No obfuscation detected",
))
return findings
def check_scope(config: Dict[str, Any]) -> List[Finding]:
"""Check if code accesses unexpected resources.
Flags file system, network, or credential access.
"""
findings = []
steps = config.get("steps", [])
tool_name = config.get("name", "").lower()
description = config.get("description", "").lower()
for i, step in enumerate(steps):
if step.get("type") == "code":
code = step.get("code", "")
location = f"step {i + 1}"
for pattern, description_text in SCOPE_PATTERNS:
if re.search(pattern, code, re.IGNORECASE):
# Check if this access seems expected based on tool name/description
is_expected = False
if "ssh" in pattern and ("ssh" in tool_name or "ssh" in description):
is_expected = True
if "env" in pattern and ("env" in tool_name or "environment" in description):
is_expected = True
if "file" in description_text.lower() and ("file" in tool_name or "file" in description):
is_expected = True
if not is_expected:
# Credential access is always flagged
if "credential" in pattern or ".ssh" in pattern or ".aws" in pattern:
result = CheckResult.FAIL
suggestion = "Remove credential access or justify in description"
else:
result = CheckResult.WARNING
suggestion = "Document this access in tool description"
findings.append(Finding(
check="scope",
result=result,
message=description_text,
location=location,
suggestion=suggestion,
))
if not findings:
findings.append(Finding(
check="scope",
result=CheckResult.PASS,
message="No unexpected resource access",
))
return findings
@dataclass
class OptimizationSuggestion:
"""A code optimization suggestion for an AI prompt."""
operation: str
current_prompt: str
optimized_code: str
tradeoffs: Dict[str, str]
location: str
def check_efficiency(config: Dict[str, Any]) -> Tuple[List[Finding], List[OptimizationSuggestion]]:
    """Check for AI calls that could instead be done in plain code.

    Returns a tuple of (findings, optimization suggestions). The suggestions
    are informational, not penalties - authors can keep AI steps for valid
    reasons, so each suggestion records the tradeoffs of both approaches.
    """
findings = []
steps = config.get("steps", [])
optimizations = []
for i, step in enumerate(steps):
if step.get("type") == "prompt":
prompt = step.get("prompt", "")
prompt_lower = prompt.lower()
location = f"step {i + 1}"
# Check against known optimizable patterns
for pattern, operation, code, code_benefit, ai_benefit in CODE_OPTIMIZATIONS:
if re.search(pattern, prompt_lower):
optimizations.append(OptimizationSuggestion(
operation=operation,
current_prompt=prompt.strip()[:200], # Truncate for display
optimized_code=code,
tradeoffs={
"code": code_benefit,
"ai": ai_benefit,
},
location=location,
))
break # One suggestion per step
# Add optimization suggestions as findings (not warnings - optional)
for opt in optimizations:
findings.append(Finding(
check="efficiency",
result=CheckResult.PASS, # Not a warning - it's a suggestion
message=f"Optional optimization: {opt.operation} could use code",
location=opt.location,
suggestion=f"Code: {opt.optimized_code}",
))
# Check for all-AI tools (informational, not a warning)
code_steps = [s for s in steps if s.get("type") == "code"]
prompt_steps = [s for s in steps if s.get("type") == "prompt"]
if prompt_steps and not code_steps and len(prompt_steps) > 3:
findings.append(Finding(
check="efficiency",
result=CheckResult.PASS, # Informational
message=f"Tool has {len(prompt_steps)} AI steps - consider if any could be code",
suggestion="Mixing AI and code steps can reduce API costs",
))
if not findings:
findings.append(Finding(
check="efficiency",
result=CheckResult.PASS,
message="AI usage appears appropriate",
))
return findings, optimizations
def get_optimization_details(optimizations: List[OptimizationSuggestion]) -> List[Dict[str, Any]]:
"""Convert optimization suggestions to serializable format."""
return [
{
"operation": opt.operation,
"current_prompt": opt.current_prompt,
"optimized_code": opt.optimized_code,
"tradeoffs": opt.tradeoffs,
"location": opt.location,
"action": "optional", # Author chooses
}
for opt in optimizations
]
def scrutinize_tool(config_yaml: str, description: str, readme: Optional[str] = None) -> Dict[str, Any]:
"""Main entry point for tool scrutiny.
Args:
config_yaml: The tool's YAML configuration
description: The tool's description
        readme: Optional README content (reserved for additional context; currently not analyzed)
Returns:
Dictionary with scrutiny results
"""
report = analyze_tool(config_yaml, description)
return report.to_dict()
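
# A minimal end-to-end sketch, assuming this module is run directly; the sample
# tool below is hypothetical and exists only to exercise the scrutiny pipeline.
if __name__ == "__main__":
    _sample_yaml = """
name: word-counter
description: Count the words in the provided text
steps:
  - type: prompt
    prompt: Count the words in the following text and report the total.
"""
    _result = scrutinize_tool(_sample_yaml, "Count the words in the provided text")
    print(_result["decision"])
    for _finding in _result["findings"]:
        print(f"  [{_finding['result']}] {_finding['check']}: {_finding['message']}")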