# CascadingDev/automation/workflow.py
#!/usr/bin/env python3
"""
Discussion workflow automation for CascadingDev.
Phase 1 (Basic):
• Parse VOTE: lines and update summaries
Phase 2 (AI-Enhanced):
• Use Claude agents to extract questions, actions, decisions
• Track @mentions and awaiting replies
• Maintain timeline and structured summaries
• Process only incremental changes via git diff
Always exits 0 so pre-commit hook never blocks commits.
"""
from __future__ import annotations
import argparse
import re
import subprocess
import sys
from collections import Counter
from pathlib import Path
from typing import Iterable, Mapping, Any
# Marker searched for within a line to record a vote (matched case-insensitively
# by parse_votes, which lowercases the line first).
VOTE_TOKEN = "vote:"
# Markdown filename suffixes treated as discussion documents.
DISCUSSION_SUFFIXES = (
    ".discussion.md",
    ".design.md",
    ".review.md",
    ".plan.md",
)
# Suffix of generated summary files; these are excluded from discussion scanning.
SUMMARY_SUFFIX = ".sum.md"
# Matches @mentions such as "@alice" or the broadcast "@all".
MENTION_PATTERN = re.compile(r"@(\w+|all)")
def extract_structured_basic(text: str) -> dict[str, list]:
    """
    Derive structured discussion signals using lightweight pattern matching.

    Recognises explicit markers (Q:, QUESTION:, TODO:, ACTION:, ASSIGNED:,
    DONE:, DECISION:) and @mentions.  The first substantive line (non-blank,
    non-heading) seeds a single timeline entry.

    Args:
        text: Raw Markdown discussion content (full file or staged additions).

    Returns:
        Dict with list-valued keys "questions", "action_items", "decisions",
        "mentions", plus "timeline" (dict for the first substantive line, or
        None when no such line exists).
    """
    questions: list[dict[str, str]] = []
    action_items: list[dict[str, str]] = []
    decisions: list[dict[str, str]] = []
    mentions: list[dict[str, str]] = []
    timeline_data: dict[str, str] | None = None

    for line in text.splitlines():
        participant, remainder = _extract_participant(line)
        stripped = line.strip()
        # Skip blank lines and Markdown headings.
        if not stripped or stripped.startswith("#"):
            continue
        analysis = remainder.strip() if participant else stripped
        if not analysis:
            continue
        participant_name = participant or "unknown"
        if timeline_data is None:
            # First substantive line seeds the timeline entry.
            timeline_data = {
                "participant": participant_name,
                "summary": _truncate_summary(analysis),
            }
        lowered = analysis.lower()
        # A single line may yield both a question and an action item
        # (e.g. "TODO: should we fix this?"), so run each matcher.
        question = _match_question(participant_name, analysis, lowered)
        if question is not None:
            questions.append(question)
        action = _match_action(participant_name, analysis, lowered, line)
        if action is not None:
            action_items.append(action)
        decision = _match_decision(participant_name, analysis, lowered)
        if decision is not None:
            decisions.append(decision)
        # Mentions are scanned over the whole raw line (including the
        # participant label), matching the summary context to the stripped line.
        for match in MENTION_PATTERN.finditer(line):
            mentions.append(
                {
                    "from": participant_name,
                    "to": match.group(1),
                    "context": stripped,
                }
            )

    return {
        "questions": questions,
        "action_items": action_items,
        "decisions": decisions,
        "mentions": mentions,
        "timeline": timeline_data,
    }


def _match_question(participant: str, analysis: str, lowered: str) -> dict[str, str] | None:
    """Return an OPEN question record for `Q:`/`Question:` markers or a trailing `?`."""
    if lowered.startswith(("q:", "question:")):
        _, _, body = analysis.partition(":")
        question_text = body.strip()
    elif analysis.endswith("?"):
        question_text = analysis.rstrip("?").strip()
    else:
        return None
    if not question_text:
        return None
    return {"participant": participant, "question": question_text, "status": "OPEN"}


