1227 lines
43 KiB
Python
1227 lines
43 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Discussion workflow automation for CascadingDev.
|
|
|
|
Phase 1 (Basic):
|
|
• Parse VOTE: lines and update summaries
|
|
|
|
Phase 2 (AI-Enhanced):
|
|
• Use Claude agents to extract questions, actions, decisions
|
|
• Track @mentions and awaiting replies
|
|
• Maintain timeline and structured summaries
|
|
• Process only incremental changes via git diff
|
|
|
|
Always exits 0 so pre-commit hook never blocks commits.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import yaml
|
|
from collections import Counter
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Iterable, Mapping, Any
|
|
|
|
# Lower-case marker that introduces a vote on a line.
VOTE_TOKEN = "vote:"
# File-name suffixes that identify discussion documents.
DISCUSSION_SUFFIXES = (
    ".discussion.md",
    ".design.md",
    ".review.md",
    ".plan.md",
)
# File-name suffix of summary files (never treated as discussions).
SUMMARY_SUFFIX = ".sum.md"
MENTION_PATTERN = re.compile(r"@(\w+|all)")
# Line-start labels that are structural markers and must never be read as a
# participant name.  "q" is listed because "Q:" is the documented shorthand
# for "QUESTION:".
RESERVED_TOKENS = {"vote", "question", "q", "todo", "action", "decision", "done", "assigned"}
# Fallback status targets used when a stage has no entry below.
DEFAULT_READY_STATUS = "READY_FOR_DESIGN"
DEFAULT_REJECT_STATUS = "REJECTED"
# Per-stage status written when the ready / reject threshold is reached.
PROMOTION_STATUS_BY_STAGE: dict[str, dict[str, str]] = {
    "feature": {
        "ready": "READY_FOR_DESIGN",
        "reject": "FEATURE_REJECTED",
    },
    "design": {
        "ready": "READY_FOR_IMPLEMENTATION",
        "reject": "DESIGN_REJECTED",
    },
    "implementation": {
        "ready": "READY_FOR_TESTING",
        "reject": "IMPLEMENTATION_REJECTED",
    },
}
# Markdown task-list line: group 1 is the whole line, group 2 the check mark.
# The gap after "]" is restricted to horizontal whitespace so that a bare
# "- [x]" line cannot swallow the newline(s) and the checkbox that follows.
CHECKBOX_PATTERN = re.compile(r"^(- \[([ xX])\][^\S\r\n]+.*)$", re.MULTILINE)
PR_REF_PATTERN = re.compile(r"\[(?:PR\s*)?#(\d+)\]")
COMMIT_REF_PATTERN = re.compile(r"\b[0-9a-f]{7,40}\b", re.IGNORECASE)
OWNER_PATTERN = re.compile(r"@(\w+)")
# Delimiters of the automation-managed status comment in implementation discussions.
AUTO_IMPLEMENTATION_BLOCK_START = "<!-- AUTO:IMPLEMENTATION_STATUS START -->"
AUTO_IMPLEMENTATION_BLOCK_END = "<!-- AUTO:IMPLEMENTATION_STATUS END -->"
|
|
|
|
|
|
def _is_human_participant(name: str | None) -> bool:
|
|
if not name:
|
|
return False
|
|
lowered = name.strip()
|
|
return bool(lowered) and not lowered.startswith("AI_") and not lowered.startswith("line-")
|
|
|
|
|
|
def _extract_checkboxes(text: str) -> list[dict[str, Any]]:
    """
    Collect every Markdown task-list line matched by ``CHECKBOX_PATTERN``.

    Each result is a dict with:
      line  - the full matched line
      label - the text following the closing ``]`` (or the full line if none)
      done  - True when the box is ticked with ``x`` / ``X``
      owner - first ``@name`` on the line, or None
      refs  - PR references (``#123``) and commit-like hex strings found on
              the line, sorted case-insensitively
    """
    results: list[dict[str, Any]] = []
    for found in CHECKBOX_PATTERN.finditer(text):
        raw_line, mark = found.group(1), found.group(2)

        owner_hit = OWNER_PATTERN.search(raw_line)

        # The first "]" on the line is the one closing the checkbox itself.
        bracket_end = raw_line.find("]")
        if bracket_end != -1 and bracket_end + 1 < len(raw_line):
            label = raw_line[bracket_end + 1 :].strip()
        else:
            label = raw_line

        pr_refs = {f"#{number}" for number in PR_REF_PATTERN.findall(raw_line)}
        commit_refs = {
            candidate
            for candidate in COMMIT_REF_PATTERN.findall(raw_line)
            if len(candidate) >= 7  # filter out short hashes / numbers
        }

        results.append(
            {
                "line": raw_line,
                "label": label,
                "done": mark.strip().lower() == "x",
                "owner": owner_hit.group(1) if owner_hit else None,
                "refs": sorted(pr_refs | commit_refs, key=str.lower),
            }
        )
    return results
|
|
|
|
|
|
def _parse_threshold_spec(spec: int | str, total: int, human_total: int) -> tuple[int, str | None]:
|
|
if isinstance(spec, int):
|
|
return spec, None
|
|
lowered = str(spec).strip().lower()
|
|
if lowered == "all":
|
|
return total, "all"
|
|
if lowered == "all_human":
|
|
return human_total, "all_human"
|
|
human_match = re.match(r"(\d+)_human", lowered)
|
|
if human_match:
|
|
return int(human_match.group(1)), "human"
|
|
try:
|
|
return int(lowered), None
|
|
except ValueError:
|
|
return total, None
|
|
|
|
@dataclass
class CommentBlock:
    """One participant's comment: author, raw body lines and optional vote."""

    participant: str
    body_lines: list[str]
    vote: str | None = None

    def comment_text(self) -> str:
        """
        Join the body lines into one string.

        Trailing whitespace is removed from every line and leading/trailing
        blank space from the whole text; an empty body yields "".
        """
        trimmed = [line.rstrip() for line in self.body_lines]
        return "\n".join(trimmed).strip()
