"""
Discussion runner - thin orchestration layer.

This module orchestrates SmartTools to execute discussion turns.
It contains NO business logic - all logic lives in SmartTools.

The runner simply:
1. Loads pipeline configuration from template
2. Calls SmartTools via subprocess based on pipeline steps
3. Pipes data between tools
4. Updates the discussion file

See docs/DESIGN.md for architecture details.
See docs/PIPELINE_SCHEMA.md for pipeline configuration.
"""

import concurrent.futures
import json
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import yaml

from .discussion import Discussion


# Default pipeline used when template doesn't specify one
# Uses the new variable-based schema
DEFAULT_PIPELINE = {
    "steps": [
        {
            "tool": "discussion-parser",
            "input": "$discussion",
            "output": "$parsed"
        },
        {
            "tool": "discussion-mention-router",
            "input": "$parsed",
            "output": "$routing",
            "when": "not $participants_specified",
            "args": {"--default-participants": "$participants_csv"}
        },
        {
            "tool": "discussion-{participant}",
            "for_each": "$participants_to_call",
            "parallel": True,
            "input": "$discussion",
            "output": "$responses[]",
            "args": {"--callout": "$callout", "--templates-dir": "$templates_dir"}
        },
        {
            "tool": "discussion-turn-appender",
            "input": "$discussion",
            "output": "$discussion",
            "args": {"--responses-json": "$responses_json"}
        },
        {
            "tool": "discussion-parser",
            "input": "$discussion",
            "output": "$reparsed",
            "when": "$phase_voting"
        },
        {
            "tool": "discussion-vote-counter",
            "input": "$reparsed",
            "output": "$votes",
            "when": "$phase_voting"
        },
        {
            "tool": "discussion-status-promoter",
            "input": "$votes",
            "output": "$promotion",
            "when": "$phase_voting",
            "args": {"--current-status": "$status", "--current-phase": "$phase"}
        },
    ]
}


@dataclass
class ParticipantResponse:
    """Response from a participant SmartTool."""

    alias: str
    name: str
    comment: Optional[str] = None
    vote: Optional[str] = None
    success: bool = True
    error: Optional[str] = None
    raw_output: str = ""


@dataclass
class TurnResult:
    """Result of running a discussion turn."""

    responses: list[ParticipantResponse] = field(default_factory=list)
    vote_summary: dict = field(default_factory=dict)
    consensus_reached: bool = False
    consensus_reason: Optional[str] = None
    status_promoted: bool = False
    new_status: Optional[str] = None
    phase_advanced: bool = False
    new_phase: Optional[str] = None

    @property
    def successful_count(self) -> int:
        """Number of responses that succeeded and carry a comment."""
        return sum(1 for r in self.responses if r.success and r.comment)

    @property
    def failed_count(self) -> int:
        """Number of responses marked unsuccessful."""
        return sum(1 for r in self.responses if not r.success)

    @property
    def skipped_count(self) -> int:
        """Number of responses that succeeded but carry no comment."""
        return sum(1 for r in self.responses if r.success and not r.comment)


def _call_tool(
    tool_name: str,
    input_data: str,
    args: Optional[list[str]] = None,
    timeout: int = 300,
) -> subprocess.CompletedProcess:
    """
    Call a SmartTool via subprocess.

    Args:
        tool_name: Name of the tool (e.g., "discussion-parser")
        input_data: Data to pass via stdin
        args: Additional command line arguments
        timeout: Timeout in seconds

    Returns:
        CompletedProcess with stdout/stderr

    Raises:
        FileNotFoundError: If the tool executable cannot be found
        subprocess.TimeoutExpired: If the tool runs longer than ``timeout``
    """
    cmd = [tool_name] + (args or [])
    return subprocess.run(
        cmd,
        input=input_data,
        capture_output=True,
        text=True,
        timeout=timeout,
    )


def _parse_json_output(output: str) -> dict:
    """
    Parse JSON from tool output.

    Handles:
    - Pure JSON
    - JSON wrapped in markdown code blocks
    - A JSON object embedded in surrounding text (first "{" to last "}")

    Args:
        output: Raw stdout from tool

    Returns:
        Parsed value (a dict for object output; the direct-parse path
        returns whatever JSON value the text encodes)

    Raises:
        ValueError: If JSON cannot be parsed
    """
    text = output.strip()

    # Try direct parse
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Try extracting from markdown code block
    code_block = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', text, re.DOTALL)
    if code_block:
        try:
            return json.loads(code_block.group(1).strip())
        except json.JSONDecodeError:
            pass

    # Try finding JSON object
    start = text.find("{")
    end = text.rfind("}") + 1
    if start >= 0 and end > start:
        try:
            return json.loads(text[start:end])
        except json.JSONDecodeError:
            pass

    raise ValueError(f"Could not parse JSON: {text[:200]}...")