def _match_action(participant: str, analysis: str, lowered: str, line: str) -> dict | None:
    """Return an action-item record for TODO:/ACTION:/ASSIGNED:/DONE: markers, or None."""
    if lowered.startswith(("todo:", "action:")):
        _, _, body = analysis.partition(":")
        action_text = body.strip()
        if not action_text:
            return None
        # Assignee comes from the first @mention anywhere in the raw line.
        mention = MENTION_PATTERN.search(line)
        return {
            "participant": participant,
            "action": action_text,
            "status": "TODO",
            "assignee": mention.group(1) if mention else None,
        }
    if lowered.startswith("assigned:"):
        _, _, body = analysis.partition(":")
        action_text = body.strip()
        if not action_text:
            return None
        # Without an explicit @mention, the participant claiming it is assignee.
        mention = MENTION_PATTERN.search(line)
        return {
            "participant": participant,
            "action": action_text,
            "status": "ASSIGNED",
            "assignee": mention.group(1) if mention else participant,
        }
    if lowered.startswith("done:"):
        _, _, body = analysis.partition(":")
        action_text = body.strip()
        if not action_text:
            return None
        return {
            "participant": participant,
            "action": action_text,
            "status": "DONE",
            "completed_by": participant,
        }
    return None


def _match_decision(participant: str, analysis: str, lowered: str) -> dict | None:
    """Return a decision record for a DECISION: marker, or None."""
    if not lowered.startswith("decision:"):
        return None
    _, _, body = analysis.partition(":")
    decision_text = body.strip()
    if not decision_text:
        return None
    return {
        "participant": participant,
        "decision": decision_text,
        "rationale": "",
        "supporters": [],
    }
def _truncate_summary(text: str, limit: int = 120) -> str:
return text if len(text) <= limit else text[: limit - 1].rstrip() + ""
def get_staged_files() -> list[Path]:
    """Return staged file paths relative to the repository root."""
    proc = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        capture_output=True,
        text=True,
        check=False,
    )
    if proc.returncode != 0:
        # Treat a failing git invocation as "nothing staged" rather than erroring.
        sys.stderr.write("[workflow] warning: git diff --cached failed; assuming no staged files.\n")
        return []
    return [Path(entry) for entry in (raw.strip() for raw in proc.stdout.splitlines()) if entry]
def read_staged_file(path: Path) -> str | None:
    """
    Return the staged contents of `path` from the git index.

    Falls back to working tree contents if the file is not in the index;
    returns None when neither source is readable.
    """
    show = subprocess.run(
        ["git", "show", f":{path.as_posix()}"],
        capture_output=True,
        text=True,
        check=False,
    )
    if show.returncode == 0:
        return show.stdout
    if not path.exists():
        return None
    try:
        return path.read_text(encoding="utf-8")
    except OSError:
        # Best-effort: warn and report "unreadable" instead of raising.
        sys.stderr.write(f"[workflow] warning: unable to read {path}\n")
        return None
def find_discussions(paths: Iterable[Path]) -> list[Path]:
    """Filter staged files down to Markdown discussions (excluding summaries)."""
    matches: list[Path] = []
    for candidate in paths:
        lowered = candidate.name.lower()
        # Generated summaries share discussion-like names; exclude them first.
        if lowered.endswith(SUMMARY_SUFFIX):
            continue
        # str.endswith accepts the whole suffix tuple in one call.
        if lowered.endswith(DISCUSSION_SUFFIXES):
            matches.append(candidate)
    return matches
def parse_votes(path: Path) -> Mapping[str, str]:
    """
    Parse `VOTE:` lines and return the latest vote per participant.

    A participant is inferred from the leading bullet label (e.g. `- Alice:`)
    when present, otherwise the line index is used to avoid conflating
    multiple votes.
    """
    text = read_staged_file(path)
    if text is None:
        return {}
    votes: dict[str, str] = {}
    for line_no, raw_line in enumerate(text.splitlines()):
        speaker, rest = _extract_participant(raw_line)
        # Look for the last occurrence of the marker, case-insensitively.
        marker = rest.lower().rfind(VOTE_TOKEN)
        if marker < 0:
            continue
        candidate = rest[marker + len(VOTE_TOKEN):].strip()
        value = _extract_vote_value(candidate)
        if value:
            # Later lines overwrite earlier ones, keeping the latest vote.
            votes[speaker if speaker else f"line-{line_no}"] = value
    return votes