|
|
|
|
|
|
def _strip_vote_suffix(text: str) -> str:
|
|
"""
|
|
Remove trailing 'VOTE: ...' segments from extracted snippets.
|
|
"""
|
|
parts = re.split(r"\bVOTE:\s*(READY|CHANGES|REJECT)\b", text, flags=re.IGNORECASE)
|
|
if parts:
|
|
return parts[0].strip()
|
|
return text.strip()
|
|
|
|
|
|
def extract_comment_blocks(text: str) -> list[CommentBlock]:
    """
    Split discussion text into per-participant comment blocks.

    Two layouts are understood.  The multi-line layout is used as soon as
    any line (optionally bulleted with ``-`` or ``*``) starts with ``Name:``:

        Name: Alice
        ...comment lines...
        VOTE: READY

    Otherwise the legacy layout applies, where a participant label, the
    comment and an optional trailing ``VOTE:`` share lines such as
    ``Alice: ... VOTE: READY``.

    A YAML front matter section opening on the first line is ignored.
    """
    all_lines = text.splitlines()
    if not all_lines:
        return []

    # Drop front matter: everything from a "---" on line 0 through the next "---".
    content_lines: list[str] = []
    skipping_front_matter = False
    for position, line in enumerate(all_lines):
        is_fence = line.strip() == "---"
        if position == 0 and is_fence:
            skipping_front_matter = True
            continue
        if skipping_front_matter:
            if is_fence:
                skipping_front_matter = False
            continue
        content_lines.append(line)

    def _name_payload(line: str) -> str | None:
        """Return the text after a (possibly bulleted) ``Name:`` label, else None."""
        candidate = line.strip()
        if candidate.lower().startswith("name:"):
            return candidate[5:].strip()
        if candidate.startswith(("-", "*")):
            after_bullet = candidate[1:].lstrip()
            if after_bullet.lower().startswith("name:"):
                return after_bullet[5:].strip()
        return None

    def _trim_trailing_blanks(block: CommentBlock) -> None:
        """Remove blank lines from the end of a block's body."""
        while block.body_lines and not block.body_lines[-1].strip():
            block.body_lines.pop()

    def _absorb_fragment(block: CommentBlock, fragment: str) -> None:
        """Store legacy text on *block*, splitting off a trailing vote marker."""
        vote_at = fragment.lower().rfind(VOTE_TOKEN)
        if vote_at == -1:
            block.body_lines.append(fragment.strip())
            return
        ballot = _extract_vote_value(fragment[vote_at + len(VOTE_TOKEN):].strip())
        if ballot:
            block.vote = ballot
        before_vote = fragment[:vote_at].strip()
        if before_vote:
            block.body_lines.append(before_vote)

    collected: list[CommentBlock] = []
    active: CommentBlock | None = None

    # A "Name:" label with an empty name still selects the multi-line layout.
    if any(_name_payload(line) is not None for line in content_lines):
        for line in content_lines:
            content = line.strip()
            if not content:
                if active is not None:
                    active.body_lines.append("")
                continue
            if content == "---":
                # Section separator: contributes at most one blank line.
                if active is not None and active.body_lines and active.body_lines[-1]:
                    active.body_lines.append("")
                continue

            speaker = _name_payload(line)
            if speaker:
                if active is not None:
                    _trim_trailing_blanks(active)
                active = CommentBlock(participant=speaker, body_lines=[], vote=None)
                collected.append(active)
                continue

            if active is None:
                # Nothing is recorded before the first Name header.
                continue

            if content.lower().startswith("vote:"):
                ballot = _extract_vote_value(content[5:].strip())
                if ballot:
                    active.vote = ballot
                continue

            active.body_lines.append(content)

        if active is not None:
            _trim_trailing_blanks(active)

        return [block for block in collected if block.participant]

    # Legacy layout: a participant label opens a new block on its own line.
    for line in content_lines:
        content = line.strip()
        if not content:
            if active is not None:
                active.body_lines.append("")
            continue

        speaker, trailing = _extract_participant(line)
        if speaker:
            active = CommentBlock(participant=speaker, body_lines=[], vote=None)
            collected.append(active)
            if trailing:
                _absorb_fragment(active, trailing)
            continue

        if active is None:
            continue

        _absorb_fragment(active, content)

    return [block for block in collected if block.participant]
|
|
|
|
|
|
def summarize_participants(blocks: list[CommentBlock]) -> list[dict[str, Any]]:
    """
    Build one roster entry per participant, in order of first appearance.

    Each entry holds the participant's name, whether the name carries the
    ``AI_`` agent prefix (any letter case), and the most recent non-empty
    comment and vote seen for them.  Blocks without a participant are keyed
    ``unknown-<index>``.
    """
    entries: dict[str, dict[str, Any]] = {}

    for position, block in enumerate(blocks):
        label = block.participant or f"unknown-{position}"
        if label not in entries:
            entries[label] = {
                "name": label,
                "is_agent": label.upper().startswith("AI_"),
                "last_comment": "",
                "vote": None,
            }
        record = entries[label]

        # Later blocks overwrite earlier values only when they carry content.
        latest = block.comment_text()
        if latest:
            record["last_comment"] = latest
        if block.vote:
            record["vote"] = block.vote

    # dicts keep insertion order, i.e. order of first appearance.
    return list(entries.values())
