105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
"""Python entrypoint for AI rule processing (replaces legacy bash hook)."""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
from automation.config import RulesConfig
|
|
from automation.patcher import ModelConfig, generate_output, run
|
|
|
|
|
|
def get_staged_files(repo_root: Path) -> list[Path]:
    """Return the paths git lists as staged with status added or modified.

    Runs ``git diff --cached --name-only --diff-filter=AM`` with *repo_root*
    as the working directory, then converts every non-blank output line
    (after trimming surrounding whitespace) into a ``Path``.

    Args:
        repo_root: Directory passed to ``run`` as ``cwd``.

    Returns:
        One ``Path`` per non-blank line of the command's stdout, in output
        order. The text is used exactly as git printed it.
    """
    git_cmd = ["git", "diff", "--cached", "--name-only", "--diff-filter=AM"]
    # NOTE(review): check=False presumably keeps a failing git call from
    # raising -- confirm against automation.patcher.run.
    completed = run(git_cmd, cwd=repo_root, check=False)
    trimmed = (raw.strip() for raw in completed.stdout.splitlines())
    return [Path(entry) for entry in trimmed if entry]
|
|
|
|
|
|
def merge_instructions(source_instr: str | None, output_instr: str | None, append_instr: str | None) -> str:
    """Combine source-, output-, and append-level instructions into one text.

    The output-level instruction wins when it has non-whitespace content;
    otherwise the source-level instruction is used. A non-blank append text
    is added under a fixed heading, separated from the base instruction by a
    blank line (the heading stands alone when the base is empty).

    Any argument may be ``None`` or empty -- for example a config key that is
    present but has no value. Such values are treated as empty strings
    instead of raising ``AttributeError``.

    Args:
        source_instr: Instruction from the source rule; used as the fallback.
        output_instr: Instruction selected for the specific output, if any.
        append_instr: Extra requirements appended after the base instruction.

    Returns:
        The merged instruction with surrounding whitespace removed, or an
        empty string when all three inputs are blank.
    """
    # Normalise up front so a None from the config can never reach .strip().
    source_text = (source_instr or "").strip()
    output_text = (output_instr or "").strip()
    append_text = (append_instr or "").strip()

    # A whitespace-only output instruction counts as absent.
    base = output_text or source_text
    if not append_text:
        return base

    heading = "Additional requirements for this output location:\n"
    prefix = f"{base}\n\n" if base else ""
    return (prefix + heading + append_text).strip()
|
|
|
|
|
|
def process(repo_root: Path, rules: RulesConfig, model: ModelConfig) -> int:
    """Generate every configured output for each staged file that has a rule.

    For each staged path, the rule name is looked up and its cascaded
    configuration fetched. Each entry under ``outputs`` is then rendered to
    an output path and passed to ``generate_output`` together with the merged
    instruction text.

    An output entry is skipped when it is not a dict, when its ``enabled``
    value stringifies to ``"false"`` (case-insensitive), when it has no
    ``path``, or when ``rules.normalize_repo_rel`` raises ``ValueError`` for
    the rendered path (reported on stderr).

    Args:
        repo_root: Repository root used for the git query and generation.
        rules: Loaded rule configuration.
        model: Model configuration forwarded to ``generate_output``.

    Returns:
        Always ``0``.
    """
    changed = get_staged_files(repo_root)
    if not changed:
        return 0

    for src_rel in changed:
        matched_rule = rules.get_rule_name(src_rel)
        if not matched_rule:
            continue

        cascade = rules.cascade_for(src_rel, matched_rule)
        output_map = cascade.get("outputs") or {}
        source_instruction = cascade.get("instruction", "")

        for output_name, output_cfg in output_map.items():
            if not isinstance(output_cfg, dict):
                continue
            # str() lets a boolean False and the string "false" both disable.
            enabled_flag = str(output_cfg.get("enabled", "true")).lower()
            if enabled_flag == "false":
                continue
            path_template = output_cfg.get("path")
            if not path_template:
                continue

            rendered = rules.resolve_template(path_template, src_rel)
            try:
                output_rel = rules.normalize_repo_rel(rendered)
            except ValueError:
                print(f"[runner] skipping {output_name}: unsafe path {rendered}", file=sys.stderr)
                continue

            # Output-level instruction overrides the rule-level one when set.
            instruction = output_cfg.get("instruction") or source_instruction
            append = output_cfg.get("instruction_append", "")

            # When an output_type is named, its cascaded config takes
            # precedence for any key it defines.
            output_type = output_cfg.get("output_type")
            if output_type:
                type_cfg = rules.cascade_for(output_rel, output_type)
                instruction = type_cfg.get("instruction", instruction)
                append = type_cfg.get("instruction_append", append)

            generate_output(
                repo_root=repo_root,
                rules=rules,
                model=model,
                source_rel=src_rel,
                output_rel=output_rel,
                instruction=merge_instructions(source_instruction, instruction, append),
            )

    return 0
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Parse CLI options, load the rules for the current directory, and run.

    Args:
        argv: Argument list handed to ``argparse``; ``None`` is passed
            through unchanged.

    Returns:
        ``0`` when ``RulesConfig.load`` raises ``FileNotFoundError`` (a notice
        is printed and nothing is processed); otherwise whatever ``process``
        returns.
    """
    cli = argparse.ArgumentParser(description="CascadingDev AI runner")
    cli.add_argument("--model", help="Override AI command (default from env)")
    options = cli.parse_args(argv)

    project_root = Path.cwd().resolve()
    try:
        rules = RulesConfig.load(project_root)
    except FileNotFoundError:
        print("[runner] .ai-rules.yml not found; skipping")
        return 0

    # Only build a default ModelConfig when no --model override was given.
    if options.model:
        command = options.model
    else:
        command = ModelConfig().command
    return process(project_root, rules, ModelConfig(command=command))
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|