def _extract_participant(line: str) -> tuple[str | None, str]:
stripped = line.strip()
if not stripped:
return None, line
if stripped[0] in "-*":
parts = stripped[1:].split(":", 1)
if len(parts) == 2:
candidate = parts[0].strip()
if candidate:
return candidate, parts[1].strip()
colon_pos = stripped.find(":")
if colon_pos > 0:
candidate = stripped[:colon_pos].strip()
remainder = stripped[colon_pos + 1 :].strip()
if candidate:
return candidate, remainder
return None, line
def _extract_vote_value(vote_string: str) -> str | None:
potential_vote = vote_string.strip().upper()
if potential_vote in ("READY", "CHANGES", "REJECT"):
return potential_vote
return None
def get_discussion_changes(discussion_path: Path) -> str:
    """
    Return the staged additions for a discussion file.

    When the file is newly staged, the full staged contents are returned.
    Otherwise, only the added lines from the staged diff are included.
    """
    diff = subprocess.run(
        ["git", "diff", "--cached", "--unified=0", "--", discussion_path.as_posix()],
        capture_output=True,
        text=True,
        check=False,
    )
    if diff.returncode != 0:
        sys.stderr.write(f"[workflow] warning: git diff --cached failed for {discussion_path}; using staged contents.\n")
        return read_staged_file(discussion_path) or ""
    if not diff.stdout.strip():
        # Empty diff: file is newly staged (or unchanged) — use full contents.
        return read_staged_file(discussion_path) or ""
    additions = [
        text[1:]
        for text in diff.stdout.splitlines()
        # "+++" is the diff header, not an added content line.
        if text.startswith("+") and not text.startswith("+++")
    ]
    if additions:
        return "\n".join(additions)
    return read_staged_file(discussion_path) or ""