|
|
|
|
|
|
def extract_structured_basic(text: str) -> dict[str, list]:
    """
    Rule-based extraction of questions, actions, decisions and mentions.

    Only explicit markers at the start of a statement are recognised
    (case-insensitive):
      - DECISION:            recorded as a decision
      - QUESTION: / Q:       recorded as an OPEN question
      - ACTION: / TODO:      recorded as a TODO action, first @mention = assignee
      - ASSIGNED: / DONE:    legacy action status markers
    In addition, any statement ending in "?" becomes an OPEN question unless
    a QUESTION:/Q: marker already recorded it, and every @mention on a line
    is collected.

    Blank lines, ``Name:``/``VOTE:`` lines, HTML comments and ``#`` headings
    carry no statement; a ``Name:`` line sets the speaker for following lines.

    Returns a dict with keys "questions", "action_items", "decisions",
    "mentions" and "timeline" (participant + truncated summary of the first
    statement, or None when there is none).
    """
    questions: list[dict[str, str]] = []
    action_items: list[dict[str, str]] = []
    decisions: list[dict[str, str]] = []
    mentions: list[dict[str, str]] = []
    timeline_data: dict[str, str] | None = None

    inside_html_comment = False
    active_speaker: str | None = None

    for raw in text.splitlines():
        content = raw.strip()
        if not content:
            continue

        content_lower = content.lower()
        if content_lower.startswith("name:"):
            declared = content[5:].strip()
            if declared:
                active_speaker = declared
            continue
        if content_lower.startswith("vote:"):
            continue
        if content.startswith("<!--"):
            # A comment that does not close on the same line spans further lines.
            if not content.endswith("-->"):
                inside_html_comment = True
            continue
        if inside_html_comment:
            if content.endswith("-->"):
                inside_html_comment = False
            continue
        if content.startswith("#"):
            continue

        speaker, trailing = _extract_participant(raw)
        if speaker:
            active_speaker = speaker
            statement = trailing.strip()
        else:
            statement = content
        if not statement:
            continue

        author = speaker or active_speaker or "unknown"

        if timeline_data is None:
            timeline_data = {
                "participant": author,
                "summary": _truncate_summary(statement),
            }

        # Explicit markers only; free-form phrasing is left to AI normalization.
        statement_lower = statement.lower()
        question_recorded = False

        if statement_lower.startswith("decision:"):
            payload = _strip_vote_suffix(statement[9:].strip())
            if payload:
                decisions.append({
                    "participant": author,
                    "decision": payload,
                    "rationale": "",
                    "supporters": [],
                })

        elif statement_lower.startswith(("question:", "q:")):
            prefix_len = 9 if statement_lower.startswith("question:") else 2
            payload = _strip_vote_suffix(statement[prefix_len:].strip())
            if payload:
                questions.append({
                    "participant": author,
                    "question": payload,
                    "status": "OPEN",
                })
                question_recorded = True

        elif statement_lower.startswith(("action:", "todo:")):
            prefix_len = 7 if statement_lower.startswith("action:") else 5
            payload = _strip_vote_suffix(statement[prefix_len:].strip())
            if payload:
                mention_hit = MENTION_PATTERN.search(payload)
                action_items.append({
                    "participant": author,
                    "action": payload,
                    "status": "TODO",
                    "assignee": mention_hit.group(1) if mention_hit else None,
                })

        if not question_recorded:
            candidate = _strip_vote_suffix(statement).strip()
            if candidate.endswith("?"):
                questions.append({
                    "participant": author,
                    "question": candidate,
                    "status": "OPEN",
                })

        # Legacy support for plain text markers at line start
        if statement_lower.startswith("assigned:"):
            payload = statement.partition(":")[2].strip()
            if payload:
                # The assignee defaults to the author; a mention anywhere on
                # the raw line overrides it.
                mention_hit = MENTION_PATTERN.search(raw)
                action_items.append(
                    {
                        "participant": author,
                        "action": payload,
                        "status": "ASSIGNED",
                        "assignee": mention_hit.group(1) if mention_hit else author,
                    }
                )
        elif statement_lower.startswith("done:"):
            payload = statement.partition(":")[2].strip()
            if payload:
                action_items.append(
                    {
                        "participant": author,
                        "action": payload,
                        "status": "DONE",
                        "completed_by": author,
                    }
                )

        # Mentions
        mentions.extend(
            {
                "from": author,
                "to": hit.group(1),
                "context": content,
            }
            for hit in MENTION_PATTERN.finditer(raw)
        )

    return {
        "questions": questions,
        "action_items": action_items,
        "decisions": decisions,
        "mentions": mentions,
        "timeline": timeline_data,
    }
|
|
|
|
|
|
def _truncate_summary(text: str, limit: int = 120) -> str:
|
|
return text if len(text) <= limit else text[: limit - 1].rstrip() + "…"
|
|
|
|
|
|
def get_staged_files() -> list[Path]:
    """
    Return staged file paths relative to the repository root.

    ``git diff --cached --name-only -z`` is used so that paths are emitted
    verbatim and NUL-separated; without ``-z`` git quotes and escapes paths
    containing non-ASCII or special characters, which would yield paths that
    do not exist.

    Any failure (git missing, not a repository, non-zero exit) is reported on
    stderr and results in an empty list, so the hook never blocks a commit.
    """
    try:
        result = subprocess.run(
            ["git", "diff", "--cached", "--name-only", "-z"],
            capture_output=True,
            text=True,
            check=False,
        )
    except OSError:
        # e.g. the git executable is not installed / not on PATH.
        sys.stderr.write("[workflow] warning: unable to run git; assuming no staged files.\n")
        return []

    if result.returncode != 0:
        sys.stderr.write("[workflow] warning: git diff --cached failed; assuming no staged files.\n")
        return []

    # Entries are NUL-terminated, so the final split element is empty.
    return [Path(entry) for entry in result.stdout.split("\0") if entry]
