"""Tool execution engine.""" import argparse import sys from pathlib import Path from typing import Optional from .tool import Tool, PromptStep, CodeStep, ToolStep from .providers import call_provider, mock_provider from .resolver import resolve_tool, ToolNotFoundError, ToolSpec from .manifest import load_manifest # Maximum recursion depth for nested tool calls MAX_TOOL_DEPTH = 10 def check_dependencies(tool: Tool, checked: set = None) -> list[str]: """ Check if all dependencies for a tool are available. Args: tool: Tool to check dependencies for checked: Set of already checked tools (prevents infinite loops) Returns: List of missing dependency tool references """ if checked is None: checked = set() missing = [] # Check explicit dependencies for dep in tool.dependencies: if dep in checked: continue checked.add(dep) try: resolved = resolve_tool(dep) # Recursively check nested dependencies nested_missing = check_dependencies(resolved.tool, checked) missing.extend(nested_missing) except ToolNotFoundError: missing.append(dep) # Also check tool steps for implicit dependencies for step in tool.steps: if isinstance(step, ToolStep): tool_ref = step.tool if tool_ref in checked: continue checked.add(tool_ref) try: resolved = resolve_tool(tool_ref) # Recursively check nested dependencies nested_missing = check_dependencies(resolved.tool, checked) missing.extend(nested_missing) except ToolNotFoundError: missing.append(tool_ref) return missing def substitute_variables(template: str, variables: dict) -> str: """ Substitute {variable} placeholders in a template. Supports escaping: use {{ for literal { and }} for literal } Args: template: String with {var} placeholders variables: Dict of variable name -> value Returns: String with placeholders replaced Examples: >>> substitute_variables("Hello {name}", {"name": "World"}) 'Hello World' >>> substitute_variables("Use {{braces}}", {"braces": "nope"}) 'Use {braces}' """ # Use unique placeholders for escaped braces ESCAPE_OPEN = "\x00\x01OPEN\x01\x00" ESCAPE_CLOSE = "\x00\x01CLOSE\x01\x00" # First, replace escaped braces with placeholders result = template.replace("{{", ESCAPE_OPEN).replace("}}", ESCAPE_CLOSE) # Now do variable substitution for name, value in variables.items(): result = result.replace(f"{{{name}}}", str(value) if value else "") # Finally, restore escaped braces as single braces result = result.replace(ESCAPE_OPEN, "{").replace(ESCAPE_CLOSE, "}") return result def execute_prompt_step(step: PromptStep, variables: dict, provider_override: str = None) -> tuple[str, bool]: """ Execute a prompt step. Args: step: The prompt step to execute variables: Current variable values provider_override: Override the step's provider Returns: Tuple of (output_value, success) """ # Build prompt with variable substitution prompt = substitute_variables(step.prompt, variables) # Call provider provider = provider_override or step.provider if provider.lower() == "mock": result = mock_provider(prompt) else: result = call_provider(provider, prompt) if not result.success: print(f"Error in prompt step: {result.error}", file=sys.stderr) return "", False return result.text, True def execute_code_step(step: CodeStep, variables: dict, step_num: int = 0) -> tuple[dict, bool]: """ Execute a code step. Args: step: The code step to execute variables: Current variable values (available in code) step_num: Step number for error reporting Returns: Tuple of (output_vars_dict, success) """ # Substitute variables in code (like {outputfile} -> actual value) code = substitute_variables(step.code, variables) # Create execution environment with variables local_vars = dict(variables) try: # Execute the code with substituted variables exec(code, {"__builtins__": __builtins__}, local_vars) # Support comma-separated output vars (e.g., "a, b, c") output_vars = [v.strip() for v in step.output_var.split(',')] outputs = {} for var in output_vars: outputs[var] = str(local_vars.get(var, "")) return outputs, True except Exception as e: import traceback code_lines = code.split('\n') # Extract line number from traceback error_line = None tb = traceback.extract_tb(e.__traceback__) for frame in tb: if frame.filename == '': # exec'd code error_line = frame.lineno break print(f"Error in code step (step {step_num}):", file=sys.stderr) print(f" {type(e).__name__}: {e}", file=sys.stderr) if error_line and 0 < error_line <= len(code_lines): print(file=sys.stderr) # Show context: line before, error line, line after start = max(0, error_line - 2) end = min(len(code_lines), error_line + 1) for i in range(start, end): marker = ">>>" if i == error_line - 1 else " " print(f" {marker} {i+1}: {code_lines[i]}", file=sys.stderr) print(file=sys.stderr) print(f" Available variables: {list(variables.keys())}", file=sys.stderr) return {}, False def _print_call_stack(call_stack: list, error_msg: str) -> None: """Print the tool call stack for error reporting.""" if len(call_stack) > 1: print("Error in tool chain:", file=sys.stderr) for i, (name, step_num) in enumerate(call_stack): indent = " " * i arrow = "-> " if i > 0 else "" step_info = f" (step {step_num})" if step_num else "" print(f"{indent}{arrow}{name}{step_info}", file=sys.stderr) print(f"{' ' * len(call_stack)}{error_msg}", file=sys.stderr) else: print(f"Error: {error_msg}", file=sys.stderr) def execute_tool_step( step: ToolStep, variables: dict, depth: int = 0, provider_override: Optional[str] = None, dry_run: bool = False, verbose: bool = False, call_stack: Optional[list] = None ) -> tuple[str, bool]: """ Execute a tool step by calling another tool. Args: step: The tool step to execute variables: Current variable values depth: Current recursion depth provider_override: Override provider for nested calls dry_run: Just show what would happen verbose: Show debug info call_stack: List of (tool_name, step_num) tuples for error reporting Returns: Tuple of (output_value, success) """ if call_stack is None: call_stack = [] if depth >= MAX_TOOL_DEPTH: _print_call_stack(call_stack, f"Maximum tool nesting depth ({MAX_TOOL_DEPTH}) exceeded") return "", False # Resolve the tool reference try: resolved = resolve_tool(step.tool) nested_tool = resolved.tool except ToolNotFoundError as e: _print_call_stack(call_stack, f"Tool '{step.tool}' not found") print(f"Hint: Install it with: cmdforge install {step.tool}", file=sys.stderr) return "", False # Prepare input by substituting variables input_text = substitute_variables(step.input_template, variables) # Prepare arguments by substituting variables in arg values custom_args = {} for key, value in step.args.items(): custom_args[key] = substitute_variables(str(value), variables) # Determine effective provider (step override > parent override) effective_provider = step.provider or provider_override if verbose: print(f"[verbose] Tool step: calling {step.tool}", file=sys.stderr) print(f"[verbose] Input length: {len(input_text)} chars", file=sys.stderr) print(f"[verbose] Args: {list(custom_args.keys())}", file=sys.stderr) if dry_run: return f"[DRY RUN - would call tool {step.tool}]", True # Run the nested tool output, exit_code = run_tool( tool=nested_tool, input_text=input_text, custom_args=custom_args, provider_override=effective_provider, dry_run=dry_run, show_prompt=False, verbose=verbose, _depth=depth + 1, _call_stack=call_stack ) return output, exit_code == 0 def run_tool( tool: Tool, input_text: str, custom_args: dict, provider_override: Optional[str] = None, dry_run: bool = False, show_prompt: bool = False, verbose: bool = False, _depth: int = 0, _call_stack: Optional[list] = None ) -> tuple[str, int]: """ Execute a tool. Args: tool: Tool definition input_text: Input content custom_args: Custom argument values provider_override: Override all providers dry_run: Just show what would happen show_prompt: Show prompts in addition to output verbose: Show debug info _depth: Internal recursion depth tracker _call_stack: Internal call stack for error reporting Returns: Tuple of (output_text, exit_code) """ # Initialize call stack if _call_stack is None: _call_stack = [] current_stack = _call_stack + [(tool.name, None)] # Check dependencies on first level only (not for nested calls) if _depth == 0 and (tool.dependencies or any(isinstance(s, ToolStep) for s in tool.steps)): missing = check_dependencies(tool) if missing: print(f"Warning: Missing dependencies: {', '.join(missing)}", file=sys.stderr) print(f"Install with: cmdforge install {' '.join(missing)}", file=sys.stderr) # Continue anyway - the actual step execution will fail with a better error # Initialize variables with input and arguments variables = {"input": input_text} # Add argument values (with defaults) for arg in tool.arguments: value = custom_args.get(arg.variable, arg.default) variables[arg.variable] = value if verbose: print(f"[verbose] Tool: {tool.name}", file=sys.stderr) print(f"[verbose] Variables: {list(variables.keys())}", file=sys.stderr) print(f"[verbose] Steps: {len(tool.steps)}", file=sys.stderr) # If no steps, just substitute output template if not tool.steps: output = substitute_variables(tool.output, variables) return output, 0 # Execute each step for i, step in enumerate(tool.steps): if verbose: if isinstance(step, PromptStep): step_type = "PROMPT" elif isinstance(step, CodeStep): step_type = "CODE" elif isinstance(step, ToolStep): step_type = f"TOOL({step.tool})" else: step_type = "UNKNOWN" print(f"[verbose] Step {i+1}: {step_type} -> {{{step.output_var}}}", file=sys.stderr) if isinstance(step, PromptStep): # Show prompt if requested if show_prompt or dry_run: prompt = substitute_variables(step.prompt, variables) print(f"=== PROMPT (Step {i+1}, provider={step.provider}) ===", file=sys.stderr) print(prompt, file=sys.stderr) print("=== END PROMPT ===", file=sys.stderr) if dry_run: variables[step.output_var] = f"[DRY RUN - would call {step.provider}]" else: output, success = execute_prompt_step(step, variables, provider_override) if not success: return "", 2 variables[step.output_var] = output elif isinstance(step, CodeStep): if verbose or dry_run: print(f"=== CODE (Step {i+1}) -> {{{step.output_var}}} ===", file=sys.stderr) print(step.code, file=sys.stderr) print("=== END CODE ===", file=sys.stderr) if dry_run: # Handle comma-separated output vars for dry run for var in [v.strip() for v in step.output_var.split(',')]: variables[var] = "[DRY RUN - would execute code]" else: outputs, success = execute_code_step(step, variables, step_num=i+1) if not success: return "", 1 # Merge all output vars into variables variables.update(outputs) elif isinstance(step, ToolStep): if verbose or dry_run: print(f"=== TOOL (Step {i+1}) -> {{{step.output_var}}} ===", file=sys.stderr) print(f" Calling: {step.tool}", file=sys.stderr) print(f" Args: {step.args}", file=sys.stderr) print("=== END TOOL ===", file=sys.stderr) # Update call stack with current step number step_stack = _call_stack + [(tool.name, i + 1)] output, success = execute_tool_step( step, variables, depth=_depth, provider_override=provider_override, dry_run=dry_run, verbose=verbose, call_stack=step_stack ) if not success: return "", 3 variables[step.output_var] = output # Generate final output output = substitute_variables(tool.output, variables) return output, 0 def create_argument_parser(tool: Tool) -> argparse.ArgumentParser: """ Create an argument parser for a tool. Args: tool: Tool definition Returns: Configured ArgumentParser """ parser = argparse.ArgumentParser( prog=tool.name, description=tool.description or f"CmdForge: {tool.name}" ) # Universal flags parser.add_argument("-i", "--input", dest="input_file", help="Input file (reads from stdin if piped)") parser.add_argument("--stdin", action="store_true", help="Read input interactively from stdin (type then Ctrl+D)") parser.add_argument("-o", "--output", dest="output_file", help="Output file (writes to stdout if omitted)") parser.add_argument("--dry-run", action="store_true", help="Show what would happen without executing") parser.add_argument("--show-prompt", action="store_true", help="Show prompts in addition to output") parser.add_argument("-p", "--provider", help="Override provider (e.g., --provider mock)") parser.add_argument("-v", "--verbose", action="store_true", help="Show debug information") # Tool-specific flags from arguments for arg in tool.arguments: parser.add_argument( arg.flag, dest=arg.variable, default=arg.default, help=arg.description or f"{arg.variable} (default: {arg.default})" ) return parser def main(): """Entry point for tool execution via wrapper script.""" if len(sys.argv) < 2: print("Usage: python -m cmdforge.runner [args...]", file=sys.stderr) sys.exit(1) tool_spec = sys.argv[1] # Resolve tool using new resolution order try: resolved = resolve_tool(tool_spec) tool = resolved.tool except ToolNotFoundError as e: print(f"Error: Tool '{tool_spec}' not found", file=sys.stderr) print(f"Searched: {', '.join(e.searched_paths[:3])}", file=sys.stderr) sys.exit(1) # Check for manifest overrides manifest = load_manifest() provider_override_from_manifest = None if manifest: override = manifest.get_override(tool_spec) if override and override.provider: provider_override_from_manifest = override.provider # Parse remaining arguments parser = create_argument_parser(tool) args = parser.parse_args(sys.argv[2:]) # Read input if args.input_file: # Read from file input_path = Path(args.input_file) if not input_path.exists(): print(f"Error: Input file not found: {args.input_file}", file=sys.stderr) sys.exit(1) input_text = input_path.read_text() elif args.stdin: # Explicit interactive input requested print("Reading from stdin (Ctrl+D to end):", file=sys.stderr) input_text = sys.stdin.read() elif not sys.stdin.isatty(): # Stdin is piped - read it input_text = sys.stdin.read() else: # No input provided - use empty string input_text = "" # Collect custom args custom_args = {} for arg in tool.arguments: value = getattr(args, arg.variable, None) if value is not None: custom_args[arg.variable] = value # Determine provider override (CLI flag takes precedence over manifest) effective_provider = args.provider or provider_override_from_manifest # Run tool output, exit_code = run_tool( tool=tool, input_text=input_text, custom_args=custom_args, provider_override=effective_provider, dry_run=args.dry_run, show_prompt=args.show_prompt, verbose=args.verbose ) # Write output if exit_code == 0 and output: if args.output_file: Path(args.output_file).write_text(output) else: print(output) sys.exit(exit_code) if __name__ == "__main__": main()