def _load_template(template_name: str, templates_dir: Optional[Path] = None) -> dict:
    """
    Load a template YAML file.

    Searches ``templates_dir`` first (when given), then ``./templates``
    under the current working directory, then the ``templates`` directory
    three levels above this file.

    Args:
        template_name: Name of the template (e.g., "feature")
        templates_dir: Directory to search for templates

    Returns:
        Template dict or empty dict if not found
    """
    search_dirs = []
    if templates_dir:
        search_dirs.append(Path(templates_dir))
    search_dirs.extend([
        Path.cwd() / "templates",
        Path(__file__).parent.parent.parent / "templates",
    ])

    for search_dir in search_dirs:
        template_path = search_dir / f"{template_name}.yaml"
        if template_path.exists():
            # Explicit encoding so loading does not depend on the locale.
            with open(template_path, encoding="utf-8") as f:
                return yaml.safe_load(f) or {}

    return {}


def _get_pipeline(template: dict) -> dict:
    """
    Get the pipeline configuration from a template.

    Returns DEFAULT_PIPELINE if template doesn't define one, including
    the case where ``turn_pipeline`` is present but null.
    """
    pipeline = template.get("turn_pipeline")
    return DEFAULT_PIPELINE if pipeline is None else pipeline


class VariableStore:
    """
    Variable store for pipeline execution.

    Manages $variable references and supports:
    - Simple variables: $discussion, $parsed
    - JSON field access: $routing.participants_to_call
    - Array collection: $responses[] appends to list
    """

    def __init__(self):
        self._store: dict = {}

    def set(self, name: str, value) -> None:
        """Set a variable. Name should not include $ prefix."""
        # Handle array append syntax: responses[]
        if name.endswith("[]"):
            base_name = name[:-2]
            if base_name not in self._store:
                self._store[base_name] = []
            self._store[base_name].append(value)
        else:
            self._store[name] = value

    def get(self, ref: str, default=None):
        """
        Get a variable value.

        Ref can be:
        - $varname - simple lookup
        - $varname.field - JSON field access
        - $varname.field.subfield - nested access

        A ref that does not start with "$" is returned unchanged as a
        literal value.

        Args:
            ref: Variable reference (e.g., "$varname" or "$varname.field")
            default: Default value if variable not found
        """
        if not ref.startswith("$"):
            return ref  # Literal value

        path = ref[1:]  # Remove $
        parts = path.split(".")

        value = self._store.get(parts[0])
        if value is None:
            return default

        # Navigate nested fields
        for part in parts[1:]:
            if isinstance(value, dict):
                value = value.get(part)
            elif isinstance(value, list) and part.isdigit():
                idx = int(part)
                value = value[idx] if idx < len(value) else None
            else:
                return default
            if value is None:
                return default

        return value

    def resolve(self, value):
        """
        Resolve any $variable references in a value.

        For strings starting with "$", returns the variable's value as a
        string (lists/dicts are JSON-encoded, missing variables become "").
        Other strings and non-string values are returned as-is.
        """
        if not isinstance(value, str):
            return value

        if not value.startswith("$"):
            return value

        resolved = self.get(value)
        if resolved is None:
            return ""

        # Convert to string for CLI args
        if isinstance(resolved, (list, dict)):
            return json.dumps(resolved)
        return str(resolved)

    def resolve_args(self, args: dict) -> list[str]:
        """
        Convert args dict to CLI argument list with variable resolution.

        Arguments whose resolved value is empty/falsy are omitted. Every
        emitted value is a string, so the list can be handed directly to
        subprocess.
        """
        result = []
        for arg_name, value in args.items():
            resolved = self.resolve(value)
            if resolved:  # Skip empty values
                # resolve() passes non-string literals (e.g. numbers from
                # YAML) through unchanged; argv entries must be strings.
                result.extend([arg_name, str(resolved)])
        return result

    def dump(self) -> dict:
        """Return copy of all variables for debugging."""
        return dict(self._store)