|
|
|
|
|
|
def read_staged_file(path: Path) -> str | None:
    """
    Return the staged contents of `path` from the git index.

    Falls back to the working tree contents if the file is not in the index
    (or git cannot be run).  Returns None when neither source is readable.

    Git output is decoded as UTF-8, matching how this module reads and writes
    files, rather than with the platform's locale encoding; undecodable bytes
    are replaced instead of raising.
    """
    spec = f":{path.as_posix()}"
    try:
        result = subprocess.run(
            ["git", "show", spec],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=False,
        )
    except OSError:
        # git is unavailable; use the working tree copy below.
        result = None

    if result is not None and result.returncode == 0:
        return result.stdout

    if path.exists():
        try:
            return path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            sys.stderr.write(f"[workflow] warning: unable to read {path}\n")
            return None
    return None
|
|
|
|
|
|
def find_discussions(paths: Iterable[Path]) -> list[Path]:
    """
    Select the discussion documents among *paths*.

    A path qualifies when it lies under ``Docs/``, its lower-cased file name
    ends with one of ``DISCUSSION_SUFFIXES`` and it is not a summary file
    (``SUMMARY_SUFFIX``).  Input order is preserved.
    """

    def _is_discussion(candidate: Path) -> bool:
        if not candidate.as_posix().startswith("Docs/"):
            return False
        filename = candidate.name.lower()
        if filename.endswith(SUMMARY_SUFFIX):
            return False
        return any(filename.endswith(suffix) for suffix in DISCUSSION_SUFFIXES)

    return [candidate for candidate in paths if _is_discussion(candidate)]
|
|
|
|
|
|
def parse_comment_blocks(path: Path) -> list[CommentBlock]:
    """
    Read `path` via ``read_staged_file`` and parse it into comment blocks.

    Returns an empty list when the file content cannot be obtained.
    """
    staged_text = read_staged_file(path)
    return [] if staged_text is None else extract_comment_blocks(staged_text)
|
|
|
|
|
|
def parse_votes(path: Path, blocks: list[CommentBlock] | None = None) -> Mapping[str, str]:
    """
    Return the latest vote per participant.

    *blocks* may be supplied when the caller has already parsed the file;
    otherwise the staged snapshot of `path` is parsed here.

    Votes are keyed by participant name; later blocks overwrite earlier ones.
    A vote whose block has no participant is keyed ``line-<index>`` so it is
    not conflated with other votes.  That prefix is the one the eligibility
    checks in this module skip, so anonymous votes never count towards
    promotion.
    """
    latest_per_participant: dict[str, str] = {}
    if blocks is None:
        blocks = parse_comment_blocks(path)
    if not blocks:
        return latest_per_participant

    for idx, block in enumerate(blocks):
        if not block.vote:
            continue
        key = block.participant or f"line-{idx}"
        latest_per_participant[key] = block.vote

    return latest_per_participant
|
|
|
|
|
|
def _extract_participant(line: str) -> tuple[str | None, str]:
|
|
stripped = line.strip()
|
|
if not stripped:
|
|
return None, line
|
|
lowered = stripped.lower()
|
|
if lowered.startswith("name:"):
|
|
candidate = stripped[5:].strip()
|
|
if candidate:
|
|
return candidate, ""
|
|
if stripped[0] in "-*":
|
|
body = stripped[1:].lstrip()
|
|
lowered_body = body.lower()
|
|
if lowered_body.startswith("name:"):
|
|
candidate = body[5:].strip()
|
|
if candidate:
|
|
return candidate, ""
|
|
parts = body.split(":", 1)
|
|
if len(parts) == 2:
|
|
candidate = parts[0].strip()
|
|
if candidate.lower() in RESERVED_TOKENS:
|
|
return None, line
|
|
if candidate:
|
|
return candidate, parts[1].strip()
|
|
colon_pos = stripped.find(":")
|
|
if colon_pos > 0:
|
|
candidate = stripped[:colon_pos].strip()
|
|
if candidate.lower() in RESERVED_TOKENS:
|
|
return None, line
|
|
remainder = stripped[colon_pos + 1 :].strip()
|
|
if candidate:
|
|
return candidate, remainder
|
|
return None, line
|
|
|
|
|
|
def _extract_vote_value(vote_string: str) -> str | None:
|
|
potential_vote = vote_string.strip().upper()
|
|
if potential_vote in ("READY", "CHANGES", "REJECT"):
|
|
return potential_vote
|
|
return None
|
|
def get_discussion_changes(discussion_path: Path) -> str:
    """
    Return the staged additions for a discussion file.

    The added lines of ``git diff --cached --unified=0`` are returned joined
    by newlines (for a newly staged file that is its whole content).  When
    git fails, or the diff contains no added lines, the full staged contents
    are returned instead; "" if those are unavailable too.

    Header lines are skipped by position (everything before the first ``@@``
    hunk marker) rather than by a ``+++`` prefix, because an added content
    line that itself starts with ``++`` also shows up as ``+++...``.
    """
    try:
        result = subprocess.run(
            ["git", "diff", "--cached", "--unified=0", "--", discussion_path.as_posix()],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=False,
        )
    except OSError:
        # git could not be started; handled like a failed diff below.
        result = None

    if result is None or result.returncode != 0:
        sys.stderr.write(f"[workflow] warning: git diff --cached failed for {discussion_path}; using staged contents.\n")
        return read_staged_file(discussion_path) or ""

    added_lines: list[str] = []
    in_hunk = False
    for line in result.stdout.split("\n"):
        if line.startswith("diff --git"):
            # Start of a file header section.
            in_hunk = False
        elif line.startswith("@@"):
            in_hunk = True
        elif in_hunk and line.startswith("+"):
            added_lines.append(line[1:])

    if added_lines:
        return "\n".join(added_lines)

    return read_staged_file(discussion_path) or ""
