ai-workflow-test/workflow.py

336 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
"""workflow.py
Guides the Mission Control / Step Agent loop with prompt bundling and state updates.
Commands:
validate -> Check that required files exist and no placeholders remain
start-mc -> Print a Mission Control prompt (reads repo docs)
start-step --task "TASK_ID" -> Print a Step Agent prompt for a task from tasks.yaml
new-task --id ID --title "..." [--owner "name"] [--status "in-progress"] [--dry-run]
complete-task --id ID [--dry-run]
submit-report --task "TASK_ID" --report path.md [--dry-run]
status -> Show top of plan/state and list tasks
Notes:
- Uses a strict, tiny YAML subset for project/tasks.yaml:
tasks:
- id: TASK-001
title: Implement authentication
owner: step-agent
status: in-progress # or todo, done, blocked
started: 2025-10-01
"""
import argparse, sys, os, re, shutil, subprocess, datetime
from pathlib import Path
REPO = Path(".").resolve()
REQ_FILES = [
"roles/mission_control.md",
"roles/step_agent.md",
"project/plan.md",
"project/state.md",
"project/rules.md",
"project/tasks.yaml",
]
PLACEHOLDER_RE = re.compile(r"\{\{[^}]+\}\}")
def read(p: Path) -> str:
try:
return p.read_text(encoding="utf-8").strip()
except Exception:
return ""
def box(text: str) -> str:
lines = text.splitlines()
width = max((len(l) for l in lines), default=0) + 2
top = "" + ""*width + ""
bot = "" + ""*width + ""
body = "\n".join("" + l.ljust(width-2) + "" for l in lines)
return f"{top}\n{body}\n{bot}"
def git(cmd):
r = subprocess.run(["git"] + cmd, cwd=REPO, check=False, capture_output=True, text=True)
return r
# --- Minimal YAML parser/writer for our simple tasks.yaml schema ---
def parse_tasks_yaml(text: str):
"""
Accepts a tiny subset:
tasks:
- id: TASK-001
title: Foo
owner: someone
status: in-progress
started: 2025-10-01
"""
tasks = []
lines = [l.rstrip("\n") for l in text.splitlines()]
i = 0
# find "tasks:"
while i < len(lines) and not lines[i].strip().startswith("tasks:"):
i += 1
if i == len(lines): return {"tasks": []}
i += 1
current = None
while i < len(lines):
line = lines[i]
if not line.strip():
i += 1; continue
if line.lstrip().startswith("- "):
if current: tasks.append(current)
current = {}
# allow "- id: TASK-001" inline
rest = line.strip()[2:].strip()
if rest:
if ":" in rest:
k,v = rest.split(":",1)
current[k.strip()] = v.strip()
elif line.startswith(" ") or line.startswith("\t"):
stripped = line.strip()
if ":" in stripped and current is not None:
k,v = stripped.split(":",1)
current[k.strip()] = v.strip()
else:
# anything else ends tasks section
break
i += 1
if current: tasks.append(current)
return {"tasks": tasks}
def dump_tasks_yaml(data):
out = ["tasks:"]
for t in data.get("tasks", []):
out.append(f" - id: {t.get('id','')}")
for k in ("title","owner","status","started"):
if t.get(k):
out.append(f" {k}: {t[k]}")
return "\n".join(out) + "\n"
def load_tasks():
p = REPO/"project/tasks.yaml"
if not p.exists(): return {"tasks": []}
return parse_tasks_yaml(read(p))
def save_tasks(data, dry_run=False):
content = dump_tasks_yaml(data)
if dry_run:
print("---- tasks.yaml (dry-run) ----")
print(content)
return
(REPO/"project/tasks.yaml").write_text(content, encoding="utf-8")
# --- Commands ---
def cmd_validate(args):
ok = True
for f in REQ_FILES:
if not (REPO/f).exists():
print(f"Missing required file: {f}", file=sys.stderr)
ok = False
# scan for unreplaced placeholders
for f in REQ_FILES:
text = read(REPO/f)
if PLACEHOLDER_RE.search(text):
print(f"Unreplaced placeholders in: {f}", file=sys.stderr)
ok = False
if ok:
print("Repository looks valid ✅")
else:
sys.exit(2)
def start_mc(args):
mc_role = read(REPO/"roles/mission_control.md")
plan = read(REPO/"project/plan.md")
state = read(REPO/"project/state.md")
rules = read(REPO/"project/rules.md")
prompt = f"""
You are **Mission Control**.
--- ROLE ---
{mc_role}
--- PLAN ---
{plan}
--- STATE ---
{state}
--- RULES ---
{rules}
YOUR TASK NOW:
1) Review the PLAN and STATE.
2) Emit exactly one **Task Prompt** for a Step Agent with:
- Goal & Acceptance Criteria
- Minimal context links (paths)
- Timebox from rules
3) If you believe a **major plan change** is required, draft the change and ask for Overseer review; Overseer must obtain explicit Human confirmation before approval.
""".strip()
print(box(prompt))
def start_step(args):
tid = args.task.strip()
tasks = load_tasks().get("tasks", [])
task = next((t for t in tasks if t.get("id") == tid), None)
step_role = read(REPO/"roles/step_agent.md")
rules = read(REPO/"project/rules.md")
if not task:
context = "(task not found in tasks.yaml; follow the Mission Control prompt you received)"
else:
context = f"ID: {task.get('id')}\nTitle: {task.get('title')}\nOwner: {task.get('owner','')}\nStatus: {task.get('status','')}\nStarted: {task.get('started','')}"
prompt = f"""
You are a **Step Agent** for the task: {tid}
--- ROLE ---
{step_role}
--- RULES ---
{rules}
--- TASK CONTEXT (from tasks.yaml) ---
{context}
INSTRUCTIONS:
- Work **only** on the assigned task.
- When editing governed files, output **Proposed Diffs** (unified patches) or open a PR (no silent changes).
- End the timebox with a **Report** using `/process/report-template.md`.
""".strip()
print(box(prompt))
def cmd_new_task(args):
tid = args.id.strip()
tasks = load_tasks()
if any(t.get("id")==tid for t in tasks["tasks"]):
print(f"Task {tid} already exists.", file=sys.stderr); sys.exit(1)
task = {
"id": tid,
"title": args.title.strip(),
"owner": (args.owner or "").strip(),
"status": (args.status or "todo").strip(),
"started": args.started or datetime.date.today().isoformat(),
}
tasks["tasks"].append(task)
save_tasks(tasks, dry_run=args.dry_run)
if not args.dry_run:
# append a small block to state.md for visibility
state_p = REPO/"project/state.md"
state = read(state_p) or "# Current State\n"
block = f"\n- Task: {tid}{task['title']} (owner: {task['owner'] or 'MC assigned'}) Status: {task['status']}\n"
state += block
state_p.write_text(state, encoding="utf-8")
git(["add","-A"]); r = git(["commit","-m",f"docs(state): add task {tid}"])
if r.returncode != 0:
print("git commit failed:", r.stderr, file=sys.stderr)
print(f"Task {tid} added.")
def cmd_complete_task(args):
tid = args.id.strip()
tasks = load_tasks()
found = False
for t in tasks["tasks"]:
if t.get("id")==tid:
t["status"]="done"; found=True
if not found:
print(f"Task {tid} not found.", file=sys.stderr); sys.exit(1)
save_tasks(tasks, dry_run=args.dry_run)
if not args.dry_run:
git(["add","-A"]); r = git(["commit","-m",f"docs(state): mark task {tid} done"])
if r.returncode != 0:
print("git commit failed:", r.stderr, file=sys.stderr)
print(f"Task {tid} marked done.")
def submit_report(args):
tid = args.task.strip()
src = Path(args.report).resolve()
if not src.exists():
print(f"Report file not found: {src}", file=sys.stderr); sys.exit(1)
reports_dir = REPO/"reports"
reports_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
safe_task = re.sub(r'[^A-Za-z0-9_.-]+','_',tid)
dest = reports_dir / f"{ts}-{safe_task}.md"
if args.dry_run:
print(f"(dry-run) Would copy {src} -> {dest}")
else:
shutil.copyfile(src, dest)
# Append to Recently Completed; do not auto-remove from In-Flight
state_p = REPO/"project/state.md"
state = read(state_p)
addition = f"- {tid} — report: {dest.as_posix()} ({ts})"
if "## Recently Completed" in state:
new_state = state.replace("## Recently Completed", f"## Recently Completed\n{addition}\n", 1)
else:
new_state = state + f"\n\n## Recently Completed\n{addition}\n"
if args.dry_run:
print("---- state.md (dry-run snippet) ----")
print(addition)
else:
state_p.write_text(new_state, encoding="utf-8")
git(["add","-A"]); r = git(["commit","-m",f"docs(report): submit report for {tid}"])
if r.returncode != 0:
print("git commit failed:", r.stderr, file=sys.stderr)
print(f"Report {'prepared' if args.dry_run else 'stored'} at {dest}")
print("State " + ("previewed" if args.dry_run else "updated and committed") + ".")
def status(args):
plan = read(REPO/"project/plan.md")
state = read(REPO/"project/state.md")
tasks = load_tasks().get("tasks", [])
print("---- PLAN (top) ----")
print("\n".join(plan.splitlines()[:20]))
print("\n---- STATE (top) ----")
print("\n".join(state.splitlines()[:40]))
print("\n---- TASKS (tasks.yaml) ----")
for t in tasks:
print(f"{t.get('id')}: {t.get('title')} [{t.get('status','')}] owner={t.get('owner','')}")
def main():
ap = argparse.ArgumentParser()
sub = ap.add_subparsers(dest="cmd", required=True)
p_val = sub.add_parser("validate", help="Check repository structure and placeholders")
p_val.set_defaults(func=cmd_validate)
p_mc = sub.add_parser("start-mc", help="Generate Mission Control startup prompt")
p_mc.set_defaults(func=start_mc)
p_step = sub.add_parser("start-step", help="Generate Step Agent startup prompt")
p_step.add_argument("--task", required=True, help='Task ID from tasks.yaml')
p_step.set_defaults(func=start_step)
p_new = sub.add_parser("new-task", help="Add a new task (writes tasks.yaml, appends to state.md)")
p_new.add_argument("--id", required=True)
p_new.add_argument("--title", required=True)
p_new.add_argument("--owner", default="")
p_new.add_argument("--status", default="todo")
p_new.add_argument("--started", default=None)
p_new.add_argument("--dry-run", action="store_true")
p_new.set_defaults(func=cmd_new_task)
p_done = sub.add_parser("complete-task", help="Mark a task done in tasks.yaml")
p_done.add_argument("--id", required=True)
p_done.add_argument("--dry-run", action="store_true")
p_done.set_defaults(func=cmd_complete_task)
p_sub = sub.add_parser("submit-report", help="Attach a report and update state.md")
p_sub.add_argument("--task", required=True, help="Task ID")
p_sub.add_argument("--report", required=True, help="Path to markdown report file")
p_sub.add_argument("--dry-run", action="store_true")
p_sub.set_defaults(func=submit_report)
p_status = sub.add_parser("status", help="Show a quick view of plan/state and tasks")
p_status.set_defaults(func=status)
args = ap.parse_args()
args.func(args)
if __name__ == "__main__":
main()