def _check_condition(condition: str, variables: VariableStore) -> bool:
    """
    Check if a pipeline step condition is met.

    Supports:
    - "always" or empty - always true
    - "$variable" - true if variable is truthy
    - "not $variable" - true if variable is falsy
    - "$variable.field" - true if JSON field is truthy

    Args:
        condition: Condition string
        variables: VariableStore with current state

    Returns:
        True if condition is met
    """
    if not condition or condition == "always":
        return True

    # Handle "not $variable"
    negated = False
    if condition.startswith("not "):
        negated = True
        condition = condition[4:].strip()

    # Resolve the variable reference
    if condition.startswith("$"):
        value = variables.get(condition)
        result = bool(value)  # Truthy check
        return not result if negated else result

    # Legacy condition names for backward compatibility
    legacy_map = {
        "no_participants_specified": "not $participants_specified",
        "phase_has_voting": "$phase_voting",
    }
    if condition in legacy_map:
        return _check_condition(legacy_map[condition], variables)

    # Unknown condition - default to true
    return True


def _interpolate_args(args: dict, context: dict) -> list[str]:
    """
    Interpolate variables in argument values and convert to CLI args.

    Args:
        args: Dict of arg_name -> value
        context: Context dict with variables

    Returns:
        List of CLI arguments (e.g., ["--threshold-ready", "0.67"])
    """
    result = []
    for arg_name, value in args.items():
        if isinstance(value, str):
            # Interpolate {variable} patterns
            for key, val in context.items():
                if isinstance(val, (str, int, float, bool)):
                    value = value.replace(f"{{{key}}}", str(val))
        result.extend([arg_name, str(value)])
    return result


def run_pipeline_turn(
    discussion_path: Path,
    participants: Optional[list[str]] = None,
    callout: str = "",
    verbose: bool = False,
    provider: Optional[str] = None,
    templates_dir: Optional[Path] = None,
) -> TurnResult:
    """
    Run a discussion turn using the template's pipeline configuration.

    This is a THIN orchestration layer. It:
    1. Initializes variables from context
    2. Executes each step by piping input -> tool -> output
    3. Returns results without any business logic

    All logic lives in the SmartTools themselves.

    Args:
        discussion_path: Path to discussion markdown file
        participants: List of participant aliases (None = use router)
        callout: Optional callout/question for participants
        verbose: Print progress to stderr
        provider: Override AI provider
        templates_dir: Directory containing templates

    Returns:
        TurnResult with responses and status
    """
    discussion_path = Path(discussion_path)
    discussion = Discussion.load(discussion_path)
    result = TurnResult()

    def log(msg: str):
        if verbose:
            print(f"[pipeline] {msg}", file=sys.stderr)

    # Load template and pipeline
    template_name = discussion.template or "feature"
    template = _load_template(template_name, templates_dir)
    pipeline = _get_pipeline(template)
    # "or {}" tolerates a template with an explicit "phases: null".
    phases = template.get("phases") or {}
    phase_config = phases.get(discussion.phase, {})

    log(f"Using template: {template_name}")
    log(f"Pipeline has {len(pipeline.get('steps', []))} steps")

    # Initialize variable store with built-in variables
    store = VariableStore()
    store.set("discussion", discussion.get_content())
    store.set("discussion_path", str(discussion_path))
    store.set("callout", callout or "")
    store.set("templates_dir", str(templates_dir or Path.cwd() / "templates"))
    store.set(
        "participants",
        participants
        or discussion.participant_aliases
        or ["architect", "security", "pragmatist"],
    )
    store.set("participants_csv", ",".join(store.get("$participants")))
    store.set("participants_specified", participants is not None)
    # NOTE(review): nothing in this module copies the router's "$routing"
    # output into "participants_to_call", so with DEFAULT_PIPELINE the
    # router result does not affect who is called. run_turn() does use
    # routing["participants_to_call"]; confirm which behavior is intended.
    store.set("participants_to_call", store.get("$participants"))  # May be updated by router
    store.set("status", discussion.status)
    store.set("phase", discussion.phase)
    store.set("template", template_name)
    store.set("phase_voting", phase_config.get("voting", False))
    store.set("phase_threshold_ready", phase_config.get("threshold_ready", 0.67))
    store.set("phase_human_required", phase_config.get("human_required", True))
    store.set("provider", provider or "")

    # Execute pipeline steps
    for i, step in enumerate(pipeline.get("steps", [])):
        tool = step.get("tool", "")
        input_var = step.get("input", "$discussion")
        output_var = step.get("output")
        condition = step.get("when", "always")
        for_each = step.get("for_each")
        parallel = step.get("parallel", False)
        args = step.get("args", {})

        log(f"Step {i+1}: {tool}")

        # Check condition
        if not _check_condition(condition, store):
            log(f" Skipped (condition: {condition})")
            continue

        # Handle for_each expansion
        if for_each:
            items = store.get(for_each) if for_each.startswith("$") else for_each
            if not items:
                log(f" Skipped (for_each {for_each} is empty)")
                continue
            if isinstance(items, str):
                items = [items]

            log(f" Iterating over {len(items)} items")
            _execute_for_each(
                store, result, tool, input_var, output_var,
                items, parallel, args, provider, log,
            )
        else:
            # Single tool execution
            _execute_step(store, tool, input_var, output_var, args, log)

    # Build TurnResult from variables
    _finalize_result(store, result, discussion_path, log)

    log("Pipeline complete")
    return result


