210 lines
8.6 KiB
Python
210 lines
8.6 KiB
Python
"""
|
|
automation.runner
=================

Entry point invoked by the pre-commit hook to evaluate `.ai-rules.yml`
instructions. The runner is intentionally thin: it inspects staged files,
looks up the matching rule/output definitions, merges instruction strings and
delegates execution to `automation.patcher.generate_output`, which handles the
heavy lifting (prompt composition, AI invocation, patch application).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Iterable
|
|
|
|
from automation.config import RulesConfig
|
|
from automation.patcher import ModelConfig, generate_output, run
|
|
|
|
|
|
def get_staged_files(repo_root: Path) -> list[Path]:
    """
    List the files that are staged as added or modified.

    Runs ``git diff --cached --name-only --diff-filter=AM`` with ``repo_root``
    as the working directory and returns one ``Path`` per non-blank line of
    its output, in the order git printed them.
    """
    # Only the index matters here: the working tree may hold experiments the
    # developer does not intend to commit. `--diff-filter=AM` keeps new (A)
    # and modified (M) entries and drops everything else.
    git_cmd = ["git", "diff", "--cached", "--name-only", "--diff-filter=AM"]
    completed = run(git_cmd, cwd=repo_root, check=False)

    staged: list[Path] = []
    for raw_line in completed.stdout.splitlines():
        name = raw_line.strip()
        if name:
            staged.append(Path(name))
    return staged
|
|
|
|
|
|
def merge_instructions(source_instr: str, output_instr: str, append_instr: str) -> str:
    """
    Combine source-level, output-level, and append instructions into a single prompt.

    Args:
        source_instr: Rule-level instruction text; used when the output-level
            text is missing or blank.
        output_instr: Output-level instruction text; takes precedence when it
            contains anything other than whitespace.
        append_instr: Extra requirements added after the chosen instruction
            under a fixed heading.

    Returns:
        The merged instruction with surrounding whitespace removed, or an
        empty string when all three inputs are blank.

    ``None`` is accepted for any argument and treated as an empty string, so a
    configuration key that is present but has no value does not raise
    ``AttributeError`` here.
    """
    source_text = (source_instr or "").strip()
    output_text = (output_instr or "").strip()
    append_text = (append_instr or "").strip()

    # A whitespace-only output instruction counts as absent.
    base = output_text or source_text
    if not append_text:
        return base

    heading = "Additional requirements for this output location:"
    if base:
        return f"{base}\n\n{heading}\n{append_text}"
    return f"{heading}\n{append_text}"
|
|
|
|
|
|
def _normalize_participants(raw_entries: object) -> list[dict[str, object]]:
    """Coerce a rule's ``participants`` value into ``{"path", "background"}`` dicts."""
    entries = raw_entries or []
    if isinstance(entries, (str, dict)):
        # A single participant may be given without wrapping it in a list.
        entries = [entries]

    normalized: list[dict[str, object]] = []
    for entry in entries:
        if isinstance(entry, str):
            normalized.append({"path": entry, "background": False})
        elif isinstance(entry, dict):
            # "script" is accepted as an alternative key for the script path.
            path = entry.get("path") or entry.get("script")
            if not path:
                continue
            # The flag is compared as text, so both True and "true" enable it.
            background = str(entry.get("background", "false")).lower() == "true"
            normalized.append({"path": path, "background": background})
        # Entries of any other type are ignored.
    return normalized


def _participant_env(repo_root: Path) -> dict[str, str]:
    """Copy the current environment with ``<repo_root>/src`` prepended to ``PYTHONPATH``."""
    env = dict(os.environ)
    src_dir = str(repo_root / "src")
    existing = env.get("PYTHONPATH", "")
    # os.pathsep is the platform's search-path separator (":" on POSIX, ";" on
    # Windows); a hard-coded ":" would produce an invalid value on Windows.
    env["PYTHONPATH"] = f"{src_dir}{os.pathsep}{existing}" if existing else src_dir
    return env


def _run_participants(
    repo_root: Path,
    src_rel: Path,
    participants: list[dict[str, object]],
) -> None:
    """Run each participant script for one staged file, re-staging it after each success."""
    if not participants:
        return

    # The environment is the same for every participant, so build it once.
    env = _participant_env(repo_root)

    for participant_cfg in participants:
        participant = str(participant_cfg["path"])
        background = bool(participant_cfg.get("background", False))

        script_path = (repo_root / participant).resolve()
        if not script_path.exists():
            print(f"[runner] participant {participant} not found; skipping", file=sys.stderr)
            continue

        cmd = [sys.executable, str(script_path), "--repo-root", str(repo_root), "--path", src_rel.as_posix()]

        if background:
            # Fire and forget: output is discarded and the child is not waited on.
            try:
                child = subprocess.Popen(
                    cmd,
                    cwd=repo_root,
                    env=env,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                )
            except OSError as exc:  # pragma: no cover - defensive
                sys.stderr.write(f"[runner] participant {participant} failed to launch: {exc}\n")
            else:
                print(f"[runner] participant {participant} running in background (pid {child.pid})")
            continue

        result = subprocess.run(cmd, cwd=repo_root, env=env, check=False, capture_output=True, text=True)
        if result.returncode != 0:
            stderr = result.stderr.strip()
            if stderr:
                sys.stderr.write(f"[runner] participant {participant} exited with {result.returncode}: {stderr}\n")
            else:
                sys.stderr.write(f"[runner] participant {participant} exited with {result.returncode}\n")
            continue

        # Re-stage the file in case the agent modified it.
        run(["git", "add", src_rel.as_posix()], cwd=repo_root, check=False)