def update_summary_votes(summary_path: Path, votes: Mapping[str, str]) -> None:
    """
    Update the VOTES section in a summary file with current vote counts.

    Only the content between <!-- SUMMARY:VOTES START/END --> markers is
    replaced.  The file is left untouched when it does not exist (creation
    is the pre-commit hook's job) or when the substitution changes nothing
    (markers absent or section already current), avoiding pointless rewrites.

    Fixes: drops the redundant function-local `import re` (the module already
    imports `re` at the top) and skips the write when content is unchanged.

    Args:
        summary_path: Path to the `.sum.md` summary file.
        votes: Mapping of participant name -> latest vote value
               (READY/CHANGES/REJECT).
    """
    if not summary_path.exists():
        return
    try:
        content = summary_path.read_text(encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to read {summary_path}\n")
        return
    counts = Counter(votes.values())
    ready = counts.get("READY", 0)
    changes = counts.get("CHANGES", 0)
    reject = counts.get("REJECT", 0)
    votes_content_lines = [
        f"READY: {ready} • CHANGES: {changes} • REJECT: {reject}"
    ]
    if votes:
        votes_content_lines.extend(
            f"- {participant}: {vote}" for participant, vote in sorted(votes.items())
        )
    else:
        votes_content_lines.append("- (no votes yet)")
    new_votes_section = "\n".join(votes_content_lines)
    pattern = r"(<!-- SUMMARY:VOTES START -->)(.*?)(<!-- SUMMARY:VOTES END -->)"

    def replacer(match):
        # Rebuild the whole marked section with a fresh heading and tallies.
        return f"{match.group(1)}\n## Votes (latest per participant)\n{new_votes_section}\n{match.group(3)}"

    updated_content = re.sub(pattern, replacer, content, flags=re.DOTALL)
    if updated_content == content:
        # Markers absent or section already current; no write needed.
        return
    try:
        summary_path.write_text(updated_content, encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to write {summary_path}\n")
def print_vote_summary(path: Path, votes: Mapping[str, str]) -> None:
    """Print a human-readable vote tally for one discussion file."""
    print(f"[workflow] {path.as_posix()}")
    if not votes:
        print(" - No votes recorded.")
        return
    # Aggregate counts per vote value, then list each participant's latest vote.
    tallies = Counter(votes.values())
    for vote, count in sorted(tallies.items()):
        suffix = "s" if count != 1 else ""
        print(f" - {vote}: {count} vote{suffix}")
    print(" Participants' latest votes:")
    for participant, vote in sorted(votes.items()):
        print(f" - {participant}: {vote}")
def process_discussion_with_ai(
    discussion_path: Path,
    summary_path: Path,
    incremental_content: str
) -> dict[str, Any]:
    """
    Process discussion content with AI agents to extract structured information.

    Always starts from the lightweight pattern-matched extraction and layers
    AI-normalised results on top when the agents module can be imported.
    Returns a dict with: questions, action_items, decisions, mentions
    """
    structured = extract_structured_basic(incremental_content)
    if not incremental_content.strip():
        return structured
    # Import the agents module lazily; either import style may apply
    # depending on how this script is executed.
    try:
        try:
            from automation import agents
        except ImportError:
            import agents  # type: ignore
    except ImportError:
        # No AI support available: keep the pattern-matched results.
        return structured
    normalized = agents.normalize_discussion(incremental_content)
    if normalized:
        # Only non-empty AI results override the basic extraction.
        for key in ("questions", "action_items", "decisions", "mentions", "timeline"):
            if normalized.get(key):
                structured[key] = normalized[key]
    elif not structured["mentions"]:
        structured["mentions"] = agents.extract_mentions(incremental_content)
    return structured
def _run_status() -> int:
    """Report votes for staged discussion files and refresh their summaries.

    For each staged discussion: parses votes, prints a terminal tally, and —
    when a sibling `.sum.md` exists — updates it (AI-enhanced when the
    project `summary` module imports, votes-only otherwise) and re-stages it.
    Always returns 0 so the calling pre-commit hook never blocks a commit.
    """
    staged = get_staged_files()
    discussions = find_discussions(staged)
    if not discussions:
        print("[workflow] No staged discussion files.")
        return 0
    for discussion in discussions:
        # Parse votes from the full file (maintains latest vote per participant)
        votes = parse_votes(Path(discussion))
        # Print summary to terminal
        print_vote_summary(discussion, votes)
        # Update the corresponding .sum.md file if it exists
        dir_path = discussion.parent
        base_name = discussion.stem  # e.g., "feature-x.discussion" from "feature-x.discussion.md"
        summary_path = dir_path / f"{base_name}.sum.md"
        if summary_path.exists():
            # Get incremental changes for AI processing
            incremental_content = get_discussion_changes(Path(discussion))
            # Process with AI if available
            ai_data = process_discussion_with_ai(
                Path(discussion),
                summary_path,
                incremental_content
            )
            # Update summary file with all extracted information
            try:
                # Try both import styles (for different execution contexts)
                try:
                    from automation import summary as summary_module
                except ImportError:
                    import summary as summary_module  # type: ignore
                # Format the first-substantive-line timeline entry, if any.
                timeline_entry = None
                timeline_info = ai_data.get("timeline")
                if isinstance(timeline_info, dict):
                    participant = timeline_info.get("participant", "unknown")
                    summary_text = timeline_info.get("summary", "")
                    if summary_text:
                        timeline_entry = summary_module.format_timeline_entry(participant, summary_text)
                success = summary_module.update_summary_file(
                    summary_path,
                    votes=votes,
                    questions=ai_data.get("questions"),
                    action_items=ai_data.get("action_items"),
                    decisions=ai_data.get("decisions"),
                    mentions=ai_data.get("mentions"),
                    timeline_entry=timeline_entry,
                )
                if success:
                    # Stage the updated summary file so it joins the commit.
                    subprocess.run(
                        ["git", "add", str(summary_path)],
                        capture_output=True,
                        check=False,
                    )
                    print(f"[workflow] → Updated {summary_path.as_posix()}")
            except ImportError:
                # Fall back to basic vote update when the summary module
                # is unavailable in this execution context.
                update_summary_votes(summary_path, votes)
                subprocess.run(
                    ["git", "add", str(summary_path)],
                    capture_output=True,
                    check=False,
                )
                print(f"[workflow] → Updated {summary_path.as_posix()} (votes only)")
    return 0
def main(argv: list[str] | None = None) -> int:
    """Entry point: parse CLI flags and run the status reporter.

    Args:
        argv: Argument list for testing; None means use sys.argv[1:].

    Returns:
        Process exit code (0, since status reporting never blocks commits).
    """
    parser = argparse.ArgumentParser(
        prog="workflow.py",
        description="CascadingDev automation workflow (Phase 1: status reporter)",
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Print vote status for staged discussion files (default).",
    )
    # Parse only for validation and --help; the result is unused because
    # status is currently the sole command and runs with or without --status.
    parser.parse_args(argv)
    return _run_status()
if __name__ == "__main__":
    # Propagate the exit code to the shell/pre-commit hook (0 by design).
    sys.exit(main())