#!/usr/bin/env python3 """ Summary file updater for CascadingDev discussions. Updates marker blocks in .sum.md files with extracted information. """ from __future__ import annotations import json import re import sys from datetime import datetime from pathlib import Path from typing import Any, Mapping def update_marker_block( content: str, marker_name: str, new_content: str, include_header: bool = True ) -> str: """ Update content between markers. Args: content: Full file content marker_name: Marker name (e.g., "VOTES", "DECISIONS") new_content: New content to insert (without markers) include_header: Whether to include ## Header in the new content Returns: Updated content with replaced marker block """ # Markers are stable HTML comments placed in the .sum.md companion file. # We only replace the text BETWEEN the START/END pair so that surrounding # content (headings, links, human edits) remains intact and diffs stay tiny. pattern = rf"()(.*?)()" def replacer(match): return f"{match.group(1)}\n{new_content}\n{match.group(3)}" updated = re.sub(pattern, replacer, content, flags=re.DOTALL) # If no replacement happened, the markers might not exist if updated == content: sys.stderr.write( f"[summary] note: markers for {marker_name} not found " "(summary file likely not initialized yet)\n" ) return updated def _preview_text(text: str, limit: int = 140) -> str: """ Collapse whitespace and truncate text for compact summary previews. """ collapsed = " ".join(text.split()) if len(collapsed) <= limit: return collapsed return collapsed[: limit - 1].rstrip() + "…" # --------------------------------------------------------------------------- # Summary state helpers # # We persist aggregated structured data (questions, action items, decisions, # mentions) inside the summary file so incremental updates can merge with the # existing state. The state lives in a lightweight HTML comment: # # --------------------------------------------------------------------------- STATE_MARKER = "SUMMARY:STATE" STATE_PATTERN = re.compile(rf"", re.DOTALL) _KNOWN_FILE_EXTENSIONS = {"sip", "py", "md", "json", "yml", "yaml", "txt", "rst", "ini"} def _canonical_text(value: str | None) -> str: return (value or "").strip().lower() def _sanitize_name(value: str | None) -> str: """ Clean up participant/assignee names that may contain file paths or other noise. """ if not value: return "unknown" name = value.strip() if not name: return "unknown" # Strip any directory components. for sep in ("/", "\\"): if sep in name: name = name.split(sep)[-1] name = name.strip() if not name: return "unknown" # Drop common file extensions that sneak in from tool output. if "." in name: base, ext = name.rsplit(".", 1) if ext.lower() in _KNOWN_FILE_EXTENSIONS: name = base.strip() # Collapse whitespace and remove most punctuation (keep _, -, @, ., and '). name = re.sub(r"\s+", " ", name) name = re.sub(r"[^A-Za-z0-9_@.\-'\s]", "", name).strip() if not name: return "unknown" if len(name) > 60: name = name[:60].rstrip() return name def load_summary_state(content: str) -> tuple[dict[str, Any], str]: """ Extract stored state from the summary content. Returns (state, content) where `content` has the marker intact (we update it separately). """ match = STATE_PATTERN.search(content) if not match: return { "questions": [], "action_items": [], "decisions": [], "mentions": [], }, content json_blob = match.group(1) try: state = json.loads(json_blob) except json.JSONDecodeError: state = {} state.setdefault("questions", []) state.setdefault("action_items", []) state.setdefault("decisions", []) state.setdefault("mentions", []) return state, content def save_summary_state(content: str, state: Mapping[str, Any]) -> str: """ Persist the state marker inside the summary content. """ state_json = json.dumps(state, sort_keys=True) marker = f"" if STATE_PATTERN.search(content): return STATE_PATTERN.sub(lambda _: marker, content) # Insert near the top, preferably after META block if present. if content.startswith("") if closing != -1: insertion_point = closing + 3 return content[:insertion_point] + "\n" + marker + content[insertion_point:] return marker + "\n" + content def _merge_items_by_key(items: list[dict[str, Any]], new_items: list[dict[str, Any]], key_fn) -> list[dict[str, Any]]: """ Merge dictionaries in-place keyed by `key_fn`. Later entries override fields. """ index: dict[str, dict[str, Any]] = {} merged: list[dict[str, Any]] = [] for item in items: key = key_fn(item) if not key: continue copy = dict(item) index[key] = copy merged.append(copy) for item in new_items: key = key_fn(item) if not key: continue existing = index.get(key) if existing: for field, value in item.items(): if value not in (None, ""): existing[field] = value else: copy = dict(item) index[key] = copy merged.append(copy) return merged def merge_questions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]: merged = _merge_items_by_key(existing, new_items, lambda item: _canonical_text(item.get("question"))) # Drop fully answered questions to keep the summary lean. filtered: list[dict[str, Any]] = [] for item in merged: status = (item.get("status") or "OPEN").upper() if status in {"OPEN", "PARTIAL"}: filtered.append(item) for item in filtered: item["participant"] = _sanitize_name(item.get("participant")) return filtered def merge_action_items(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]: merged = _merge_items_by_key(existing, new_items, lambda item: _canonical_text(item.get("action"))) for item in merged: item["participant"] = _sanitize_name(item.get("participant")) if "assignee" in item: item["assignee"] = _sanitize_name(item.get("assignee")) if "completed_by" in item: item["completed_by"] = _sanitize_name(item.get("completed_by")) return merged def merge_decisions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]: merged = _merge_items_by_key(existing, new_items, lambda item: _canonical_text(item.get("decision"))) # Deduplicate supporter lists. for decision in merged: decision["participant"] = _sanitize_name(decision.get("participant")) supporters = decision.get("supporters") if isinstance(supporters, list): seen = set() deduped = [] for supporter in supporters: sanitized = _sanitize_name(supporter) if sanitized not in seen: seen.add(sanitized) deduped.append(sanitized) decision["supporters"] = deduped return merged def merge_mentions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]: merged = _merge_items_by_key(existing, new_items, lambda item: f"{_canonical_text(item.get('to'))}|{_canonical_text(item.get('context'))}") for item in merged: item["from"] = _sanitize_name(item.get("from")) item["to"] = _sanitize_name(item.get("to")) return merged def format_votes_section(votes: Mapping[str, str]) -> str: """Format the VOTES section content.""" # Count latest vote values and render a compact tally + per-participant list. from collections import Counter counts = Counter(votes.values()) ready = counts.get("READY", 0) changes = counts.get("CHANGES", 0) reject = counts.get("REJECT", 0) lines = [ "## Votes (latest per participant)", f"READY: {ready} • CHANGES: {changes} • REJECT: {reject}" ] if votes: for participant, vote in sorted(votes.items()): lines.append(f"- {_sanitize_name(participant)}: {vote}") else: lines.append("- (no votes yet)") return "\n".join(lines) def format_participants_section(participants: list[dict[str, Any]]) -> str: """Format the PARTICIPANTS section content.""" lines = ["## Participants"] if not participants: lines.append("- (none yet)") return "\n".join(lines) total = len(participants) agents = sum(1 for p in participants if p.get("is_agent")) humans = total - agents lines.append(f"Total: {total} (Humans: {humans} • Agents: {agents})") for participant in participants: name = _sanitize_name(participant.get("name")) vote = participant.get("vote") or "—" lines.append(f"- {name} — {vote}") last_comment = participant.get("last_comment", "").strip() if last_comment: preview = _preview_text(last_comment) lines.append(f" Last: {preview}") return "\n".join(lines) def format_tasks_section(tasks: list[dict[str, Any]]) -> str: """Format the TASKS section content.""" lines = ["## Tasks"] progress = None filtered: list[dict[str, Any]] = [] for task in tasks or []: if "progress_summary" in task: progress = task["progress_summary"] else: filtered.append(task) if progress: total = max(int(progress.get("total", 0)), 0) completed = max(int(progress.get("completed", 0)), 0) remaining = max(total - completed, 0) percent = 0 if total == 0 else int(round((completed / total) * 100)) lines.append(f"Progress: {completed}/{total} complete ({percent}%) • Remaining: {remaining}") if not filtered: lines.append("- (none yet)") return "\n".join(lines) if progress: lines.append("") for task in filtered: text = task.get("text", "").strip() done = bool(task.get("done")) owner = task.get("owner") refs = task.get("refs") or [] entry_text = text if text else "(unnamed task)" entry = f"- [{'x' if done else ' '}] {entry_text}" if owner: entry += f" (@{_sanitize_name(owner)})" if refs: formatted_refs = [] for ref in sorted(refs, key=str.lower): if ref.startswith("#"): formatted_refs.append(f"PR {ref}") else: formatted_refs.append(f"commit {ref[:7]}") entry += f" (refs: {', '.join(formatted_refs)})" lines.append(entry) return "\n".join(lines) def format_questions_section(questions: list[dict[str, Any]]) -> str: """Format the OPEN_QUESTIONS section content.""" lines = ["## Open Questions"] if not questions: lines.append("- (none yet)") return "\n".join(lines) # Split questions by status so OPEN items stay at the top and partial answers # can be rendered with their follow-up context. # Default to "OPEN" if status field is missing (for AI-extracted questions) open_questions = [q for q in questions if q.get("status", "OPEN") == "OPEN"] partial_questions = [q for q in questions if q.get("status") == "PARTIAL"] if open_questions: for q in open_questions: participant = _sanitize_name(q.get("participant")) question = q.get("question", "") lines.append(f"- @{participant}: {question}") if partial_questions: lines.append("\n### Partially Answered:") for q in partial_questions: participant = _sanitize_name(q.get("participant")) question = q.get("question", "") answer = q.get("answer", "") lines.append(f"- @{participant}: {question}") lines.append(f" - Partial answer: {answer}") if not open_questions and not partial_questions: lines.append("- (all questions answered)") return "\n".join(lines) def format_action_items_section(items: list[dict[str, Any]]) -> str: """Format the ACTION_ITEMS section content.""" lines = ["## Action Items"] if not items: lines.append("- (none yet)") return "\n".join(lines) # Normalize items by lifecycle bucket so the rendered Markdown feels like a # kanban snapshot (TODO → In Progress → Completed). todo_items = [i for i in items if i.get("status") == "TODO"] assigned_items = [i for i in items if i.get("status") == "ASSIGNED"] done_items = [i for i in items if i.get("status") == "DONE"] if todo_items: lines.append("\n### TODO (unassigned):") for item in todo_items: action = item.get("action", "") participant = _sanitize_name(item.get("participant")) lines.append(f"- [ ] {action} (suggested by @{participant})") if assigned_items: lines.append("\n### In Progress:") for item in assigned_items: action = item.get("action", "") assignee = _sanitize_name(item.get("assignee")) lines.append(f"- [ ] {action} (@{assignee})") if done_items: lines.append("\n### Completed:") for item in done_items: action = item.get("action", "") completed_by = _sanitize_name(item.get("completed_by") or item.get("assignee")) lines.append(f"- [x] {action} (@{completed_by})") return "\n".join(lines) def format_decisions_section(decisions: list[dict[str, Any]]) -> str: """Format the DECISIONS section content (ADR-style).""" lines = ["## Decisions (ADR-style)"] if not decisions: lines.append("- (none yet)") return "\n".join(lines) active_decisions = [d for d in decisions if d.get("status", "ACTIVE") == "ACTIVE"] if not active_decisions: lines.append("- (none yet)") return "\n".join(lines) for idx, decision in enumerate(active_decisions, 1): decision_text = decision.get("decision", "") rationale = decision.get("rationale", "") participant = _sanitize_name(decision.get("participant")) supporters = decision.get("supporters", []) lines.append(f"\n### Decision {idx}: {decision_text}") lines.append(f"- **Proposed by:** @{participant}") if supporters: supporters_str = ", ".join(f"@{_sanitize_name(s)}" for s in supporters) lines.append(f"- **Supported by:** {supporters_str}") if rationale: lines.append(f"- **Rationale:** {rationale}") alternatives = decision.get("alternatives", []) if alternatives: lines.append("- **Alternatives considered:**") for alt in alternatives: lines.append(f" - {alt}") return "\n".join(lines) def format_awaiting_section(mentions: list[dict[str, str]]) -> str: """Format the AWAITING section content (unanswered @mentions).""" lines = ["## Awaiting Replies"] if not mentions: lines.append("- (none yet)") return "\n".join(lines) # Group by target by_target: dict[str, list[str]] = {} for mention in mentions: to = mention.get("to", "unknown") from_participant = _sanitize_name(mention.get("from")) context = mention.get("context", "") if to not in by_target: by_target[to] = [] by_target[to].append(f"@{from_participant}: {context}") for target, contexts in sorted(by_target.items()): lines.append(f"\n### @{_sanitize_name(target)}") for ctx in contexts: lines.append(f"- {ctx}") return "\n".join(lines) def format_timeline_entry(participant: str, summary: str) -> str: """Format a single timeline entry.""" now = datetime.now().strftime("%Y-%m-%d %H:%M") return f"- {now} @{_sanitize_name(participant)}: {summary}" def append_timeline_entry(content: str, entry: str) -> str: """Append a new entry to the timeline section (most recent first).""" pattern = r"(\s*## Timeline \(most recent first\)\s*)(.*?)()" def replacer(match): header = match.group(1) existing = match.group(2).strip() footer = match.group(3) # Remove placeholder if present if existing.startswith("- bool: """ Update a summary file with extracted information. Returns True if successful, False otherwise. """ if not summary_path.exists(): sys.stderr.write(f"[summary] warning: {summary_path} does not exist\n") return False try: content = summary_path.read_text(encoding="utf-8") except OSError as e: sys.stderr.write(f"[summary] error reading {summary_path}: {e}\n") return False state, content = load_summary_state(content) if questions is not None: state["questions"] = merge_questions(state.get("questions", []), questions) if action_items is not None: state["action_items"] = merge_action_items(state.get("action_items", []), action_items) if decisions is not None: state["decisions"] = merge_decisions(state.get("decisions", []), decisions) if mentions is not None: state["mentions"] = merge_mentions(state.get("mentions", []), mentions) content = save_summary_state(content, state) # Update each section that has new data if votes is not None: new_votes = format_votes_section(votes) content = update_marker_block(content, "VOTES", new_votes) if participants is not None: new_participants = format_participants_section(participants) content = update_marker_block(content, "PARTICIPANTS", new_participants) if tasks is not None: new_tasks = format_tasks_section(tasks) content = update_marker_block(content, "TASKS", new_tasks) new_questions = format_questions_section(state.get("questions", [])) content = update_marker_block(content, "OPEN_QUESTIONS", new_questions) new_items = format_action_items_section(state.get("action_items", [])) content = update_marker_block(content, "ACTION_ITEMS", new_items) new_decisions = format_decisions_section(state.get("decisions", [])) content = update_marker_block(content, "DECISIONS", new_decisions) new_awaiting = format_awaiting_section(state.get("mentions", [])) content = update_marker_block(content, "AWAITING", new_awaiting) if timeline_entry is not None: content = append_timeline_entry(content, timeline_entry) # Write back try: summary_path.write_text(content, encoding="utf-8") return True except OSError as e: sys.stderr.write(f"[summary] error writing {summary_path}: {e}\n") return False