def process(repo_root: Path, rules: RulesConfig, model: ModelConfig) -> int:
    """
    Walk staged files, resolve matching outputs, and invoke the patcher for each.

    For every staged file that has a matching rule this runs two phases:

    1. Participants: each script listed under the rule's ``participants`` key
       is run with ``--repo-root`` and ``--path`` arguments. Foreground
       scripts are awaited and the file is re-staged when they exit with 0;
       background scripts are launched and left running.
    2. Outputs: each enabled entry under ``outputs`` with a ``path`` template
       is rendered, validated, and passed to ``generate_output`` together
       with the merged instruction text and model hint.

    Args:
        repo_root: Repository root; used as the working directory for git and
            for participant scripts.
        rules: Loaded rule configuration used for rule lookup, cascading, and
            path templating.
        model: Model configuration forwarded unchanged to ``generate_output``.

    Returns:
        Always 0. Participant failures and exceptions raised by
        ``generate_output`` are reported on stderr and do not change the
        return value.
    """
    # 1) Gather the staged file list (Git index only).
    staged_files = get_staged_files(repo_root)
    if not staged_files:
        return 0

    # 2) For each staged file, look up the matching rule and iterate outputs.
    for src_rel in staged_files:
        # Find the most specific rule (nearest .ai-rules.yml wins).
        rule_name = rules.get_rule_name(src_rel)
        if not rule_name:
            continue

        rule_config = rules.cascade_for(src_rel, rule_name)

        # --- Participant Agents Phase ---
        participants = _normalize_participants(rule_config.get("participants"))
        _run_participants(repo_root, src_rel, participants)

        # --- Outputs Phase ---
        outputs: dict[str, dict] = rule_config.get("outputs") or {}
        source_instruction = rule_config.get("instruction", "")
        rule_model_hint = rule_config.get("model_hint", "")

        for output_name, output_cfg in outputs.items():
            if not isinstance(output_cfg, dict):
                continue
            # Outputs are enabled unless "enabled" is explicitly false.
            if str(output_cfg.get("enabled", "true")).lower() == "false":
                continue

            path_template = output_cfg.get("path")
            if not path_template:
                continue

            rendered_path = rules.resolve_template(path_template, src_rel)
            try:
                output_rel = rules.normalize_repo_rel(rendered_path)
            except ValueError:
                print(f"[runner] skipping {output_name}: unsafe path {rendered_path}", file=sys.stderr)
                continue

            # Build the instruction set for this output. Output-specific text
            # overrides the rule-level text, and we keep the source version as
            # a fallback.
            instruction = output_cfg.get("instruction", "") or source_instruction
            append = output_cfg.get("instruction_append", "")
            model_hint = rule_model_hint

            output_type = output_cfg.get("output_type")
            if output_type:
                # Settings cascaded for the output type, looked up at the
                # output's own location, override what was chosen above.
                extra = rules.cascade_for(output_rel, output_type)
                instruction = extra.get("instruction", instruction)
                append = extra.get("instruction_append", append)
                # Output type can also override model hint.
                if "model_hint" in extra:
                    model_hint = extra["model_hint"]

            final_instruction = merge_instructions(source_instruction, instruction, append)

            # 3) Ask the patcher to build a diff with the assembled instruction.
            try:
                print(f"[runner] generating {output_rel.as_posix()} from {src_rel.as_posix()}")
                generate_output(
                    repo_root=repo_root,
                    rules=rules,
                    model=model,
                    source_rel=src_rel,
                    output_rel=output_rel,
                    instruction=final_instruction,
                    model_hint=model_hint,
                )
            except Exception as exc:  # pragma: no cover - defensive
                # Best effort: one failed output must not stop the others.
                print(f"[runner] error generating {output_rel}: {exc}", file=sys.stderr)

    return 0
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """
    Command-line entry point used by the pre-commit hook.

    Parses the optional ``--model`` flag, loads the rules starting from the
    resolved current working directory, and hands off to ``process``. When
    loading the rules raises ``FileNotFoundError`` a notice is printed and 0
    is returned.
    """
    # The only supported option today is the --model override.
    cli = argparse.ArgumentParser(description="CascadingDev AI runner")
    cli.add_argument("--model", help="Override AI command (default from env)")
    options = cli.parse_args(argv)

    # The current working directory is treated as the repository root.
    root = Path.cwd().resolve()
    try:
        loaded_rules = RulesConfig.load(root)
    except FileNotFoundError:
        # A missing rules file is not an error: there is simply nothing to do.
        print("[runner] .ai-rules.yml not found; skipping")
        exit_code = 0
    else:
        # Build the model configuration and run the processing pipeline.
        model_cfg = ModelConfig.from_sources(root, options.model)
        exit_code = process(root, loaded_rules, model_cfg)
    return exit_code
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|