CascadingDev/automation/workflow.py

#!/usr/bin/env python3
"""
Discussion workflow automation for CascadingDev.
Phase 1 (Basic):
• Parse VOTE: lines and update summaries
Phase 2 (AI-Enhanced):
• Use Claude agents to extract questions, actions, decisions
• Track @mentions and awaiting replies
• Maintain timeline and structured summaries
• Process only incremental changes via git diff
Always exits 0 so pre-commit hook never blocks commits.
"""

from __future__ import annotations

import argparse
import re
import subprocess
import sys
from collections import Counter
from pathlib import Path
from typing import Iterable, Mapping, Any

VOTE_TOKEN = "vote:"

DISCUSSION_SUFFIXES = (
    ".discussion.md",
    ".design.md",
    ".review.md",
    ".plan.md",
)
SUMMARY_SUFFIX = ".sum.md"
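
# File naming convention implied by the suffixes above and by _run_status():
#   feature-x.discussion.md      -> discussion thread
#   feature-x.discussion.sum.md  -> its generated summary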


def get_staged_files() -> list[Path]:
    """Return staged file paths relative to the repository root."""
    result = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        sys.stderr.write(
            "[workflow] warning: git diff --cached failed; assuming no staged files.\n"
        )
        return []
    files: list[Path] = []
    for line in result.stdout.splitlines():
        line = line.strip()
        if line:
            files.append(Path(line))
    return files


def find_discussions(paths: Iterable[Path]) -> list[Path]:
    """Filter staged files down to Markdown discussions (excluding summaries)."""
    discussions: list[Path] = []
    for path in paths:
        name = path.name.lower()
        if name.endswith(SUMMARY_SUFFIX):
            continue
        if any(name.endswith(suffix) for suffix in DISCUSSION_SUFFIXES):
            discussions.append(path)
    return discussions


def parse_votes(path: Path) -> Mapping[str, str]:
    """
    Parse `VOTE:` lines and return the latest vote per participant.

    A participant is inferred from the leading bullet label (e.g. `- Alice:`)
    when present; otherwise the line index is used to avoid conflating
    multiple votes.
    """
    if not path.exists():
        return {}
    latest_per_participant: dict[str, str] = {}
    try:
        text = path.read_text(encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to read {path}\n")
        return {}
    for idx, line in enumerate(text.splitlines()):
        participant_name, remaining_line = _extract_participant(line)
        # Search for "VOTE:" in the remaining part of the line.
        lower_remaining_line = remaining_line.lower()
        marker_idx = lower_remaining_line.rfind(VOTE_TOKEN)
        if marker_idx == -1:
            continue  # No VOTE token on this line.
        # Extract the text after the token and validate it as a vote value.
        vote_string_candidate = remaining_line[marker_idx + len(VOTE_TOKEN):].strip()
        vote_value = _extract_vote_value(vote_string_candidate)
        if vote_value:
            # Later votes overwrite earlier ones, keeping the latest per participant.
            participant_key = participant_name if participant_name else f"line-{idx}"
            latest_per_participant[participant_key] = vote_value
    return latest_per_participant


def _extract_participant(line: str) -> tuple[str | None, str]:
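    # Split an optional "- Name:" bullet label off the front of a line, e.g.
    # (illustrative):
    #   "- Alice: VOTE: READY"  -> ("Alice", "VOTE: READY")
    #   "plain sentence"        -> (None, "plain sentence")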
    stripped = line.strip()
    if not stripped:
        return None, line
    if stripped[0] in "-*":
        parts = stripped[1:].split(":", 1)
        if len(parts) == 2:
            candidate = parts[0].strip()
            if candidate:
                return candidate, parts[1].strip()
    return None, line


def _extract_vote_value(vote_string: str) -> str | None:
    potential_vote = vote_string.strip().upper()
    if potential_vote in ("READY", "CHANGES", "REJECT"):
        return potential_vote
    return None


def get_discussion_changes(discussion_path: Path) -> str:
    """
    Get only the new lines added to a discussion file since the last commit.

    Returns the entire file content when no diff is available (the file is new,
    or `git diff` fails or produces no output); otherwise returns only the lines
    that were added in the working tree.
    """
    result = subprocess.run(
        ["git", "diff", "HEAD", "--", str(discussion_path)],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0 or not result.stdout.strip():
        # File is new (not in HEAD yet) or unchanged: fall back to the full content.
        if discussion_path.exists():
            try:
                return discussion_path.read_text(encoding="utf-8")
            except OSError:
                sys.stderr.write(f"[workflow] warning: unable to read {discussion_path}\n")
                return ""
        return ""
    # Keep only added lines (prefixed with '+'), skipping the '+++' file header
    # lines of the unified diff.
    new_lines = []
    for line in result.stdout.splitlines():
        if line.startswith("+") and not line.startswith("+++"):
            new_lines.append(line[1:])  # Drop the '+' prefix.
    return "\n".join(new_lines)


def update_summary_votes(summary_path: Path, votes: Mapping[str, str]) -> None:
    """
    Update the VOTES section in a summary file with current vote counts.

    If the summary file does not exist yet, nothing is written here (the
    pre-commit hook is responsible for creating it from the template).
    Only the content between the <!-- SUMMARY:VOTES START/END --> markers
    is rewritten.
    """
    # If summary doesn't exist, it will be created by pre-commit hook;
    # we should only update if it already exists.
    if not summary_path.exists():
        return
    try:
        content = summary_path.read_text(encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to read {summary_path}\n")
        return
    # Calculate vote counts
    counts = Counter(votes.values())
    ready = counts.get("READY", 0)
    changes = counts.get("CHANGES", 0)
    reject = counts.get("REJECT", 0)
    # Build the new votes section content
    votes_content_lines = [
        f"READY: {ready} • CHANGES: {changes} • REJECT: {reject}"
    ]
    if votes:
        for participant, vote in sorted(votes.items()):
            votes_content_lines.append(f"- {participant}: {vote}")
    else:
        votes_content_lines.append("- (no votes yet)")
    new_votes_section = "\n".join(votes_content_lines)
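    # Example of the rendered section (illustrative, for two voters):
    #   READY: 1 • CHANGES: 1 • REJECT: 0
    #   - Alice: READY
    #   - Bob: CHANGES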
    # Find and replace content between markers
    pattern = r"(<!-- SUMMARY:VOTES START -->)(.*?)(<!-- SUMMARY:VOTES END -->)"

    def replacer(match):
        return f"{match.group(1)}\n## Votes (latest per participant)\n{new_votes_section}\n{match.group(3)}"

    updated_content = re.sub(pattern, replacer, content, flags=re.DOTALL)
    # Write back to file
    try:
        summary_path.write_text(updated_content, encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to write {summary_path}\n")


def print_vote_summary(path: Path, votes: Mapping[str, str]) -> None:
    rel = path.as_posix()
    print(f"[workflow] {rel}")
    if not votes:
        print(" - No votes recorded.")
        return
    counts = Counter(votes.values())
    for vote, count in sorted(counts.items()):
        plural = "s" if count != 1 else ""
        print(f" - {vote}: {count} vote{plural}")


def process_discussion_with_ai(
    discussion_path: Path,
    summary_path: Path,
    incremental_content: str,
) -> dict[str, Any]:
    """
    Process discussion content with AI agents to extract structured information.

    Returns a dict that may contain: questions, action_items, decisions, mentions.
    Keys are only present when something was extracted; an empty dict is returned
    when the agents module is unavailable. (discussion_path and summary_path are
    currently unused.)
    """
    try:
        # Try both import styles (for different execution contexts)
        try:
            from automation import agents
        except ImportError:
            import agents  # type: ignore
    except ImportError:
        sys.stderr.write("[workflow] warning: agents module not available\n")
        return {}
    result: dict[str, Any] = {}
    # Extract @mentions (doesn't require Claude)
    mentions = agents.extract_mentions(incremental_content)
    if mentions:
        result["mentions"] = mentions
    # Try AI-powered extraction (requires ANTHROPIC_API_KEY)
    normalized = agents.normalize_discussion(incremental_content)
    if normalized:
        # Extract questions
        questions = normalized.get("questions", [])
        if questions:
            result["questions"] = questions
        # Extract action items
        action_items = normalized.get("action_items", [])
        if action_items:
            result["action_items"] = action_items
        # Extract decisions
        decisions = normalized.get("decisions", [])
        if decisions:
            result["decisions"] = decisions
    return result


def _run_status() -> int:
    staged = get_staged_files()
    discussions = find_discussions(staged)
    if not discussions:
        print("[workflow] No staged discussion files.")
        return 0
    for discussion in discussions:
        # Parse votes from the full file (maintains latest vote per participant)
        votes = parse_votes(discussion)
        # Print summary to terminal
        print_vote_summary(discussion, votes)
        # Update the corresponding .sum.md file if it exists
        dir_path = discussion.parent
        base_name = discussion.stem  # e.g., "feature-x.discussion" from "feature-x.discussion.md"
        summary_path = dir_path / f"{base_name}.sum.md"
        if summary_path.exists():
            # Get incremental changes for AI processing
            incremental_content = get_discussion_changes(discussion)
            # Process with AI if available
            ai_data = process_discussion_with_ai(
                discussion,
                summary_path,
                incremental_content,
            )
            # Update summary file with all extracted information
            try:
                # Try both import styles (for different execution contexts)
                try:
                    from automation import summary as summary_module
                except ImportError:
                    import summary as summary_module  # type: ignore
                success = summary_module.update_summary_file(
                    summary_path,
                    votes=votes,
                    questions=ai_data.get("questions"),
                    action_items=ai_data.get("action_items"),
                    decisions=ai_data.get("decisions"),
                    mentions=ai_data.get("mentions"),
                )
                if success:
                    # Stage the updated summary file
                    subprocess.run(
                        ["git", "add", str(summary_path)],
                        capture_output=True,
                        check=False,
                    )
                    print(f"[workflow] → Updated {summary_path.as_posix()}")
            except ImportError:
                # Fall back to basic vote update
                update_summary_votes(summary_path, votes)
                subprocess.run(
                    ["git", "add", str(summary_path)],
                    capture_output=True,
                    check=False,
                )
                print(f"[workflow] → Updated {summary_path.as_posix()} (votes only)")
    return 0


def main(argv: list[str] | None = None) -> int:
    parser = argparse.ArgumentParser(
        prog="workflow.py",
        description="CascadingDev automation workflow (vote status and summary updates)",
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Print vote status for staged discussion files (default).",
    )
    parser.parse_args(argv)
    # Status is currently the only command; run it for --status or no args.
    return _run_status()


if __name__ == "__main__":
    sys.exit(main())