289 lines
8.9 KiB
Python
289 lines
8.9 KiB
Python
"""Tool execution engine."""
|
|
|
|
import argparse
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from .tool import load_tool, Tool, PromptStep, CodeStep
|
|
from .providers import call_provider, mock_provider
|
|
|
|
|
|
def substitute_variables(template: str, variables: dict) -> str:
    """
    Substitute {variable} placeholders in a template.

    Every ``{name}`` whose name is a key of ``variables`` is replaced in a
    single pass over the template, so text inserted for one variable is never
    re-scanned for further placeholders. Placeholders without a matching key
    are left untouched.

    Args:
        template: String with {var} placeholders
        variables: Dict of variable name -> value

    Returns:
        String with placeholders replaced. ``None`` values become the empty
        string; every other value (including ``0``) is rendered with ``str()``.
    """
    import re  # local import keeps this block self-contained

    if not variables:
        return template

    # Map each literal placeholder token to its replacement text.
    # Only None is treated as "no value"; falsy values such as 0 are kept.
    replacements = {
        f"{{{name}}}": "" if value is None else str(value)
        for name, value in variables.items()
    }

    # Longest tokens first so an unusual name can never be shadowed by a
    # shorter token that happens to be its prefix.
    tokens = sorted(replacements, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(token) for token in tokens))

    # A callable replacement avoids backslash/group-reference interpretation
    # of the substituted values.
    return pattern.sub(lambda match: replacements[match.group(0)], template)
|
|
|
|
|
|
def execute_prompt_step(step: PromptStep, variables: dict, provider_override: str = None) -> tuple[str, bool]:
    """
    Run a single prompt step against a provider.

    The step's prompt template is filled in from ``variables`` and sent to
    ``provider_override`` when given, otherwise to ``step.provider``. A
    provider named "mock" (compared case-insensitively) is served by
    ``mock_provider``; any other name goes through ``call_provider``.

    Args:
        step: The prompt step to execute
        variables: Current variable values
        provider_override: Override the step's provider

    Returns:
        Tuple of (output_value, success). On failure the provider's error is
        printed to stderr and ("", False) is returned.
    """
    rendered_prompt = substitute_variables(step.prompt, variables)
    chosen_provider = provider_override or step.provider

    # Route to the mock implementation or the real provider call.
    use_mock = chosen_provider.lower() == "mock"
    response = (
        mock_provider(rendered_prompt)
        if use_mock
        else call_provider(chosen_provider, rendered_prompt)
    )

    if response.success:
        return response.text, True

    print(f"Error in prompt step: {response.error}", file=sys.stderr)
    return "", False
|
|
|
|
|
|
def execute_code_step(step: CodeStep, variables: dict) -> tuple[dict, bool]:
    """
    Execute a code step.

    The step's source has ``{variable}`` placeholders substituted, then runs
    via ``exec`` in a fresh namespace pre-populated with a copy of
    ``variables``. After execution, the names listed in ``step.output_var``
    (comma-separated) are read back from that namespace.

    Args:
        step: The code step to execute
        variables: Current variable values (available in code)

    Returns:
        Tuple of (output_vars_dict, success). Each output value is converted
        with ``str()``; a name the code never assigned yields "". On any
        exception the error is printed to stderr and ({}, False) is returned.
    """
    # NOTE(security): variable values are pasted into the source text before
    # exec(), so any value (including tool input) can inject arbitrary Python.
    # Only run code steps with trusted tools and trusted input.
    code = substitute_variables(step.code, variables)

    # Use ONE dict as both globals and locals. With separate dicts, exec()
    # runs the code as if it were a class body, and functions defined in the
    # step cannot see the step's own top-level imports or variables.
    namespace = dict(variables)
    namespace["__builtins__"] = __builtins__

    try:
        exec(code, namespace)

        # Support comma-separated output vars (e.g., "a, b, c"); skip blank
        # entries such as the one produced by a trailing comma, which would
        # otherwise register a variable named "" (placeholder "{}").
        output_names = [name.strip() for name in step.output_var.split(',')]
        outputs = {
            name: str(namespace.get(name, ""))
            for name in output_names
            if name
        }
        return outputs, True

    except Exception as e:
        # Boundary around user-supplied code: report the failure, don't crash.
        print(f"Error in code step: {e}", file=sys.stderr)
        return {}, False
