#!/usr/bin/env python3
"""
Summary file updater for CascadingDev discussions.

Updates marker blocks in .sum.md files with extracted information.
"""
from __future__ import annotations

import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Mapping


def update_marker_block(
    content: str,
    marker_name: str,
    new_content: str,
    include_header: bool = True
) -> str:
    """
    Update content between <!-- SUMMARY:{marker_name} START/END --> markers.

    Args:
        content: Full file content
        marker_name: Marker name (e.g., "VOTES", "DECISIONS")
        new_content: New content to insert (without markers)
        include_header: Whether to include the ## header in the new content
            (currently unused)

    Returns:
        Updated content with the replaced marker block
    """
    # Markers are stable HTML comments placed in the .sum.md companion file.
    # We only replace the text BETWEEN the START/END pair so that surrounding
    # content (headings, links, human edits) remains intact and diffs stay tiny.
    pattern = rf"(<!-- SUMMARY:{marker_name} START -->)(.*?)(<!-- SUMMARY:{marker_name} END -->)"

    def replacer(match):
        return f"{match.group(1)}\n{new_content}\n{match.group(3)}"

    updated = re.sub(pattern, replacer, content, flags=re.DOTALL)

    # If no replacement happened, the markers might not exist.
    # Only warn if the file has OTHER markers (indicating it's initialized but missing this one).
    # If there are NO markers at all, the file is uninitialized and warnings are noise.
    if updated == content:
        has_any_markers = "<!-- SUMMARY:" in content
        if has_any_markers:
            sys.stderr.write(
                f"[summary] note: markers for {marker_name} not found "
                "(summary file is initialized but missing this block)\n"
            )

    return updated
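
# Illustrative usage of update_marker_block (hypothetical values), showing that
# only the text between the START/END pair is rewritten:
#
#     before = "<!-- SUMMARY:VOTES START -->\nold\n<!-- SUMMARY:VOTES END -->"
#     update_marker_block(before, "VOTES", "## Votes\n- alice: READY")
#     # -> "<!-- SUMMARY:VOTES START -->\n## Votes\n- alice: READY\n<!-- SUMMARY:VOTES END -->"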


def _preview_text(text: str, limit: int = 140) -> str:
    """
    Collapse whitespace and truncate text for compact summary previews.
    """
    collapsed = " ".join(text.split())
    if len(collapsed) <= limit:
        return collapsed
    return collapsed[: limit - 1].rstrip() + "…"


# ---------------------------------------------------------------------------
# Summary state helpers
#
# We persist aggregated structured data (questions, action items, decisions,
# mentions) inside the summary file so incremental updates can merge with the
# existing state. The state lives in a lightweight HTML comment:
# <!-- SUMMARY:STATE {...json...} -->
# ---------------------------------------------------------------------------

STATE_MARKER = "SUMMARY:STATE"
STATE_PATTERN = re.compile(rf"<!-- {STATE_MARKER} ({{.*?}}) -->", re.DOTALL)
_KNOWN_FILE_EXTENSIONS = {"sip", "py", "md", "json", "yml", "yaml", "txt", "rst", "ini"}


def _canonical_text(value: str | None) -> str:
    return (value or "").strip().lower()


def _sanitize_name(value: str | None) -> str:
    """
    Clean up participant/assignee names that may contain file paths or other noise.
    """
    if not value:
        return "unknown"
    name = value.strip()
    if not name:
        return "unknown"

    # Strip any directory components.
    for sep in ("/", "\\"):
        if sep in name:
            name = name.split(sep)[-1]

    name = name.strip()
    if not name:
        return "unknown"

    # Drop common file extensions that sneak in from tool output.
    if "." in name:
        base, ext = name.rsplit(".", 1)
        if ext.lower() in _KNOWN_FILE_EXTENSIONS:
            name = base.strip()

    # Collapse whitespace and remove most punctuation (keep _, -, @, ., and ').
    name = re.sub(r"\s+", " ", name)
    name = re.sub(r"[^A-Za-z0-9_@.\-'\s]", "", name).strip()

    if not name:
        return "unknown"

    if len(name) > 60:
        name = name[:60].rstrip()

    return name
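
# Expected behaviour of _sanitize_name on a few representative (hypothetical) inputs:
#
#     _sanitize_name("agents/alice.py")  -> "alice"      (path and known extension stripped)
#     _sanitize_name("  Bob   Smith ")   -> "Bob Smith"  (whitespace collapsed)
#     _sanitize_name(None)               -> "unknown"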


def load_summary_state(content: str) -> tuple[dict[str, Any], str]:
    """
    Extract stored state from the summary content. Returns (state, content)
    where `content` has the marker intact (we update it separately).
    """
    match = STATE_PATTERN.search(content)
    if not match:
        return {
            "questions": [],
            "action_items": [],
            "decisions": [],
            "mentions": [],
        }, content

    json_blob = match.group(1)
    try:
        state = json.loads(json_blob)
    except json.JSONDecodeError:
        state = {}

    state.setdefault("questions", [])
    state.setdefault("action_items", [])
    state.setdefault("decisions", [])
    state.setdefault("mentions", [])
    return state, content


def save_summary_state(content: str, state: Mapping[str, Any]) -> str:
    """
    Persist the state marker inside the summary content.
    """
    state_json = json.dumps(state, sort_keys=True)
    marker = f"<!-- {STATE_MARKER} {state_json} -->"

    if STATE_PATTERN.search(content):
        return STATE_PATTERN.sub(lambda _: marker, content)

    # Insert near the top, preferably after the META block if present.
    if content.startswith("<!--META"):
        closing = content.find("-->")
        if closing != -1:
            insertion_point = closing + 3
            return content[:insertion_point] + "\n" + marker + content[insertion_point:]
    return marker + "\n" + content
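
# Round-trip sketch for the state helpers (hypothetical content): a summary with
# no state marker yields empty buckets, and saving writes the marker back near
# the top of the file.
#
#     state, _ = load_summary_state("# My discussion\n")   # state == {"questions": [], ...}
#     state["decisions"].append({"decision": "Use markers", "participant": "alice"})
#     save_summary_state("# My discussion\n", state)
#     # -> '<!-- SUMMARY:STATE {...json...} -->\n# My discussion\n'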


def _merge_items_by_key(
    items: list[dict[str, Any]],
    new_items: list[dict[str, Any]],
    key_fn,
) -> list[dict[str, Any]]:
    """
    Merge dictionaries keyed by `key_fn`. Each kept item is a copy; entries from
    `new_items` override existing fields, but None/empty values never overwrite.
    """
    index: dict[str, dict[str, Any]] = {}
    merged: list[dict[str, Any]] = []

    for item in items:
        key = key_fn(item)
        if not key:
            continue
        copy = dict(item)
        index[key] = copy
        merged.append(copy)

    for item in new_items:
        key = key_fn(item)
        if not key:
            continue

        existing = index.get(key)
        if existing:
            for field, value in item.items():
                if value not in (None, ""):
                    existing[field] = value
        else:
            copy = dict(item)
            index[key] = copy
            merged.append(copy)

    return merged
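
# Merge semantics sketch (hypothetical items, keyed the way merge_action_items
# keys them): items with matching keys are merged, and None/empty values from
# the newer item never overwrite existing fields.
#
#     old = [{"action": "Fix CI", "status": "TODO", "assignee": ""}]
#     new = [{"action": "fix ci", "status": "DONE", "assignee": "alice"}]
#     _merge_items_by_key(old, new, lambda i: _canonical_text(i.get("action")))
#     # -> [{"action": "fix ci", "status": "DONE", "assignee": "alice"}]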


def merge_questions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    merged = _merge_items_by_key(existing, new_items, lambda item: _canonical_text(item.get("question")))
    # Drop fully answered questions to keep the summary lean.
    filtered: list[dict[str, Any]] = []
    for item in merged:
        status = (item.get("status") or "OPEN").upper()
        if status in {"OPEN", "PARTIAL"}:
            filtered.append(item)
    for item in filtered:
        item["participant"] = _sanitize_name(item.get("participant"))
    return filtered


def merge_action_items(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    merged = _merge_items_by_key(existing, new_items, lambda item: _canonical_text(item.get("action")))
    for item in merged:
        item["participant"] = _sanitize_name(item.get("participant"))
        if "assignee" in item:
            item["assignee"] = _sanitize_name(item.get("assignee"))
        if "completed_by" in item:
            item["completed_by"] = _sanitize_name(item.get("completed_by"))
    return merged


def merge_decisions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    merged = _merge_items_by_key(existing, new_items, lambda item: _canonical_text(item.get("decision")))

    # Deduplicate supporter lists.
    for decision in merged:
        decision["participant"] = _sanitize_name(decision.get("participant"))
        supporters = decision.get("supporters")
        if isinstance(supporters, list):
            seen = set()
            deduped = []
            for supporter in supporters:
                sanitized = _sanitize_name(supporter)
                if sanitized not in seen:
                    seen.add(sanitized)
                    deduped.append(sanitized)
            decision["supporters"] = deduped
    return merged


def merge_mentions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    merged = _merge_items_by_key(
        existing,
        new_items,
        lambda item: f"{_canonical_text(item.get('to'))}|{_canonical_text(item.get('context'))}",
    )
    for item in merged:
        item["from"] = _sanitize_name(item.get("from"))
        item["to"] = _sanitize_name(item.get("to"))
    return merged


def format_votes_section(votes: Mapping[str, str]) -> str:
    """Format the VOTES section content."""
    # Count latest vote values and render a compact tally + per-participant list.
    from collections import Counter

    counts = Counter(votes.values())
    ready = counts.get("READY", 0)
    changes = counts.get("CHANGES", 0)
    reject = counts.get("REJECT", 0)

    lines = [
        "## Votes (latest per participant)",
        f"READY: {ready} • CHANGES: {changes} • REJECT: {reject}",
    ]

    if votes:
        for participant, vote in sorted(votes.items()):
            lines.append(f"- {_sanitize_name(participant)}: {vote}")
    else:
        lines.append("- (no votes yet)")

    return "\n".join(lines)
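
# Example rendering (hypothetical votes):
#
#     format_votes_section({"alice": "READY", "bob": "CHANGES"})
#
#     ## Votes (latest per participant)
#     READY: 1 • CHANGES: 1 • REJECT: 0
#     - alice: READY
#     - bob: CHANGES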


def format_participants_section(participants: list[dict[str, Any]]) -> str:
    """Format the PARTICIPANTS section content."""
    lines = ["## Participants"]

    if not participants:
        lines.append("- (none yet)")
        return "\n".join(lines)

    total = len(participants)
    agents = sum(1 for p in participants if p.get("is_agent"))
    humans = total - agents
    lines.append(f"Total: {total} (Humans: {humans} • Agents: {agents})")

    for participant in participants:
        name = _sanitize_name(participant.get("name"))
        vote = participant.get("vote") or "—"
        lines.append(f"- {name} — {vote}")

        # Guard against a missing or None last_comment.
        last_comment = (participant.get("last_comment") or "").strip()
        if last_comment:
            preview = _preview_text(last_comment)
            lines.append(f" Last: {preview}")

    return "\n".join(lines)


def format_tasks_section(tasks: list[dict[str, Any]]) -> str:
    """Format the TASKS section content."""
    lines = ["## Tasks"]

    progress = None
    filtered: list[dict[str, Any]] = []
    for task in tasks or []:
        if "progress_summary" in task:
            progress = task["progress_summary"]
        else:
            filtered.append(task)

    if progress:
        total = max(int(progress.get("total", 0)), 0)
        completed = max(int(progress.get("completed", 0)), 0)
        remaining = max(total - completed, 0)
        percent = 0 if total == 0 else int(round((completed / total) * 100))
        lines.append(f"Progress: {completed}/{total} complete ({percent}%) • Remaining: {remaining}")

    if not filtered:
        lines.append("- (none yet)")
        return "\n".join(lines)

    if progress:
        lines.append("")

    for task in filtered:
        # Guard against a missing or None task text.
        text = (task.get("text") or "").strip()
        done = bool(task.get("done"))
        owner = task.get("owner")
        refs = task.get("refs") or []
        entry_text = text if text else "(unnamed task)"
        entry = f"- [{'x' if done else ' '}] {entry_text}"
        if owner:
            entry += f" (@{_sanitize_name(owner)})"
        if refs:
            formatted_refs = []
            for ref in sorted(refs, key=str.lower):
                if ref.startswith("#"):
                    formatted_refs.append(f"PR {ref}")
                else:
                    formatted_refs.append(f"commit {ref[:7]}")
            entry += f" (refs: {', '.join(formatted_refs)})"
        lines.append(entry)

    return "\n".join(lines)
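
# Example rendering (hypothetical tasks; the synthetic "progress_summary" entry
# carries the roll-up counts and is not rendered as a task itself):
#
#     format_tasks_section([
#         {"progress_summary": {"total": 4, "completed": 1}},
#         {"text": "Write spec", "owner": "alice", "refs": ["#12", "abc1234def"]},
#     ])
#
#     ## Tasks
#     Progress: 1/4 complete (25%) • Remaining: 3
#
#     - [ ] Write spec (@alice) (refs: PR #12, commit abc1234)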


def format_questions_section(questions: list[dict[str, Any]]) -> str:
    """Format the OPEN_QUESTIONS section content."""
    lines = ["## Open Questions"]

    if not questions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    # Split questions by status so OPEN items stay at the top and partial answers
    # can be rendered with their follow-up context.
    # Default to "OPEN" if the status field is missing (for AI-extracted questions).
    open_questions = [q for q in questions if q.get("status", "OPEN") == "OPEN"]
    partial_questions = [q for q in questions if q.get("status") == "PARTIAL"]

    if open_questions:
        for q in open_questions:
            participant = _sanitize_name(q.get("participant"))
            question = q.get("question", "")
            lines.append(f"- @{participant}: {question}")

    if partial_questions:
        lines.append("\n### Partially Answered:")
        for q in partial_questions:
            participant = _sanitize_name(q.get("participant"))
            question = q.get("question", "")
            answer = q.get("answer", "")
            lines.append(f"- @{participant}: {question}")
            lines.append(f" - Partial answer: {answer}")

    if not open_questions and not partial_questions:
        lines.append("- (all questions answered)")

    return "\n".join(lines)


def format_action_items_section(items: list[dict[str, Any]]) -> str:
    """Format the ACTION_ITEMS section content."""
    lines = ["## Action Items"]

    if not items:
        lines.append("- (none yet)")
        return "\n".join(lines)

    # Normalize items by lifecycle bucket so the rendered Markdown feels like a
    # kanban snapshot (TODO → In Progress → Completed).
    todo_items = [i for i in items if i.get("status") == "TODO"]
    assigned_items = [i for i in items if i.get("status") == "ASSIGNED"]
    done_items = [i for i in items if i.get("status") == "DONE"]

    if todo_items:
        lines.append("\n### TODO (unassigned):")
        for item in todo_items:
            action = item.get("action", "")
            participant = _sanitize_name(item.get("participant"))
            lines.append(f"- [ ] {action} (suggested by @{participant})")

    if assigned_items:
        lines.append("\n### In Progress:")
        for item in assigned_items:
            action = item.get("action", "")
            assignee = _sanitize_name(item.get("assignee"))
            lines.append(f"- [ ] {action} (@{assignee})")

    if done_items:
        lines.append("\n### Completed:")
        for item in done_items:
            action = item.get("action", "")
            completed_by = _sanitize_name(item.get("completed_by") or item.get("assignee"))
            lines.append(f"- [x] {action} (@{completed_by})")

    return "\n".join(lines)
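
# Example rendering (hypothetical items, one per lifecycle bucket):
#
#     format_action_items_section([
#         {"action": "Draft API notes", "status": "TODO", "participant": "carol"},
#         {"action": "Update docs", "status": "ASSIGNED", "assignee": "bob"},
#         {"action": "Fix lint", "status": "DONE", "completed_by": "alice"},
#     ])
#
#     ## Action Items
#
#     ### TODO (unassigned):
#     - [ ] Draft API notes (suggested by @carol)
#
#     ### In Progress:
#     - [ ] Update docs (@bob)
#
#     ### Completed:
#     - [x] Fix lint (@alice)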


def format_decisions_section(decisions: list[dict[str, Any]]) -> str:
    """Format the DECISIONS section content (ADR-style)."""
    lines = ["## Decisions (ADR-style)"]

    if not decisions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    active_decisions = [d for d in decisions if d.get("status", "ACTIVE") == "ACTIVE"]

    if not active_decisions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    for idx, decision in enumerate(active_decisions, 1):
        decision_text = decision.get("decision", "")
        rationale = decision.get("rationale", "")
        participant = _sanitize_name(decision.get("participant"))
        supporters = decision.get("supporters", [])

        lines.append(f"\n### Decision {idx}: {decision_text}")
        lines.append(f"- **Proposed by:** @{participant}")

        if supporters:
            supporters_str = ", ".join(f"@{_sanitize_name(s)}" for s in supporters)
            lines.append(f"- **Supported by:** {supporters_str}")

        if rationale:
            lines.append(f"- **Rationale:** {rationale}")

        alternatives = decision.get("alternatives", [])
        if alternatives:
            lines.append("- **Alternatives considered:**")
            for alt in alternatives:
                lines.append(f" - {alt}")

    return "\n".join(lines)


def format_awaiting_section(mentions: list[dict[str, str]]) -> str:
    """Format the AWAITING section content (unanswered @mentions)."""
    lines = ["## Awaiting Replies"]

    if not mentions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    # Group by target
    by_target: dict[str, list[str]] = {}
    for mention in mentions:
        to = mention.get("to", "unknown")
        from_participant = _sanitize_name(mention.get("from"))
        context = mention.get("context", "")

        if to not in by_target:
            by_target[to] = []
        by_target[to].append(f"@{from_participant}: {context}")

    for target, contexts in sorted(by_target.items()):
        lines.append(f"\n### @{_sanitize_name(target)}")
        for ctx in contexts:
            lines.append(f"- {ctx}")

    return "\n".join(lines)


def format_timeline_entry(participant: str, summary: str) -> str:
    """Format a single timeline entry."""
    now = datetime.now().strftime("%Y-%m-%d %H:%M")
    return f"- {now} @{_sanitize_name(participant)}: {summary}"


def append_timeline_entry(content: str, entry: str) -> str:
    """Append a new entry to the timeline section (most recent first)."""
    pattern = r"(<!-- SUMMARY:TIMELINE START -->\s*## Timeline \(most recent first\)\s*)(.*?)(<!-- SUMMARY:TIMELINE END -->)"

    def replacer(match):
        header = match.group(1)
        existing = match.group(2).strip()
        footer = match.group(3)

        # Remove the template placeholder if present
        if existing.startswith("- <YYYY-MM-DD"):
            existing = ""

        # Add the new entry at the top
        if existing:
            new_timeline = f"{entry}\n{existing}"
        else:
            new_timeline = entry

        return f"{header}\n{new_timeline}\n{footer}"

    return re.sub(pattern, replacer, content, flags=re.DOTALL)
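
# Illustrative timeline flow (hypothetical values): entries are formatted with
# the current timestamp and prepended so the newest activity stays on top.
#
#     entry = format_timeline_entry("alice", "pushed a fix")
#     # -> "- 2025-01-15 09:30 @alice: pushed a fix"   (timestamp from datetime.now())
#     content = append_timeline_entry(content, entry)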


def update_summary_file(
    summary_path: Path,
    votes: Mapping[str, str] | None = None,
    questions: list[dict[str, Any]] | None = None,
    action_items: list[dict[str, Any]] | None = None,
    decisions: list[dict[str, Any]] | None = None,
    mentions: list[dict[str, str]] | None = None,
    participants: list[dict[str, Any]] | None = None,
    tasks: list[dict[str, Any]] | None = None,
    timeline_entry: str | None = None
) -> bool:
    """
    Update a summary file with extracted information.

    Returns True if successful, False otherwise.
    """
    if not summary_path.exists():
        sys.stderr.write(f"[summary] warning: {summary_path} does not exist\n")
        return False

    try:
        content = summary_path.read_text(encoding="utf-8")
    except OSError as e:
        sys.stderr.write(f"[summary] error reading {summary_path}: {e}\n")
        return False

    state, content = load_summary_state(content)

    if questions is not None:
        state["questions"] = merge_questions(state.get("questions", []), questions)

    if action_items is not None:
        state["action_items"] = merge_action_items(state.get("action_items", []), action_items)

    if decisions is not None:
        state["decisions"] = merge_decisions(state.get("decisions", []), decisions)

    if mentions is not None:
        state["mentions"] = merge_mentions(state.get("mentions", []), mentions)

    content = save_summary_state(content, state)

    # Update each section that has new data
    if votes is not None:
        new_votes = format_votes_section(votes)
        content = update_marker_block(content, "VOTES", new_votes)

    if participants is not None:
        new_participants = format_participants_section(participants)
        content = update_marker_block(content, "PARTICIPANTS", new_participants)

    if tasks is not None:
        new_tasks = format_tasks_section(tasks)
        content = update_marker_block(content, "TASKS", new_tasks)

    new_questions = format_questions_section(state.get("questions", []))
    content = update_marker_block(content, "OPEN_QUESTIONS", new_questions)

    new_items = format_action_items_section(state.get("action_items", []))
    content = update_marker_block(content, "ACTION_ITEMS", new_items)

    new_decisions = format_decisions_section(state.get("decisions", []))
    content = update_marker_block(content, "DECISIONS", new_decisions)

    new_awaiting = format_awaiting_section(state.get("mentions", []))
    content = update_marker_block(content, "AWAITING", new_awaiting)

    if timeline_entry is not None:
        content = append_timeline_entry(content, timeline_entry)

    # Write back
    try:
        summary_path.write_text(content, encoding="utf-8")
        return True
    except OSError as e:
        sys.stderr.write(f"[summary] error writing {summary_path}: {e}\n")
        return False
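

if __name__ == "__main__":
    # Minimal, self-contained smoke test of the update flow. The template below
    # is an assumed stand-in for a freshly initialized .sum.md (the real template
    # is produced elsewhere); it only needs the marker pairs this module rewrites.
    import tempfile

    template = "\n".join([
        "# Discussion summary",
        "<!-- SUMMARY:VOTES START -->",
        "<!-- SUMMARY:VOTES END -->",
        "<!-- SUMMARY:OPEN_QUESTIONS START -->",
        "<!-- SUMMARY:OPEN_QUESTIONS END -->",
        "<!-- SUMMARY:ACTION_ITEMS START -->",
        "<!-- SUMMARY:ACTION_ITEMS END -->",
        "<!-- SUMMARY:DECISIONS START -->",
        "<!-- SUMMARY:DECISIONS END -->",
        "<!-- SUMMARY:AWAITING START -->",
        "<!-- SUMMARY:AWAITING END -->",
        "<!-- SUMMARY:TIMELINE START -->",
        "## Timeline (most recent first)",
        "<!-- SUMMARY:TIMELINE END -->",
        "",
    ])

    with tempfile.TemporaryDirectory() as tmp:
        demo_path = Path(tmp) / "demo.sum.md"
        demo_path.write_text(template, encoding="utf-8")

        ok = update_summary_file(
            demo_path,
            votes={"alice": "READY", "bob": "CHANGES"},
            questions=[{"question": "Do we need a CLI?", "participant": "bob"}],
            decisions=[{"decision": "Keep state in an HTML comment", "participant": "alice"}],
            timeline_entry=format_timeline_entry("alice", "updated the summary"),
        )
        print(f"update_summary_file -> {ok}")
        print(demo_path.read_text(encoding="utf-8"))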