def _execute_step(
    vars: VariableStore,
    tool: str,
    input_var: str,
    output_var: Optional[str],
    args: dict,
    log,
) -> Optional[str]:
    """
    Execute a single pipeline step.

    This is generic - no special handling for specific tools.
    Just: resolve input -> call tool -> store output

    Returns:
        The tool's raw stdout, or None if the tool was missing, exited
        non-zero, or raised.
    """
    # Get input data
    input_data = vars.get(input_var) if input_var else ""
    if isinstance(input_data, (dict, list)):
        input_data = json.dumps(input_data)
    input_data = str(input_data) if input_data else ""

    # Resolve args
    cli_args = vars.resolve_args(args)

    try:
        proc = _call_tool(tool, input_data, cli_args)

        if proc.returncode != 0:
            log(f" {tool} failed: {proc.stderr[:100] if proc.stderr else 'no error message'}")
            return None

        output = proc.stdout

        # Store output if specified
        if output_var:
            var_name = output_var.lstrip("$")
            # Try to parse as JSON, fall back to string
            try:
                parsed = _parse_json_output(output)
            except ValueError:
                vars.set(var_name, output)
                log(f" -> {output_var} (string)")
            else:
                vars.set(var_name, parsed)
                log(f" -> {output_var} (JSON)")

        return output

    except FileNotFoundError:
        log(f" {tool} not found")
        return None
    except Exception as e:
        # Best-effort step: any other failure (e.g. timeout) is logged and
        # the pipeline continues with the next step.
        log(f" {tool} error: {e}")
        return None


