301 lines
9.9 KiB
Python
301 lines
9.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Summary file updater for CascadingDev discussions.
|
|
|
|
Updates marker blocks in .sum.md files with extracted information.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import sys
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Mapping
|
|
|
|
|
|
def update_marker_block(
|
|
content: str,
|
|
marker_name: str,
|
|
new_content: str,
|
|
include_header: bool = True
|
|
) -> str:
|
|
"""
|
|
Update content between <!-- SUMMARY:{marker_name} START/END --> markers.
|
|
|
|
Args:
|
|
content: Full file content
|
|
marker_name: Marker name (e.g., "VOTES", "DECISIONS")
|
|
new_content: New content to insert (without markers)
|
|
include_header: Whether to include ## Header in the new content
|
|
|
|
Returns:
|
|
Updated content with replaced marker block
|
|
"""
|
|
# Markers are stable HTML comments placed in the .sum.md companion file.
|
|
# We only replace the text BETWEEN the START/END pair so that surrounding
|
|
# content (headings, links, human edits) remains intact and diffs stay tiny.
|
|
pattern = rf"(<!-- SUMMARY:{marker_name} START -->)(.*?)(<!-- SUMMARY:{marker_name} END -->)"
|
|
|
|
def replacer(match):
|
|
return f"{match.group(1)}\n{new_content}\n{match.group(3)}"
|
|
|
|
updated = re.sub(pattern, replacer, content, flags=re.DOTALL)
|
|
|
|
# If no replacement happened, the markers might not exist
|
|
if updated == content:
|
|
sys.stderr.write(
|
|
f"[summary] note: markers for {marker_name} not found "
|
|
"(summary file likely not initialized yet)\n"
|
|
)
|
|
|
|
return updated
|
|
|
|
|
|
def format_votes_section(votes: Mapping[str, str]) -> str:
|
|
"""Format the VOTES section content."""
|
|
# Count latest vote values and render a compact tally + per-participant list.
|
|
from collections import Counter
|
|
|
|
counts = Counter(votes.values())
|
|
ready = counts.get("READY", 0)
|
|
changes = counts.get("CHANGES", 0)
|
|
reject = counts.get("REJECT", 0)
|
|
|
|
lines = [
|
|
"## Votes (latest per participant)",
|
|
f"READY: {ready} • CHANGES: {changes} • REJECT: {reject}"
|
|
]
|
|
|
|
if votes:
|
|
for participant, vote in sorted(votes.items()):
|
|
lines.append(f"- {participant}: {vote}")
|
|
else:
|
|
lines.append("- (no votes yet)")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_questions_section(questions: list[dict[str, Any]]) -> str:
|
|
"""Format the OPEN_QUESTIONS section content."""
|
|
lines = ["## Open Questions"]
|
|
|
|
if not questions:
|
|
lines.append("- (none yet)")
|
|
return "\n".join(lines)
|
|
|
|
# Split questions by status so OPEN items stay at the top and partial answers
|
|
# can be rendered with their follow-up context.
|
|
open_questions = [q for q in questions if q.get("status") == "OPEN"]
|
|
partial_questions = [q for q in questions if q.get("status") == "PARTIAL"]
|
|
|
|
if open_questions:
|
|
for q in open_questions:
|
|
participant = q.get("participant", "unknown")
|
|
question = q.get("question", "")
|
|
lines.append(f"- @{participant}: {question}")
|
|
|
|
if partial_questions:
|
|
lines.append("\n### Partially Answered:")
|
|
for q in partial_questions:
|
|
participant = q.get("participant", "unknown")
|
|
question = q.get("question", "")
|
|
answer = q.get("answer", "")
|
|
lines.append(f"- @{participant}: {question}")
|
|
lines.append(f" - Partial answer: {answer}")
|
|
|
|
if not open_questions and not partial_questions:
|
|
lines.append("- (all questions answered)")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_action_items_section(items: list[dict[str, Any]]) -> str:
|
|
"""Format the ACTION_ITEMS section content."""
|
|
lines = ["## Action Items"]
|
|
|
|
if not items:
|
|
lines.append("- (none yet)")
|
|
return "\n".join(lines)
|
|
|
|
# Normalize items by lifecycle bucket so the rendered Markdown feels like a
|
|
# kanban snapshot (TODO → In Progress → Completed).
|
|
todo_items = [i for i in items if i.get("status") == "TODO"]
|
|
assigned_items = [i for i in items if i.get("status") == "ASSIGNED"]
|
|
done_items = [i for i in items if i.get("status") == "DONE"]
|
|
|
|
if todo_items:
|
|
lines.append("\n### TODO (unassigned):")
|
|
for item in todo_items:
|
|
action = item.get("action", "")
|
|
participant = item.get("participant", "unknown")
|
|
lines.append(f"- [ ] {action} (suggested by @{participant})")
|
|
|
|
if assigned_items:
|
|
lines.append("\n### In Progress:")
|
|
for item in assigned_items:
|
|
action = item.get("action", "")
|
|
assignee = item.get("assignee", "unknown")
|
|
lines.append(f"- [ ] {action} (@{assignee})")
|
|
|
|
if done_items:
|
|
lines.append("\n### Completed:")
|
|
for item in done_items:
|
|
action = item.get("action", "")
|
|
completed_by = item.get("completed_by") or item.get("assignee", "unknown")
|
|
lines.append(f"- [x] {action} (@{completed_by})")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_decisions_section(decisions: list[dict[str, Any]]) -> str:
|
|
"""Format the DECISIONS section content (ADR-style)."""
|
|
lines = ["## Decisions (ADR-style)"]
|
|
|
|
if not decisions:
|
|
lines.append("- (none yet)")
|
|
return "\n".join(lines)
|
|
|
|
active_decisions = [d for d in decisions if d.get("status", "ACTIVE") == "ACTIVE"]
|
|
|
|
if not active_decisions:
|
|
lines.append("- (none yet)")
|
|
return "\n".join(lines)
|
|
|
|
for idx, decision in enumerate(active_decisions, 1):
|
|
decision_text = decision.get("decision", "")
|
|
rationale = decision.get("rationale", "")
|
|
participant = decision.get("participant", "unknown")
|
|
supporters = decision.get("supporters", [])
|
|
|
|
lines.append(f"\n### Decision {idx}: {decision_text}")
|
|
lines.append(f"- **Proposed by:** @{participant}")
|
|
|
|
if supporters:
|
|
supporters_str = ", ".join(f"@{s}" for s in supporters)
|
|
lines.append(f"- **Supported by:** {supporters_str}")
|
|
|
|
if rationale:
|
|
lines.append(f"- **Rationale:** {rationale}")
|
|
|
|
alternatives = decision.get("alternatives", [])
|
|
if alternatives:
|
|
lines.append("- **Alternatives considered:**")
|
|
for alt in alternatives:
|
|
lines.append(f" - {alt}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_awaiting_section(mentions: list[dict[str, str]]) -> str:
|
|
"""Format the AWAITING section content (unanswered @mentions)."""
|
|
lines = ["## Awaiting Replies"]
|
|
|
|
if not mentions:
|
|
lines.append("- (none yet)")
|
|
return "\n".join(lines)
|
|
|
|
# Group by target
|
|
by_target: dict[str, list[str]] = {}
|
|
for mention in mentions:
|
|
to = mention.get("to", "unknown")
|
|
from_participant = mention.get("from", "unknown")
|
|
context = mention.get("context", "")
|
|
|
|
if to not in by_target:
|
|
by_target[to] = []
|
|
by_target[to].append(f"@{from_participant}: {context}")
|
|
|
|
for target, contexts in sorted(by_target.items()):
|
|
lines.append(f"\n### @{target}")
|
|
for ctx in contexts:
|
|
lines.append(f"- {ctx}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
|
|
def format_timeline_entry(participant: str, summary: str) -> str:
|
|
"""Format a single timeline entry."""
|
|
now = datetime.now().strftime("%Y-%m-%d %H:%M")
|
|
return f"- {now} @{participant}: {summary}"
|
|
|
|
|
|
def append_timeline_entry(content: str, entry: str) -> str:
|
|
"""Append a new entry to the timeline section (most recent first)."""
|
|
pattern = r"(<!-- SUMMARY:TIMELINE START -->\s*## Timeline \(most recent first\)\s*)(.*?)(<!-- SUMMARY:TIMELINE END -->)"
|
|
|
|
def replacer(match):
|
|
header = match.group(1)
|
|
existing = match.group(2).strip()
|
|
footer = match.group(3)
|
|
|
|
# Remove placeholder if present
|
|
if existing.startswith("- <YYYY-MM-DD"):
|
|
existing = ""
|
|
|
|
# Add new entry at the top
|
|
if existing:
|
|
new_timeline = f"{entry}\n{existing}"
|
|
else:
|
|
new_timeline = entry
|
|
|
|
return f"{header}\n{new_timeline}\n{footer}"
|
|
|
|
return re.sub(pattern, replacer, content, flags=re.DOTALL)
|
|
|
|
|
|
def update_summary_file(
|
|
summary_path: Path,
|
|
votes: Mapping[str, str] | None = None,
|
|
questions: list[dict[str, Any]] | None = None,
|
|
action_items: list[dict[str, Any]] | None = None,
|
|
decisions: list[dict[str, Any]] | None = None,
|
|
mentions: list[dict[str, str]] | None = None,
|
|
timeline_entry: str | None = None
|
|
) -> bool:
|
|
"""
|
|
Update a summary file with extracted information.
|
|
|
|
Returns True if successful, False otherwise.
|
|
"""
|
|
if not summary_path.exists():
|
|
sys.stderr.write(f"[summary] warning: {summary_path} does not exist\n")
|
|
return False
|
|
|
|
try:
|
|
content = summary_path.read_text(encoding="utf-8")
|
|
except OSError as e:
|
|
sys.stderr.write(f"[summary] error reading {summary_path}: {e}\n")
|
|
return False
|
|
|
|
# Update each section that has new data
|
|
if votes is not None:
|
|
new_votes = format_votes_section(votes)
|
|
content = update_marker_block(content, "VOTES", new_votes)
|
|
|
|
if questions is not None:
|
|
new_questions = format_questions_section(questions)
|
|
content = update_marker_block(content, "OPEN_QUESTIONS", new_questions)
|
|
|
|
if action_items is not None:
|
|
new_items = format_action_items_section(action_items)
|
|
content = update_marker_block(content, "ACTION_ITEMS", new_items)
|
|
|
|
if decisions is not None:
|
|
new_decisions = format_decisions_section(decisions)
|
|
content = update_marker_block(content, "DECISIONS", new_decisions)
|
|
|
|
if mentions is not None:
|
|
new_awaiting = format_awaiting_section(mentions)
|
|
content = update_marker_block(content, "AWAITING", new_awaiting)
|
|
|
|
if timeline_entry is not None:
|
|
content = append_timeline_entry(content, timeline_entry)
|
|
|
|
# Write back
|
|
try:
|
|
summary_path.write_text(content, encoding="utf-8")
|
|
return True
|
|
except OSError as e:
|
|
sys.stderr.write(f"[summary] error writing {summary_path}: {e}\n")
|
|
return False
|