|
|
|
|
|
|
def run_tool(
    tool: Tool,
    input_text: str,
    custom_args: dict,
    provider_override: Optional[str] = None,
    dry_run: bool = False,
    show_prompt: bool = False,
    verbose: bool = False
) -> tuple[str, int]:
    """
    Execute a tool.

    Steps run in order; each step's result is stored under its output
    variable name(s) so later steps and the final output template can
    reference it as ``{name}``.

    Args:
        tool: Tool definition
        input_text: Input content
        custom_args: Custom argument values
        provider_override: Override all providers
        dry_run: Just show what would happen
        show_prompt: Show prompts in addition to output
        verbose: Show debug info

    Returns:
        Tuple of (output_text, exit_code). Exit code is 0 on success, 1 when
        a code step fails and 2 when a prompt step fails; the output text is
        "" on failure.
    """
    # Initialize variables with input and arguments (falling back to defaults)
    variables = {"input": input_text}
    for arg in tool.arguments:
        variables[arg.variable] = custom_args.get(arg.variable, arg.default)

    if verbose:
        print(f"[verbose] Tool: {tool.name}", file=sys.stderr)
        print(f"[verbose] Variables: {list(variables.keys())}", file=sys.stderr)
        print(f"[verbose] Steps: {len(tool.steps)}", file=sys.stderr)

    # If no steps, just substitute output template
    if not tool.steps:
        return substitute_variables(tool.output, variables), 0

    for number, step in enumerate(tool.steps, start=1):
        if verbose:
            step_type = "PROMPT" if isinstance(step, PromptStep) else "CODE"
            print(f"[verbose] Step {number}: {step_type} -> {{{step.output_var}}}", file=sys.stderr)

        if isinstance(step, PromptStep):
            # Report the provider that will really be used, honouring the
            # override, rather than the one written in the tool definition.
            provider = provider_override or step.provider

            if show_prompt or dry_run:
                prompt = substitute_variables(step.prompt, variables)
                print(f"=== PROMPT (Step {number}, provider={provider}) ===", file=sys.stderr)
                print(prompt, file=sys.stderr)
                print("=== END PROMPT ===", file=sys.stderr)

            if dry_run:
                variables[step.output_var] = f"[DRY RUN - would call {provider}]"
                continue

            output, success = execute_prompt_step(step, variables, provider_override)
            if not success:
                return "", 2
            variables[step.output_var] = output

        elif isinstance(step, CodeStep):
            if verbose or dry_run:
                print(f"=== CODE (Step {number}) -> {{{step.output_var}}} ===", file=sys.stderr)
                print(step.code, file=sys.stderr)
                print("=== END CODE ===", file=sys.stderr)

            if dry_run:
                # Comma-separated output vars each get a placeholder; blank
                # entries (e.g. from a trailing comma) are skipped so no
                # variable named "" is created.
                for var in (v.strip() for v in step.output_var.split(',')):
                    if var:
                        variables[var] = "[DRY RUN - would execute code]"
                continue

            outputs, success = execute_code_step(step, variables)
            if not success:
                return "", 1
            # Merge all output vars into variables
            variables.update(outputs)

    # Generate final output
    return substitute_variables(tool.output, variables), 0