def _execute_for_each(
    vars: VariableStore,
    result: TurnResult,
    tool_pattern: str,
    input_var: str,
    output_var: Optional[str],
    items: list,
    parallel: bool,
    args: dict,
    provider: str,
    log,
):
    """
    Execute a step for each item in a list.

    Handles {participant} substitution in tool names.
    Collects outputs into array if output_var ends with [].

    Responses are recorded in the same order as ``items``, in both
    sequential and parallel mode. Successful responses with a comment are
    also JSON-encoded into the "responses_json" variable when an
    output_var is given.
    """

    def execute_one(item: str) -> ParticipantResponse:
        # Substitute {participant} in tool name
        tool = tool_pattern.replace("{participant}", item)
        display_name = f"AI-{item.capitalize()}"
        log(f" [{item}] {tool}")

        # Get input data
        input_data = vars.get(input_var) if input_var else ""
        if isinstance(input_data, (dict, list)):
            input_data = json.dumps(input_data)
        input_data = str(input_data) if input_data else ""

        # Resolve args
        cli_args = vars.resolve_args(args)

        # Add provider if specified
        if provider:
            cli_args.extend(["--provider", provider])

        try:
            proc = _call_tool(tool, input_data, cli_args)

            if proc.returncode != 0:
                log(f" [{item}] failed: {proc.stderr[:80] if proc.stderr else ''}")
                return ParticipantResponse(
                    alias=item,
                    name=display_name,
                    success=False,
                    error=proc.stderr or "Tool failed",
                )

            # Parse response
            try:
                response_data = _parse_json_output(proc.stdout)
            except ValueError:
                log(f" [{item}] invalid JSON output")
                return ParticipantResponse(
                    alias=item,
                    name=display_name,
                    success=False,
                    error="Invalid JSON response",
                    raw_output=proc.stdout,
                )

            # Check for NO_RESPONSE sentinel
            if response_data.get("sentinel") == "NO_RESPONSE":
                log(f" [{item}] no response")
                return ParticipantResponse(
                    alias=item,
                    name=display_name,
                    success=True,
                )

            comment = response_data.get("comment", "")
            vote = response_data.get("vote")

            log(f" [{item}] responded" + (f" [{vote}]" if vote else ""))

            return ParticipantResponse(
                alias=item,
                name=display_name,
                comment=comment,
                vote=vote,
                success=True,
                raw_output=proc.stdout,
            )

        except FileNotFoundError:
            log(f" [{item}] tool not found")
            return ParticipantResponse(
                alias=item,
                name=display_name,
                success=False,
                error=f"Tool {tool} not found",
            )
        except Exception as e:
            # One failing participant must not abort the others.
            log(f" [{item}] error: {e}")
            return ParticipantResponse(
                alias=item,
                name=display_name,
                success=False,
                error=str(e),
            )

    if parallel:
        # Executor.map yields results in input order, so the responses are
        # recorded deterministically regardless of which tool finishes first.
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max(1, len(items))
        ) as executor:
            responses = list(executor.map(execute_one, items))
    else:
        responses = [execute_one(item) for item in items]

    # Store responses in result
    result.responses.extend(responses)

    # Store in output variable if specified
    if output_var:
        var_name = output_var.lstrip("$")
        # Build list of response data for downstream tools
        response_data = [
            {
                "author": resp.name,
                "comment": resp.comment,
                "vote": resp.vote,
            }
            for resp in responses
            if resp.success and resp.comment
        ]
        if var_name.endswith("[]"):
            # Array-collection syntax: append each response individually
            # rather than nesting the whole batch as a single element.
            base_name = var_name[:-2]
            if vars.get(f"${base_name}") is None:
                vars.set(base_name, [])
            for entry in response_data:
                vars.set(var_name, entry)
        else:
            vars.set(var_name, response_data)
        vars.set("responses_json", json.dumps(response_data))


def _finalize_result(vars: VariableStore, result: TurnResult, discussion_path: Path, log):
    """
    Finalize the turn result by:
    1. Writing the current $discussion content back to file
    2. Extracting vote summary and consensus from variables
    3. Handling status promotion
    4. Advancing the phase when consensus was reached in a voting phase
    """
    # Write back whatever $discussion now holds (turn-appender output when
    # that step ran and stored a string).
    updated_discussion = vars.get("$discussion")
    if updated_discussion and isinstance(updated_discussion, str):
        # Write back to file
        discussion_path.write_text(updated_discussion)
        log(" Discussion file updated")

    # Extract vote summary if vote-counter was run
    votes = vars.get("$votes")
    if votes and isinstance(votes, dict):
        result.vote_summary = votes.get("vote_summary", {})
        consensus = votes.get("consensus", {})
        result.consensus_reached = consensus.get("reached", False)
        result.consensus_reason = consensus.get("reason")

    # Check for status promotion
    promotion = vars.get("$promotion")
    if promotion and isinstance(promotion, dict):
        if promotion.get("should_promote"):
            result.status_promoted = True
            result.new_status = promotion.get("new_status")

            # Update status in file
            discussion = Discussion.load(discussion_path)
            discussion.update_status(result.new_status)
            discussion.save()
            log(f" Status promoted to {result.new_status}")

    # Auto-advance phase if consensus reached in a voting phase
    phase_voting = vars.get("$phase_voting", False)
    if result.consensus_reached and phase_voting:
        template_name = vars.get("$template", "")
        current_phase = vars.get("$phase", "")

        if template_name and current_phase:
            # Reload from the same templates directory the turn was started
            # with, so a custom templates_dir is honored here as well.
            templates_dir = vars.get("$templates_dir")
            template = _load_template(
                template_name,
                Path(templates_dir) if templates_dir else None,
            )
            phases = template.get("phases") or {}
            phase_config = phases.get(current_phase, {})
            next_phase = phase_config.get("next_phase")

            if next_phase:
                # Load discussion and update phase
                discussion = Discussion.load(discussion_path)
                old_phase = discussion.phase
                discussion.update_phase(next_phase)
                discussion.save()

                result.phase_advanced = True
                result.new_phase = next_phase
                log(f" Phase advanced: {old_phase} → {next_phase}")