|
|
|
|
|
|
def update_summary_votes(summary_path: Path, votes: Mapping[str, str]) -> None:
    """
    Refresh the votes section of an existing summary file.

    Only the content between the ``<!-- SUMMARY:VOTES START -->`` and
    ``<!-- SUMMARY:VOTES END -->`` markers is replaced, with a tally line
    followed by one ``- participant: vote`` line per participant (sorted).

    Nothing is done when the summary file does not exist; this function
    never creates it.  The file is only written when its content changes.
    Read/write failures are reported on stderr and otherwise ignored.
    """
    if not summary_path.exists():
        return

    try:
        content = summary_path.read_text(encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to read {summary_path}\n")
        return

    # Calculate vote counts
    counts = Counter(votes.values())
    ready = counts.get("READY", 0)
    changes = counts.get("CHANGES", 0)
    reject = counts.get("REJECT", 0)

    votes_content_lines = [
        f"READY: {ready} • CHANGES: {changes} • REJECT: {reject}"
    ]
    if votes:
        for participant, vote in sorted(votes.items()):
            votes_content_lines.append(f"- {participant}: {vote}")
    else:
        votes_content_lines.append("- (no votes yet)")
    new_votes_section = "\n".join(votes_content_lines)

    pattern = r"(<!-- SUMMARY:VOTES START -->)(.*?)(<!-- SUMMARY:VOTES END -->)"

    def replacer(match):
        # A callable replacement keeps participant names literal: backslashes
        # in them are not interpreted as regex escapes.
        return f"{match.group(1)}\n## Votes (latest per participant)\n{new_votes_section}\n{match.group(3)}"

    updated_content = re.sub(pattern, replacer, content, flags=re.DOTALL)
    if updated_content == content:
        # Markers missing or section already current: leave the file untouched.
        return

    try:
        summary_path.write_text(updated_content, encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to write {summary_path}\n")
|
|
|
|
|
|
def print_vote_summary(path: Path, votes: Mapping[str, str]) -> None:
    """
    Print a vote report for one discussion file to stdout.

    Output: a header naming the file, then either a "no votes" notice or the
    count per vote value followed by each participant's latest vote (both
    sorted alphabetically).
    """
    print(f"[workflow] {path.as_posix()}")
    if not votes:
        print(" - No votes recorded.")
        return

    tally = Counter(votes.values())
    for choice in sorted(tally):
        amount = tally[choice]
        suffix = "" if amount == 1 else "s"
        print(f" - {choice}: {amount} vote{suffix}")

    print(" Participants' latest votes:")
    for voter in sorted(votes):
        print(f" - {voter}: {votes[voter]}")
|
|
|
|
|
|
def process_discussion_with_ai(
    discussion_path: Path,
    summary_path: Path,
    incremental_content: str
) -> dict[str, Any]:
    """
    Extract structured information from new discussion content.

    The rule-based result of ``extract_structured_basic`` is the baseline.
    If an ``agents`` module can be imported (``automation.agents`` first,
    then top-level ``agents``) and its ``normalize_discussion`` returns data,
    every non-empty section of that data replaces the baseline section.  If
    it returns nothing, ``agents.extract_mentions`` fills in mentions when the
    baseline found none.

    Returns a dict with: questions, action_items, decisions, mentions, timeline.
    ``discussion_path`` and ``summary_path`` are not used by this function.
    """
    structured = extract_structured_basic(incremental_content)
    if not incremental_content.strip():
        return structured

    try:
        from automation import agents
    except ImportError:
        try:
            import agents  # type: ignore
        except ImportError:
            # No AI layer available: the basic extraction is the result.
            return structured

    normalized = agents.normalize_discussion(incremental_content)
    if not normalized:
        if not structured["mentions"]:
            structured["mentions"] = agents.extract_mentions(incremental_content)
        return structured

    for section in ("questions", "action_items", "decisions", "mentions", "timeline"):
        ai_value = normalized.get(section)
        if ai_value:
            structured[section] = ai_value

    return structured
|
|
|
|
|
|
def parse_yaml_frontmatter(file_path: Path) -> tuple[dict[str, Any] | None, str]:
    """
    Parse YAML front matter from a markdown file.

    Returns (metadata_dict, remaining_content).
    Returns (None, full_content) when there is no front matter, when it is
    not terminated by a ``---`` line, when it is not valid YAML, or when it
    does not parse to a mapping (callers use ``.get`` on the result).
    Returns (None, "") when the file cannot be read, e.g. because it was
    deleted from the working tree; a warning is written to stderr.
    """
    try:
        content = file_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        sys.stderr.write(f"[workflow] warning: unable to read {file_path}: {exc}\n")
        return None, ""

    # Front matter must open on the very first line.
    if not content.startswith("---\n"):
        return None, content

    # Find the closing ---
    end_marker = content.find("\n---\n", 4)
    if end_marker == -1:
        return None, content

    yaml_content = content[4:end_marker]
    remaining = content[end_marker + 5:]  # Skip "\n---\n"

    try:
        metadata = yaml.safe_load(yaml_content)
    except yaml.YAMLError:
        return None, content

    if metadata is not None and not isinstance(metadata, dict):
        # e.g. a bare scalar or list: not usable as metadata.
        return None, content
    return metadata, remaining
|
|
|
|
|
|
def count_eligible_votes(votes: Mapping[str, str], allow_agent_votes: bool) -> dict[str, list[str]]:
    """
    Group eligible voters by vote value.

    Returns a dict with keys "READY", "CHANGES", "REJECT"; each value lists
    the participants who cast that vote, in the iteration order of *votes*.

    Excluded from the result:
      - agent votes (names starting with ``AI_`` in any letter case, the same
        test ``summarize_participants`` applies) unless *allow_agent_votes*;
      - synthetic line-based keys such as ``line-5``;
      - votes whose value is none of the three known ones.
    """
    vote_groups: dict[str, list[str]] = {"READY": [], "CHANGES": [], "REJECT": []}

    for participant, vote_value in votes.items():
        if not allow_agent_votes and participant.upper().startswith("AI_"):
            continue
        if participant.startswith("line-"):
            continue

        bucket = vote_groups.get(vote_value.upper())
        if bucket is not None:
            bucket.append(participant)

    return vote_groups
|
|
|
|
|
|
def resolve_status_targets(
    stage: str | None,
    promotion_rule: Mapping[str, Any] | None
) -> tuple[str, str]:
    """
    Return the ``(ready_status, reject_status)`` pair for a discussion stage.

    Precedence for each value:
      1. a truthy ``ready_status`` / ``reject_status`` entry in promotion_rule
      2. the stage's entry in ``PROMOTION_STATUS_BY_STAGE`` (stage name is
         matched case-insensitively)
      3. ``DEFAULT_READY_STATUS`` / ``DEFAULT_REJECT_STATUS``
    Both values are converted to ``str``.
    """
    overrides: Mapping[str, Any] = promotion_rule or {}
    defaults = PROMOTION_STATUS_BY_STAGE.get((stage or "").lower(), {})

    ready = overrides.get("ready_status") or defaults.get("ready", DEFAULT_READY_STATUS)
    reject = overrides.get("reject_status") or defaults.get("reject", DEFAULT_REJECT_STATUS)

    return str(ready), str(reject)
|
|
|
|
|
|
def check_promotion_threshold(
    vote_groups: dict[str, list[str]],
    ready_min: int | str,
    reject_min: int | str,
    ready_status: str,
    reject_status: str
) -> str | None:
    """
    Decide whether the votes trigger a status change.

    Returns:
    - reject_status if the reject threshold is met (checked first, so a
      rejection blocks promotion)
    - ready_status if the ready threshold is met
    - None otherwise

    Thresholds are interpreted by ``_parse_threshold_spec``.  Besides the
    plain count comparison, the modes add requirements:
      "all"       - at least one eligible vote exists
      "human"     - at least one human voted and the human votes alone reach
                    the threshold
      "all_human" - at least one human voted and every human voter cast
                    this vote
    """
    total_eligible = sum(len(names) for names in vote_groups.values())
    human_total = len(
        {name for names in vote_groups.values() for name in names if _is_human_participant(name)}
    )

    ready_threshold, ready_mode = _parse_threshold_spec(ready_min, total_eligible, human_total)
    reject_threshold, reject_mode = _parse_threshold_spec(reject_min, total_eligible, human_total)

    def _satisfied(voters: list[str], threshold: int, mode: str | None) -> bool:
        """Apply the count check plus the mode-specific requirement."""
        if len(voters) < threshold:
            return False
        if mode == "all":
            return total_eligible > 0
        if mode in ("human", "all_human"):
            if human_total <= 0:
                return False
            humans = len({name for name in voters if _is_human_participant(name)})
            if mode == "human":
                return humans >= threshold
            return humans == human_total
        return True

    # Evaluate reject condition first (blocking)
    if _satisfied(vote_groups["REJECT"], reject_threshold, reject_mode):
        return reject_status
    if _satisfied(vote_groups["READY"], ready_threshold, ready_mode):
        return ready_status
    return None
|
|
|
|
|
|
def _implementation_tasks_complete(discussion_path: Path) -> bool:
    """
    Report whether every checkbox in the staged discussion is ticked.

    False is returned when the staged text is missing or empty, or when it
    contains no checkboxes at all.
    """
    staged_text = read_staged_file(discussion_path)
    if not staged_text:
        return False
    items = _extract_checkboxes(staged_text)
    return bool(items) and all(item["done"] for item in items)
|
|
|
|
|
|
def _sync_implementation_status_comment(discussion_path: Path, checkboxes: list[dict[str, Any]]) -> bool:
    """
    Keep the automation status comment of an implementation discussion current.

    The comment lives between ``AUTO_IMPLEMENTATION_BLOCK_START`` and
    ``AUTO_IMPLEMENTATION_BLOCK_END``.  With checkboxes it reports progress,
    up to three outstanding task labels and a vote (READY when nothing is
    outstanding, else CHANGES); an existing comment is replaced in place and a
    missing one is appended.  Without checkboxes an existing comment is
    removed.

    Returns True only when the file was rewritten.  The operation is
    idempotent: re-running with unchanged checkboxes leaves the file alone.
    Read/write failures return False (write failures also warn on stderr).
    """
    try:
        text = discussion_path.read_text(encoding="utf-8")
    except OSError:
        return False

    pattern = re.compile(
        re.escape(AUTO_IMPLEMENTATION_BLOCK_START) + r".*?" + re.escape(AUTO_IMPLEMENTATION_BLOCK_END),
        re.DOTALL,
    )

    if not checkboxes:
        if not pattern.search(text):
            return False
        updated = pattern.sub("", text).rstrip() + "\n"
    else:
        total = len(checkboxes)
        completed = sum(1 for cb in checkboxes if cb["done"])
        remaining_labels = [cb["label"].strip() for cb in checkboxes if not cb["done"]]

        vote_value = "READY" if not remaining_labels else "CHANGES"
        remaining_text = ", ".join(remaining_labels[:3]) if remaining_labels else "None"
        if len(remaining_labels) > 3:
            remaining_text += f" (+{len(remaining_labels) - 3} more)"

        percent = 0 if total == 0 else int(round((completed / total) * 100))

        status_block = "\n".join(
            [
                AUTO_IMPLEMENTATION_BLOCK_START,
                "Name: AI_Implementer",
                f"Tracked tasks: {completed}/{total} complete ({percent}%)",
                f"Outstanding: {remaining_text}",
                f"VOTE: {vote_value}",
                AUTO_IMPLEMENTATION_BLOCK_END,
            ]
        )

        if pattern.search(text):
            # A callable replacement inserts the block literally; task labels
            # may contain backslashes that a replacement template would
            # interpret.  The text after the end marker (including its
            # newline) is left as it is, so no blank line accumulates.
            updated = pattern.sub(lambda _match: status_block, text)
        else:
            updated = text.rstrip() + "\n\n" + status_block + "\n"

    if updated == text:
        return False

    try:
        discussion_path.write_text(updated, encoding="utf-8")
    except OSError:
        sys.stderr.write(f"[workflow] warning: unable to write {discussion_path}\n")
        return False
    return True
|
|
|
|
|
|
def update_discussion_status(file_path: Path, new_status: str) -> bool:
    """
    Update the status field in the YAML front matter of a discussion file.

    The first top-level ``status: <value>`` line is rewritten to carry
    *new_status*.  The search is limited to the front matter when the file
    has a delimited one (otherwise the whole text is searched), and the key
    must start its line, so keys such as ``sub_status:`` and prose mentioning
    "status:" are never touched.

    Returns True if the file was changed; False if there was nothing to
    change or the file could not be read or written (a warning is written to
    stderr in that case).
    """
    try:
        content = file_path.read_text(encoding="utf-8")

        # Limit the search to the front matter section when it is delimited.
        head_length = len(content)
        if content.startswith("---\n"):
            closing = content.find("\n---", 3)
            if closing != -1:
                head_length = closing + 1
        head, tail = content[:head_length], content[head_length:]

        # Key at line start; the separator is horizontal whitespace only, so
        # the match cannot run on into the next line.  The callable keeps
        # new_status literal (a leading digit or a backslash would otherwise
        # be read as part of a group reference / escape).
        new_head = re.sub(
            r"^(status:[ \t]+)\S+",
            lambda match: match.group(1) + new_status,
            head,
            count=1,
            flags=re.MULTILINE,
        )

        updated = new_head + tail
        if updated != content:
            file_path.write_text(updated, encoding="utf-8")
            return True

        return False
    except (OSError, UnicodeError) as e:
        sys.stderr.write(f"[workflow] warning: failed to update status in {file_path}: {e}\n")
        return False
|
|
|
|
|
|
def _run_status() -> int:
    """
    Report votes and refresh derived artifacts for each staged discussion file.

    For every discussion found among the staged files this:
      1. parses comment blocks, votes and the participant roster and prints
         the vote summary;
      2. if the front matter carries a ``promotion_rule``, checks the vote
         thresholds and rewrites the ``status`` field when a promotion applies;
      3. for implementation-stage discussions, collects checkbox tasks and
         syncs the auto-generated implementation status comment;
      4. updates the companion ``.sum.md`` file when it exists;
      5. refreshes ``tasks.md`` for ``implementation.discussion.md``.

    Files rewritten along the way are re-staged with ``git add``.

    Returns:
        Always 0.
    """
    staged = get_staged_files()
    discussions = find_discussions(staged)
    if not discussions:
        print("[workflow] No staged discussion files.")
        return 0

    for discussion in discussions:
        # NOTE(review): `discussion.parent` / `.stem` / `.name` are used below,
        # so find_discussions presumably yields Path objects — confirm.
        discussion_path = Path(discussion)
        comment_blocks = parse_comment_blocks(discussion_path)
        # Parse votes from the full file (maintains the latest vote per participant)
        votes = parse_votes(discussion_path, comment_blocks)
        participant_roster = summarize_participants(comment_blocks)

        # Print summary to terminal
        print_vote_summary(discussion, votes)

        # Check for status promotion
        metadata, _ = parse_yaml_frontmatter(discussion_path)
        if metadata and "promotion_rule" in metadata:
            promo_rule_raw = metadata["promotion_rule"]
            # A non-mapping promotion_rule is treated as empty, so the defaults below apply.
            promo_rule = promo_rule_raw if isinstance(promo_rule_raw, Mapping) else {}
            allow_ai = promo_rule.get("allow_agent_votes", False)
            ready_min = promo_rule.get("ready_min_eligible_votes", 2)
            reject_min = promo_rule.get("reject_min_eligible_votes", 1)
            ready_status, reject_status = resolve_status_targets(metadata.get("stage"), promo_rule)

            # Count eligible votes
            vote_groups = count_eligible_votes(votes, allow_ai)

            # Check if promotion threshold met
            new_status = check_promotion_threshold(
                vote_groups,
                ready_min,
                reject_min,
                ready_status,
                reject_status,
            )

            if new_status:
                current_status = metadata.get("status", "OPEN")
                if current_status != new_status:
                    allow_promotion = True
                    stage_name = (metadata.get("stage") or "").lower()
                    # Promoting an implementation discussion to its ready status has two
                    # extra gates: a READY vote from a human participant, and
                    # _implementation_tasks_complete() reporting True.
                    if stage_name == "implementation" and new_status == ready_status:
                        human_ready = [name for name in vote_groups["READY"] if _is_human_participant(name)]
                        if not human_ready:
                            allow_promotion = False
                            print(f"[workflow] → Implementation requires at least one human READY vote; keeping status {current_status}")
                        elif not _implementation_tasks_complete(discussion_path):
                            allow_promotion = False
                            print(f"[workflow] → Implementation tasks incomplete; keeping status {current_status}")

                    if allow_promotion and update_discussion_status(discussion_path, new_status):
                        # Re-stage the rewritten file; check=False keeps a git failure non-fatal.
                        subprocess.run(
                            ["git", "add", str(discussion)],
                            capture_output=True,
                            check=False,
                        )
                        print(f"[workflow] → Status promoted: {current_status} → {new_status}")

        # Update the corresponding .sum.md file if it exists
        dir_path = discussion.parent
        base_name = discussion.stem  # e.g., "feature-x.discussion" from "feature-x.discussion.md"
        summary_path = dir_path / f"{base_name}.sum.md"

        tasks_summary: list[dict[str, Any]] | None = None
        implementation_checkboxes: list[dict[str, Any]] = []
        if metadata and (metadata.get("stage") or "").lower() == "implementation":
            # Checkboxes are extracted from read_staged_file()'s text; a falsy result is treated as "".
            staged_text = read_staged_file(discussion_path) or ""
            implementation_checkboxes = _extract_checkboxes(staged_text)
            if implementation_checkboxes:
                total_tasks = len(implementation_checkboxes)
                completed_tasks = sum(1 for cb in implementation_checkboxes if cb["done"])
                tasks_summary = [
                    {
                        "text": cb["label"],
                        "done": cb["done"],
                        "owner": cb.get("owner"),
                        "refs": cb.get("refs", []),
                    }
                    for cb in implementation_checkboxes
                ]
                # The first element is a progress header rather than a task entry.
                tasks_summary.insert(0, {
                    "progress_summary": {
                        "total": total_tasks,
                        "completed": completed_tasks,
                    }
                })

        if metadata and (metadata.get("stage") or "").lower() == "implementation":
            # Called even when no checkboxes were found (empty list is passed through).
            if _sync_implementation_status_comment(discussion_path, implementation_checkboxes):
                subprocess.run(
                    ["git", "add", str(discussion)],
                    capture_output=True,
                    check=False,
                )
                print("[workflow] → Updated implementation status comment")

        if summary_path.exists():
            # Get incremental changes for AI processing
            incremental_content = get_discussion_changes(discussion_path)

            # Process with AI if available
            # NOTE(review): ai_data is used with .get() below, so this presumably
            # always returns a dict (possibly empty) — confirm.
            ai_data = process_discussion_with_ai(
                discussion_path,
                summary_path,
                incremental_content
            )

            # Update summary file with all extracted information
            try:
                # Try both import styles (for different execution contexts)
                try:
                    from automation import summary as summary_module
                except ImportError:
                    import summary as summary_module  # type: ignore

                # A timeline entry is only produced when the AI data supplies a
                # dict with a non-empty "summary".
                timeline_entry = None
                timeline_info = ai_data.get("timeline")
                if isinstance(timeline_info, dict):
                    participant = timeline_info.get("participant", "unknown")
                    summary_text = timeline_info.get("summary", "")
                    if summary_text:
                        timeline_entry = summary_module.format_timeline_entry(participant, summary_text)

                success = summary_module.update_summary_file(
                    summary_path,
                    votes=votes,
                    questions=ai_data.get("questions"),
                    action_items=ai_data.get("action_items"),
                    decisions=ai_data.get("decisions"),
                    mentions=ai_data.get("mentions"),
                    participants=participant_roster,
                    tasks=tasks_summary,
                    timeline_entry=timeline_entry,
                )

                if success:
                    # Stage the updated summary file
                    subprocess.run(
                        ["git", "add", str(summary_path)],
                        capture_output=True,
                        check=False,
                    )
                    print(f"[workflow] → Updated {summary_path.as_posix()}")

            except ImportError:
                # Fall back to basic vote update
                # (this handler also sees any ImportError raised inside the try body above,
                # not only a failure of the two summary-module imports).
                update_summary_votes(summary_path, votes)
                subprocess.run(
                    ["git", "add", str(summary_path)],
                    capture_output=True,
                    check=False,
                )
                print(f"[workflow] → Updated {summary_path.as_posix()} (votes only)")

        # Only the file named exactly implementation.discussion.md drives tasks.md.
        if discussion.name == "implementation.discussion.md":
            _update_tasks_md(discussion)

    return 0
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point for the workflow script.

    Args:
        argv: Argument list handed to ``argparse``; ``None`` lets argparse
            read the process arguments itself.

    Returns:
        The value returned by ``_run_status()``.
    """
    cli = argparse.ArgumentParser(
        prog="workflow.py",
        description="CascadingDev automation workflow (Phase 1: status reporter)",
    )
    cli.add_argument(
        "--status",
        action="store_true",
        help="Print vote status for staged discussion files (default).",
    )

    # The parsed namespace is not needed: status is the only command, so it runs
    # for --status and for no arguments alike. Parsing still happens so that
    # --help works and unknown flags are rejected.
    cli.parse_args(argv)
    return _run_status()
|
|
|
|
def _update_tasks_md(discussion_path: Path) -> None:
    """Regenerate ``implementation/tasks.md`` from a discussion's checklist.

    The checklist is taken from ``read_staged_file(discussion_path)``. When that
    text is empty or has no checkboxes, nothing happens. Otherwise
    ``<discussion dir>/../implementation/tasks.md`` is rewritten as a heading
    followed by each checkbox's ``"line"`` value, and the file is staged with
    ``git add`` whenever its content changed.
    """
    staged_text = read_staged_file(discussion_path)
    if not staged_text:
        return

    checkboxes = _extract_checkboxes(staged_text)
    if not checkboxes:
        return

    # tasks.md lives in the sibling "implementation" directory; make sure that
    # directory exists before reading or writing.
    target = discussion_path.parent.parent / "implementation" / "tasks.md"
    target.parent.mkdir(parents=True, exist_ok=True)

    existing = target.read_text(encoding="utf-8") if target.exists() else ""

    body_lines = [entry["line"] for entry in checkboxes]
    rendered = "# Implementation Tasks\n\n" + "\n".join(body_lines) + "\n"

    # Skip the write (and the git add) when there is nothing new to record.
    if not rendered.strip() or existing == rendered:
        return

    target.write_text(rendered, encoding="utf-8")
    subprocess.run(["git", "add", str(target)], capture_output=True, check=False)
    print(f"[workflow] → Updated {target.as_posix()}")
|
|
|
|
|
|
# Script entry point: run the CLI and use main()'s return value as the exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|