|
|
|
|
|
|
def create_argument_parser(tool: Tool) -> argparse.ArgumentParser:
    """
    Build the command-line parser for a tool.

    The parser always carries the universal flags (input/output selection,
    dry run, prompt display, provider override, verbosity) and then one
    option per entry in ``tool.arguments``.

    Args:
        tool: Tool definition

    Returns:
        Configured ArgumentParser
    """
    summary = tool.description or f"SmartTools: {tool.name}"
    parser = argparse.ArgumentParser(prog=tool.name, description=summary)

    # Universal flags, registered in this order so --help lists them the same way.
    universal_flags = (
        (("-i", "--input"),
         {"dest": "input_file",
          "help": "Input file (reads from stdin if piped)"}),
        (("--stdin",),
         {"action": "store_true",
          "help": "Read input interactively from stdin (type then Ctrl+D)"}),
        (("-o", "--output"),
         {"dest": "output_file",
          "help": "Output file (writes to stdout if omitted)"}),
        (("--dry-run",),
         {"action": "store_true",
          "help": "Show what would happen without executing"}),
        (("--show-prompt",),
         {"action": "store_true",
          "help": "Show prompts in addition to output"}),
        (("-p", "--provider"),
         {"help": "Override provider (e.g., --provider mock)"}),
        (("-v", "--verbose"),
         {"action": "store_true",
          "help": "Show debug information"}),
    )
    for names, options in universal_flags:
        parser.add_argument(*names, **options)

    # Tool-specific flags; the help text falls back to "<variable> (default: ...)".
    for arg in tool.arguments:
        help_text = arg.description or f"{arg.variable} (default: {arg.default})"
        parser.add_argument(
            arg.flag,
            dest=arg.variable,
            default=arg.default,
            help=help_text,
        )

    return parser
|
|
|
|
|
|
def main():
    """
    Entry point for tool execution via wrapper script.

    Expects ``sys.argv[1]`` to be the tool name and parses the remaining
    arguments with the tool's own parser. Input comes from ``-i FILE``,
    from stdin when ``--stdin`` is given or stdin is piped, and is otherwise
    the empty string. Non-empty output of a successful run is written to
    ``-o FILE`` or printed to stdout.

    Always terminates through ``sys.exit``: with the tool's exit code, or
    with 1 for usage errors, an unknown tool, or input/output file errors.
    """
    if len(sys.argv) < 2:
        print("Usage: python -m smarttools.runner <tool_name> [args...]", file=sys.stderr)
        sys.exit(1)

    tool_name = sys.argv[1]
    tool = load_tool(tool_name)

    if tool is None:
        print(f"Error: Tool '{tool_name}' not found", file=sys.stderr)
        sys.exit(1)

    # Parse remaining arguments
    parser = create_argument_parser(tool)
    args = parser.parse_args(sys.argv[2:])

    # Read input
    if args.input_file:
        input_path = Path(args.input_file)
        if not input_path.exists():
            print(f"Error: Input file not found: {args.input_file}", file=sys.stderr)
            sys.exit(1)
        try:
            input_text = input_path.read_text()
        except (OSError, UnicodeDecodeError) as exc:
            # The path exists but cannot be read as text (directory,
            # permissions, undecodable bytes): report it, no traceback.
            print(f"Error: Could not read input file {args.input_file}: {exc}", file=sys.stderr)
            sys.exit(1)
    elif args.stdin:
        # Explicit interactive input requested
        print("Reading from stdin (Ctrl+D to end):", file=sys.stderr)
        input_text = sys.stdin.read()
    elif not sys.stdin.isatty():
        # Stdin is piped - read it
        input_text = sys.stdin.read()
    else:
        # No input provided - use empty string
        input_text = ""

    # Collect custom args; None means "not provided", so run_tool falls back
    # to the argument's default.
    custom_args = {}
    for arg in tool.arguments:
        value = getattr(args, arg.variable, None)
        if value is not None:
            custom_args[arg.variable] = value

    # Run tool
    output, exit_code = run_tool(
        tool=tool,
        input_text=input_text,
        custom_args=custom_args,
        provider_override=args.provider,
        dry_run=args.dry_run,
        show_prompt=args.show_prompt,
        verbose=args.verbose
    )

    # Write output (only for a successful run that produced something)
    if exit_code == 0 and output:
        if args.output_file:
            try:
                Path(args.output_file).write_text(output)
            except OSError as exc:
                print(f"Error: Could not write output file {args.output_file}: {exc}", file=sys.stderr)
                sys.exit(1)
        else:
            print(output)

    sys.exit(exit_code)
|
|
|
|
|
|
# Run the CLI only when this module is executed as a script, not on import.
if __name__ == "__main__":
    main()
|