#!/usr/bin/env python3
"""
Discussion workflow automation for CascadingDev.

Phase 1 (Basic):
    * Parse VOTE: lines and update summaries

Phase 2 (AI-Enhanced):
    * Use Claude agents to extract questions, actions, decisions
    * Track @mentions and awaiting replies
    * Maintain timeline and structured summaries
    * Process only incremental changes via git diff

Always exits 0 so pre-commit hook never blocks commits.
"""
from __future__ import annotations

import argparse
import re
import subprocess
import sys
from collections import Counter
from pathlib import Path
from typing import Any, Iterable, Mapping

VOTE_TOKEN = "vote:"
DISCUSSION_SUFFIXES = (
    ".discussion.md",
    ".design.md",
    ".review.md",
    ".plan.md",
)
SUMMARY_SUFFIX = ".sum.md"
MENTION_PATTERN = re.compile(r"@(\w+|all)")

# Markers delimiting the auto-generated votes section of a summary file.
# NOTE(review): the literal marker text was garbled in the checked-in file
# (the regex had only empty groups, which matches everywhere); these values
# are a reconstruction — confirm they match the markers the pre-commit hook
# writes into .sum.md templates.
VOTES_SECTION_PATTERN = re.compile(
    r"(<!-- VOTES:START -->)(.*?)(<!-- VOTES:END -->)",
    re.DOTALL,
)


def extract_structured_basic(text: str) -> dict[str, Any]:
    """
    Derive structured discussion signals using lightweight pattern matching.

    Recognises explicit markers (Q:, TODO:, DONE:, DECISION:) and @mentions.

    Returns a dict with keys ``questions``, ``action_items``, ``decisions``,
    ``mentions`` (each a list of dicts) and ``timeline`` (a dict describing
    the first substantive line, or None when the text has none).
    """
    questions: list[dict[str, str]] = []
    action_items: list[dict[str, Any]] = []
    decisions: list[dict[str, Any]] = []
    mentions: list[dict[str, str]] = []
    timeline_data: dict[str, str] | None = None
    in_comment = False

    for line in text.splitlines():
        participant, remainder = _extract_participant(line)
        stripped = line.strip()
        if not stripped:
            continue
        # Skip HTML comments.  BUG FIX: the open marker had been garbled to
        # startswith("") (always true); also handle one-line <!-- ... -->
        # comments, which previously left the parser stuck in comment mode.
        if stripped.startswith("<!--"):
            if not stripped.endswith("-->"):
                in_comment = True
            continue
        if in_comment:
            if stripped.endswith("-->"):
                in_comment = False
            continue
        if stripped.startswith("#"):  # Markdown headings carry no signals
            continue

        analysis = remainder.strip() if participant else stripped
        if not analysis:
            continue
        lowered = analysis.lower()
        participant_name = participant or "unknown"

        # The first substantive line seeds the timeline entry.
        if timeline_data is None:
            timeline_data = {
                "participant": participant_name,
                "summary": _truncate_summary(analysis),
            }

        # Questions: explicit Q:/QUESTION: markers, or a trailing "?".
        if lowered.startswith(("q:", "question:")):
            _, _, body = analysis.partition(":")
            question_text = body.strip()
            if question_text:
                questions.append(
                    {
                        "participant": participant_name,
                        "question": question_text,
                        "status": "OPEN",
                    }
                )
        elif analysis.endswith("?"):
            question_text = analysis.rstrip("?").strip()
            if question_text:
                questions.append(
                    {
                        "participant": participant_name,
                        "question": question_text,
                        "status": "OPEN",
                    }
                )

        # Action items: TODO:/ACTION: (unassigned unless @mentioned),
        # ASSIGNED: (assignee from @mention, else the author), DONE:.
        if lowered.startswith(("todo:", "action:")):
            _, _, body = analysis.partition(":")
            action_text = body.strip()
            if action_text:
                assignee = None
                match = MENTION_PATTERN.search(line)
                if match:
                    assignee = match.group(1)
                action_items.append(
                    {
                        "participant": participant_name,
                        "action": action_text,
                        "status": "TODO",
                        "assignee": assignee,
                    }
                )
        elif lowered.startswith("assigned:"):
            _, _, body = analysis.partition(":")
            action_text = body.strip()
            if action_text:
                # Default to the participant claiming it; an @mention on
                # the line overrides.
                assignee = participant_name
                match = MENTION_PATTERN.search(line)
                if match:
                    assignee = match.group(1)
                action_items.append(
                    {
                        "participant": participant_name,
                        "action": action_text,
                        "status": "ASSIGNED",
                        "assignee": assignee,
                    }
                )
        elif lowered.startswith("done:"):
            _, _, body = analysis.partition(":")
            action_text = body.strip()
            if action_text:
                action_items.append(
                    {
                        "participant": participant_name,
                        "action": action_text,
                        "status": "DONE",
                        "completed_by": participant_name,
                    }
                )

        # Decisions
        if lowered.startswith("decision:"):
            _, _, body = analysis.partition(":")
            decision_text = body.strip()
            if decision_text:
                decisions.append(
                    {
                        "participant": participant_name,
                        "decision": decision_text,
                        "rationale": "",
                        "supporters": [],
                    }
                )

        # Mentions (searched on the raw line so bullet labels count too)
        for match in MENTION_PATTERN.finditer(line):
            mentions.append(
                {
                    "from": participant_name,
                    "to": match.group(1),
                    "context": stripped,
                }
            )

    return {
        "questions": questions,
        "action_items": action_items,
        "decisions": decisions,
        "mentions": mentions,
        "timeline": timeline_data,
    }