def run_turn(
    discussion_path: Path,
    participants: Optional[list[str]] = None,
    callout: str = "",
    verbose: bool = False,
    provider: Optional[str] = None,
) -> TurnResult:
    """
    Run a discussion turn by orchestrating SmartTools.

    Pipeline:
    1. discussion-parser - Parse current state
    2. discussion-mention-router - Determine who responds (or use provided list)
    3. discussion-{alias} - Call each participant
    4. discussion-turn-appender - Append responses to file
    5. discussion-vote-counter - Count votes
    6. discussion-status-promoter - Check for status change

    Args:
        discussion_path: Path to discussion markdown file
        participants: List of participant aliases (None = use mention router)
        callout: Optional callout/question for participants
        verbose: Print progress to stderr
        provider: Override AI provider for participants

    Returns:
        TurnResult with responses and status

    Raises:
        RuntimeError: If the parser or mention router is missing or exits
            non-zero
        ValueError: If the parser or mention router output is not JSON
    """
    discussion_path = Path(discussion_path)
    discussion_content = discussion_path.read_text()
    result = TurnResult()

    def log(msg: str):
        if verbose:
            print(f"[runner] {msg}", file=sys.stderr)

    # Step 1: Parse discussion
    log("Parsing discussion...")
    try:
        parser_result = _call_tool("discussion-parser", discussion_content)
        if parser_result.returncode != 0:
            raise RuntimeError(f"Parser failed: {parser_result.stderr}")
        state = _parse_json_output(parser_result.stdout)
    except FileNotFoundError as err:
        raise RuntimeError(
            "discussion-parser not found. Is SmartTools installed?"
        ) from err

    current_status = state.get("metadata", {}).get("status", "OPEN")
    current_phase = state.get("metadata", {}).get("phase", "initial_feedback")
    log(f" Status: {current_status}, Phase: {current_phase}")

    # Step 2: Determine participants
    if participants:
        participants_to_call = participants
    else:
        log("Routing mentions...")
        try:
            router_result = _call_tool("discussion-mention-router", parser_result.stdout)
            if router_result.returncode != 0:
                raise RuntimeError(f"Mention router failed: {router_result.stderr}")
            routing = _parse_json_output(router_result.stdout)
            participants_to_call = routing.get("participants_to_call", [])
        except FileNotFoundError as err:
            raise RuntimeError("discussion-mention-router not found") from err

    log(f" Participants: {', '.join(participants_to_call)}")

    # Step 3: Call each participant
    responses_for_appender = []

    for alias in participants_to_call:
        tool_name = f"discussion-{alias}"
        author = f"AI-{alias.capitalize()}"
        log(f"Calling {alias}...")

        args = []
        if callout:
            args.extend(["--callout", callout])
        if provider:
            args.extend(["--provider", provider])

        try:
            participant_result = _call_tool(tool_name, discussion_content, args)

            if participant_result.returncode != 0:
                log(f" {alias} failed: {participant_result.stderr}")
                result.responses.append(ParticipantResponse(
                    alias=alias,
                    name=author,
                    success=False,
                    error=participant_result.stderr,
                    raw_output=participant_result.stdout,
                ))
                continue

            # Parse response
            try:
                response_data = _parse_json_output(participant_result.stdout)
            except ValueError as e:
                log(f" {alias} returned invalid JSON: {e}")
                result.responses.append(ParticipantResponse(
                    alias=alias,
                    name=author,
                    success=False,
                    error=str(e),
                    raw_output=participant_result.stdout,
                ))
                continue

            # Check for NO_RESPONSE sentinel
            if response_data.get("sentinel") == "NO_RESPONSE":
                log(f" {alias} has nothing to add")
                result.responses.append(ParticipantResponse(
                    alias=alias,
                    name=author,
                    success=True,
                ))
                continue

            # Extract response data
            comment = response_data.get("comment", "")
            vote = response_data.get("vote")

            log(f" {alias} responded" + (f" [{vote}]" if vote else ""))

            result.responses.append(ParticipantResponse(
                alias=alias,
                name=author,
                comment=comment,
                vote=vote,
                success=True,
                raw_output=participant_result.stdout,
            ))

            # Prepare for appender
            responses_for_appender.append({
                "author": author,
                "comment": comment,
                "vote": vote,
            })

        except FileNotFoundError:
            log(f" {tool_name} not found, skipping")
            result.responses.append(ParticipantResponse(
                alias=alias,
                name=author,
                success=False,
                error=f"Tool {tool_name} not found",
            ))
        except subprocess.TimeoutExpired as err:
            # A slow participant must not abort the turn and discard the
            # responses already collected from the others.
            log(f" {alias} timed out, skipping")
            result.responses.append(ParticipantResponse(
                alias=alias,
                name=author,
                success=False,
                error=str(err),
            ))

    # Step 4: Append responses to discussion
    if responses_for_appender:
        log("Appending responses...")
        # Format: original content + delimiter + responses JSON
        appender_input = (
            f"{discussion_content}\n---RESPONSES---\n"
            f"{json.dumps(responses_for_appender)}"
        )

        try:
            appender_result = _call_tool("discussion-turn-appender", appender_input)
            if appender_result.returncode != 0:
                log(f" Appender failed: {appender_result.stderr}")
            else:
                # Write updated content
                updated_content = appender_result.stdout
                discussion_path.write_text(updated_content)
                discussion_content = updated_content
                log(" Responses appended")
        except FileNotFoundError:
            log(" discussion-turn-appender not found, appending manually")
            # Fallback: append manually
            discussion = Discussion.load(discussion_path)
            for resp in responses_for_appender:
                discussion.add_comment(
                    author=resp["author"],
                    text=resp["comment"],
                    vote=resp.get("vote"),
                )
            discussion.save()
            discussion_content = discussion.get_content()

    # Step 5: Count votes
    log("Counting votes...")
    try:
        # Re-parse updated discussion
        parser_result = _call_tool("discussion-parser", discussion_content)
        if parser_result.returncode == 0:
            vote_result = _call_tool("discussion-vote-counter", parser_result.stdout)
            if vote_result.returncode == 0:
                votes = _parse_json_output(vote_result.stdout)
                result.vote_summary = votes.get("vote_summary", {})
                consensus = votes.get("consensus", {})
                result.consensus_reached = consensus.get("reached", False)
                result.consensus_reason = consensus.get("reason")
                log(f" Votes: {result.vote_summary}")
                log(f" Consensus: {result.consensus_reached}")
    except FileNotFoundError:
        log(" discussion-vote-counter not found")
    except ValueError as err:
        # The discussion file is already updated at this point; report the
        # bad output instead of losing the collected responses.
        log(f" discussion-vote-counter returned invalid JSON: {err}")

    # Step 6: Check status promotion
    log("Checking status promotion...")
    try:
        vote_counter_output = json.dumps({
            "vote_summary": result.vote_summary,
            "consensus": {
                "reached": result.consensus_reached,
                "reason": result.consensus_reason,
            }
        })

        promoter_result = _call_tool(
            "discussion-status-promoter",
            vote_counter_output,
            ["--current-status", current_status, "--current-phase", current_phase]
        )

        if promoter_result.returncode == 0:
            promotion = _parse_json_output(promoter_result.stdout)
            if promotion.get("should_promote"):
                result.status_promoted = True
                result.new_status = promotion.get("new_status")
                log(f" Status promoted: {current_status} -> {result.new_status}")

                # Update status in file
                discussion = Discussion.load(discussion_path)
                discussion.update_status(result.new_status)
                discussion.save()
    except FileNotFoundError:
        log(" discussion-status-promoter not found")
    except ValueError as err:
        log(f" discussion-status-promoter returned invalid JSON: {err}")

    log("Turn complete")
    return result


# Convenience function for backward compatibility
def run_discussion_turn(
    discussion_path: str | Path,
    participants: Optional[list[str]] = None,
    callout: str = "",
    provider: Optional[str] = None,
    verbose: bool = False,
) -> TurnResult:
    """
    Convenience function to run a discussion turn.

    Args:
        discussion_path: Path to discussion file
        participants: Participant aliases to invoke
        callout: Request/question for participants
        provider: Override AI provider
        verbose: Enable verbose output

    Returns:
        TurnResult with responses
    """
    return run_turn(
        discussion_path=Path(discussion_path),
        participants=participants,
        callout=callout,
        verbose=verbose,
        provider=provider,
    )