def _truncate_summary(text: str, limit: int = 120) -> str:
    """Clip *text* to *limit* characters, appending an ellipsis if clipped."""
    return text if len(text) <= limit else text[: limit - 1].rstrip() + "…"


def get_staged_files() -> list[Path]:
    """Return staged file paths relative to the repository root."""
    result = subprocess.run(
        ["git", "diff", "--cached", "--name-only"],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        sys.stderr.write(
            "[workflow] warning: git diff --cached failed; assuming no staged files.\n"
        )
        return []
    return [Path(line.strip()) for line in result.stdout.splitlines() if line.strip()]


def read_staged_file(path: Path) -> str | None:
    """
    Return the staged contents of `path` from the git index.

    Falls back to working tree contents if the file is not in the index.
    Returns None when the file cannot be read at all.
    """
    spec = f":{path.as_posix()}"
    result = subprocess.run(
        ["git", "show", spec],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode == 0:
        return result.stdout
    if path.exists():
        try:
            return path.read_text(encoding="utf-8")
        except OSError:
            sys.stderr.write(f"[workflow] warning: unable to read {path}\n")
            return None
    return None


def find_discussions(paths: Iterable[Path]) -> list[Path]:
    """Filter staged files down to Markdown discussions (excluding summaries)."""
    discussions: list[Path] = []
    for path in paths:
        name = path.name.lower()
        if name.endswith(SUMMARY_SUFFIX):
            continue
        if name.endswith(DISCUSSION_SUFFIXES):
            discussions.append(path)
    return discussions


def parse_votes(path: Path) -> Mapping[str, str]:
    """
    Parse `VOTE:` lines and return the latest vote per participant.

    A participant is inferred from the leading bullet label (e.g. `- Alice:`)
    when present, otherwise the line index is used to avoid conflating
    multiple votes.
    """
    latest_per_participant: dict[str, str] = {}
    text = read_staged_file(path)
    if text is None:
        return {}
    for idx, line in enumerate(text.splitlines()):
        participant_name, remaining_line = _extract_participant(line)
        lower_remaining_line = remaining_line.lower()
        marker_idx = lower_remaining_line.rfind(VOTE_TOKEN)
        if marker_idx >= 0:
            candidate = remaining_line[marker_idx + len(VOTE_TOKEN):].strip()
        elif participant_name and participant_name.lower() == "vote":
            # BUG FIX: a bare "VOTE: READY" line was silently dropped —
            # _extract_participant mistook the VOTE label for a participant
            # name, leaving no vote token in the remainder.  Treat the
            # remainder as the vote value and fall back to the line key.
            candidate = remaining_line.strip()
            participant_name = None
        else:
            continue  # No vote on this line
        vote_value = _extract_vote_value(candidate)
        if vote_value:
            # Anonymous votes are keyed by line index so they don't collide.
            participant_key = participant_name if participant_name else f"line-{idx}"
            latest_per_participant[participant_key] = vote_value
    return latest_per_participant


def _extract_participant(line: str) -> tuple[str | None, str]:
    """
    Split *line* into (participant, remainder).

    Recognises bullet labels (`- Alice: text`, `* Bob: text`) and plain
    `Name: text` prefixes; returns (None, line) when no label is found.
    """
    stripped = line.strip()
    if not stripped:
        return None, line
    if stripped[0] in "-*":
        parts = stripped[1:].split(":", 1)
        if len(parts) == 2:
            candidate = parts[0].strip()
            if candidate:
                return candidate, parts[1].strip()
    colon_pos = stripped.find(":")
    if colon_pos > 0:
        candidate = stripped[:colon_pos].strip()
        remainder = stripped[colon_pos + 1 :].strip()
        if candidate:
            return candidate, remainder
    return None, line


def _extract_vote_value(vote_string: str) -> str | None:
    """Return the canonical vote value, or None when unrecognised."""
    potential_vote = vote_string.strip().upper()
    if potential_vote in ("READY", "CHANGES", "REJECT"):
        return potential_vote
    return None


def get_discussion_changes(discussion_path: Path) -> str:
    """
    Return the staged additions for a discussion file.

    When the file is newly staged, the full staged contents are returned.
    Otherwise, only the added lines from the staged diff are included.
    """
    result = subprocess.run(
        ["git", "diff", "--cached", "--unified=0", "--", discussion_path.as_posix()],
        capture_output=True,
        text=True,
        check=False,
    )
    if result.returncode != 0:
        sys.stderr.write(
            f"[workflow] warning: git diff --cached failed for {discussion_path}; "
            "using staged contents.\n"
        )
        staged = read_staged_file(discussion_path)
        return staged or ""
    if not result.stdout.strip():
        # No diff output: file is unchanged or newly added; use full contents.
        staged = read_staged_file(discussion_path)
        return staged or ""
    new_lines: list[str] = []
    for line in result.stdout.splitlines():
        # "+" lines are additions; "+++" is the diff header, not content.
        if line.startswith("+") and not line.startswith("+++"):
            new_lines.append(line[1:])
    if new_lines:
        return "\n".join(new_lines)
    staged = read_staged_file(discussion_path)
    return staged or ""


def update_summary_votes(summary_path: Path, votes: Mapping[str, str]) -> None:
    """
    Update the VOTES section in a summary file with current vote counts.

    The summary is only updated if it already exists (creation is the
    pre-commit hook's job); only the content between the votes markers
    is rewritten.
    """
    if not summary_path.exists():
        return
    try:
        content = summary_path.read_text(encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to read {summary_path}\n")
        return

    # Tally latest votes per participant.
    counts = Counter(votes.values())
    ready = counts.get("READY", 0)
    changes = counts.get("CHANGES", 0)
    reject = counts.get("REJECT", 0)

    votes_content_lines = [
        f"READY: {ready} • CHANGES: {changes} • REJECT: {reject}"
    ]
    if votes:
        for participant, vote in sorted(votes.items()):
            votes_content_lines.append(f"- {participant}: {vote}")
    else:
        votes_content_lines.append("- (no votes yet)")
    new_votes_section = "\n".join(votes_content_lines)

    # BUG FIX: the marker pattern had been garbled to r"()(.*?)()", which
    # matches the empty string at every position and would splice the votes
    # section throughout the file.  Replace only between explicit markers.
    def replacer(match: re.Match[str]) -> str:
        return (
            f"{match.group(1)}\n## Votes (latest per participant)\n"
            f"{new_votes_section}\n{match.group(3)}"
        )

    updated_content = VOTES_SECTION_PATTERN.sub(replacer, content)

    try:
        summary_path.write_text(updated_content, encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to write {summary_path}\n")


def print_vote_summary(path: Path, votes: Mapping[str, str]) -> None:
    """Print a human-readable vote tally for *path* to stdout."""
    rel = path.as_posix()
    print(f"[workflow] {rel}")
    if not votes:
        print("  - No votes recorded.")
        return
    counts = Counter(votes.values())
    for vote, count in sorted(counts.items()):
        plural = "s" if count != 1 else ""
        print(f"  - {vote}: {count} vote{plural}")
    print("  Participants' latest votes:")
    for participant, vote in sorted(votes.items()):
        print(f"    - {participant}: {vote}")


def process_discussion_with_ai(
    discussion_path: Path, summary_path: Path, incremental_content: str
) -> dict[str, Any]:
    """
    Process discussion content with AI agents to extract structured information.

    Falls back to the basic pattern-matching extractor when the agents
    module is unavailable or the content is empty.

    Returns a dict with: questions, action_items, decisions, mentions,
    timeline.
    """
    structured = extract_structured_basic(incremental_content)
    if not incremental_content.strip():
        return structured
    try:
        # Try both import styles (for different execution contexts).
        try:
            from automation import agents
        except ImportError:
            import agents  # type: ignore
    except ImportError:
        return structured
    normalized = agents.normalize_discussion(incremental_content)
    if normalized:
        # Only overwrite sections the agent actually produced; keep the
        # basic extraction for anything it left out.
        for key in ("questions", "action_items", "decisions", "mentions", "timeline"):
            if normalized.get(key):
                structured[key] = normalized[key]
    else:
        if not structured["mentions"]:
            structured["mentions"] = agents.extract_mentions(incremental_content)
    return structured


def _run_status() -> int:
    """Report votes for staged discussions and refresh their summaries.

    Always returns 0 so the pre-commit hook never blocks a commit.
    """
    staged = get_staged_files()
    discussions = find_discussions(staged)
    if not discussions:
        print("[workflow] No staged discussion files.")
        return 0
    for discussion in discussions:
        # Parse votes from the full file (maintains latest vote per participant)
        votes = parse_votes(discussion)
        print_vote_summary(discussion, votes)

        # Update the corresponding .sum.md file if it exists.
        # e.g. "feature-x.discussion.md" -> "feature-x.discussion.sum.md"
        summary_path = discussion.parent / f"{discussion.stem}.sum.md"
        if not summary_path.exists():
            continue

        # Only the staged additions are fed to the AI extraction.
        incremental_content = get_discussion_changes(discussion)
        ai_data = process_discussion_with_ai(discussion, summary_path, incremental_content)

        try:
            # Try both import styles (for different execution contexts)
            try:
                from automation import summary as summary_module
            except ImportError:
                import summary as summary_module  # type: ignore

            timeline_entry = None
            timeline_info = ai_data.get("timeline")
            if isinstance(timeline_info, dict):
                participant = timeline_info.get("participant", "unknown")
                summary_text = timeline_info.get("summary", "")
                if summary_text:
                    timeline_entry = summary_module.format_timeline_entry(
                        participant, summary_text
                    )

            success = summary_module.update_summary_file(
                summary_path,
                votes=votes,
                questions=ai_data.get("questions"),
                action_items=ai_data.get("action_items"),
                decisions=ai_data.get("decisions"),
                mentions=ai_data.get("mentions"),
                timeline_entry=timeline_entry,
            )
            if success:
                # Stage the updated summary so it rides along with the commit.
                subprocess.run(
                    ["git", "add", str(summary_path)],
                    capture_output=True,
                    check=False,
                )
                print(f"[workflow] → Updated {summary_path.as_posix()}")
        except ImportError:
            # Fall back to basic vote update when the summary module is absent.
            update_summary_votes(summary_path, votes)
            subprocess.run(
                ["git", "add", str(summary_path)],
                capture_output=True,
                check=False,
            )
            print(f"[workflow] → Updated {summary_path.as_posix()} (votes only)")
    return 0


def main(argv: list[str] | None = None) -> int:
    """CLI entry point; always returns 0 (see module docstring)."""
    parser = argparse.ArgumentParser(
        prog="workflow.py",
        description="CascadingDev automation workflow (Phase 1: status reporter)",
    )
    parser.add_argument(
        "--status",
        action="store_true",
        help="Print vote status for staged discussion files (default).",
    )
    parser.parse_args(argv)
    # Status is currently the only command; run it for --status or no args.
    return _run_status()


if __name__ == "__main__":
    sys.exit(main())