CascadingDev/automation/summary.py

595 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Summary file updater for CascadingDev discussions.
Updates marker blocks in .sum.md files with extracted information.
"""
from __future__ import annotations
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Mapping
def update_marker_block(
    content: str,
    marker_name: str,
    new_content: str,
    include_header: bool = True
) -> str:
    """
    Update content between <!-- SUMMARY:{marker_name} START/END --> markers.

    Args:
        content: Full file content
        marker_name: Marker name (e.g., "VOTES", "DECISIONS"); matched literally
        new_content: New content to insert (without markers)
        include_header: Accepted for backward compatibility; not used by this
            function.

    Returns:
        Updated content with replaced marker block. When no marker pair is
        found, `content` is returned unchanged and a note is written to stderr.
    """
    # Markers are stable HTML comments placed in the .sum.md companion file.
    # We only replace the text BETWEEN the START/END pair so that surrounding
    # content (headings, links, human edits) remains intact and diffs stay tiny.
    # The name is escaped so characters such as "." or "+" cannot act as regex
    # operators and accidentally match a different marker.
    escaped = re.escape(marker_name)
    pattern = rf"(<!-- SUMMARY:{escaped} START -->)(.*?)(<!-- SUMMARY:{escaped} END -->)"

    def replacer(match):
        # A callable replacement keeps backslashes in new_content literal.
        return f"{match.group(1)}\n{new_content}\n{match.group(3)}"

    updated, replacements = re.subn(pattern, replacer, content, flags=re.DOTALL)

    # Count substitutions instead of comparing strings: a block that is
    # already up to date is not the same thing as a missing marker pair.
    if replacements == 0:
        sys.stderr.write(
            f"[summary] note: markers for {marker_name} not found "
            "(summary file likely not initialized yet)\n"
        )
    return updated
def _preview_text(text: str, limit: int = 140) -> str:
"""
Collapse whitespace and truncate text for compact summary previews.
"""
collapsed = " ".join(text.split())
if len(collapsed) <= limit:
return collapsed
return collapsed[: limit - 1].rstrip() + ""
# ---------------------------------------------------------------------------
# Summary state helpers
#
# Aggregated structured data (questions, action items, decisions, mentions)
# is stored inside the summary file itself, so an incremental update can be
# merged with what earlier runs recorded. The state is a single lightweight
# HTML comment of the form:
#   <!-- SUMMARY:STATE {...json...} -->
# ---------------------------------------------------------------------------
STATE_MARKER = "SUMMARY:STATE"

# Group 1 captures the JSON object; DOTALL lets the blob span several lines.
STATE_PATTERN = re.compile(
    r"<!-- " + STATE_MARKER + r" (\{.*?\}) -->",
    flags=re.DOTALL,
)

# Extensions removed from the end of a participant name by _sanitize_name.
_KNOWN_FILE_EXTENSIONS = {
    "ini",
    "json",
    "md",
    "py",
    "rst",
    "sip",
    "txt",
    "yaml",
    "yml",
}
def _canonical_text(value: str | None) -> str:
return (value or "").strip().lower()
def _sanitize_name(value: str | None) -> str:
"""
Clean up participant/assignee names that may contain file paths or other noise.
"""
if not value:
return "unknown"
name = value.strip()
if not name:
return "unknown"
# Strip any directory components.
for sep in ("/", "\\"):
if sep in name:
name = name.split(sep)[-1]
name = name.strip()
if not name:
return "unknown"
# Drop common file extensions that sneak in from tool output.
if "." in name:
base, ext = name.rsplit(".", 1)
if ext.lower() in _KNOWN_FILE_EXTENSIONS:
name = base.strip()
# Collapse whitespace and remove most punctuation (keep _, -, @, ., and ').
name = re.sub(r"\s+", " ", name)
name = re.sub(r"[^A-Za-z0-9_@.\-'\s]", "", name).strip()
if not name:
return "unknown"
if len(name) > 60:
name = name[:60].rstrip()
return name
def load_summary_state(content: str) -> tuple[dict[str, Any], str]:
    """
    Extract stored state from the summary content. Returns (state, content)
    where `content` has the marker intact (we update it separately).

    The returned state always holds a list under "questions", "action_items",
    "decisions" and "mentions". A missing marker, unparseable JSON, or a
    non-list value under one of those keys yields an empty list for that
    field; unparseable JSON is also reported on stderr because the stored
    state will be replaced on the next save.
    """
    state: dict[str, Any] = {}
    match = STATE_PATTERN.search(content)
    if match:
        try:
            loaded = json.loads(match.group(1))
        except json.JSONDecodeError as exc:
            sys.stderr.write(
                f"[summary] warning: stored state is not valid JSON ({exc}); "
                "starting from empty state\n"
            )
        else:
            if isinstance(loaded, dict):
                state = loaded

    # The merge helpers iterate these fields, so each one must be a list even
    # when the stored JSON carried null or some other type.
    for field in ("questions", "action_items", "decisions", "mentions"):
        if not isinstance(state.get(field), list):
            state[field] = []
    return state, content
def save_summary_state(content: str, state: Mapping[str, Any]) -> str:
    """
    Persist the state marker inside the summary content.

    Args:
        content: Full summary file content.
        state: JSON-serializable state mapping.

    Returns:
        `content` with the state comment replaced when one exists; otherwise
        inserted after a leading <!--META ... --> block, or at the very top.
    """
    # "-->" can only occur inside a JSON string. Left as-is it would end the
    # HTML comment early and spill the remaining JSON into the rendered
    # document, so ">" is written as its \u003e escape; json.loads turns it
    # back into ">" when the state is read.
    state_json = json.dumps(state, sort_keys=True).replace("-->", "--\\u003e")
    marker = f"<!-- {STATE_MARKER} {state_json} -->"

    if STATE_PATTERN.search(content):
        # A callable replacement keeps backslashes in the JSON literal.
        return STATE_PATTERN.sub(lambda _: marker, content)

    # Insert near the top, preferably after META block if present.
    if content.startswith("<!--META"):
        closing = content.find("-->")
        if closing != -1:
            insertion_point = closing + 3
            return content[:insertion_point] + "\n" + marker + content[insertion_point:]
    return marker + "\n" + content
def _merge_items_by_key(items: list[dict[str, Any]], new_items: list[dict[str, Any]], key_fn) -> list[dict[str, Any]]:
"""
Merge dictionaries in-place keyed by `key_fn`. Later entries override fields.
"""
index: dict[str, dict[str, Any]] = {}
merged: list[dict[str, Any]] = []
for item in items:
key = key_fn(item)
if not key:
continue
copy = dict(item)
index[key] = copy
merged.append(copy)
for item in new_items:
key = key_fn(item)
if not key:
continue
existing = index.get(key)
if existing:
for field, value in item.items():
if value not in (None, ""):
existing[field] = value
else:
copy = dict(item)
index[key] = copy
merged.append(copy)
return merged
def merge_questions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Merge new questions into the stored ones, keyed by canonical question text.

    Only entries whose status (upper-cased, defaulting to "OPEN") is OPEN or
    PARTIAL are returned; each returned entry gets a sanitized "participant".
    """
    def question_key(entry: dict[str, Any]) -> str:
        return _canonical_text(entry.get("question"))

    combined = _merge_items_by_key(existing, new_items, question_key)

    # Fully answered questions are dropped to keep the summary lean.
    unresolved = [
        entry
        for entry in combined
        if (entry.get("status") or "OPEN").upper() in {"OPEN", "PARTIAL"}
    ]
    for entry in unresolved:
        entry["participant"] = _sanitize_name(entry.get("participant"))
    return unresolved
def merge_action_items(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Merge new action items into the stored ones, keyed by canonical action text.

    "participant" is always set to a sanitized name; "assignee" and
    "completed_by" are sanitized only when the entry already has them.
    """
    def action_key(entry: dict[str, Any]) -> str:
        return _canonical_text(entry.get("action"))

    combined = _merge_items_by_key(existing, new_items, action_key)
    for entry in combined:
        entry["participant"] = _sanitize_name(entry.get("participant"))
        for role in ("assignee", "completed_by"):
            if role in entry:
                entry[role] = _sanitize_name(entry.get(role))
    return combined
def merge_decisions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Merge new decisions into the stored ones, keyed by canonical decision text.

    "participant" is sanitized on every entry. When "supporters" is a list,
    its names are sanitized and de-duplicated, keeping first-seen order.
    """
    def decision_key(entry: dict[str, Any]) -> str:
        return _canonical_text(entry.get("decision"))

    combined = _merge_items_by_key(existing, new_items, decision_key)
    for entry in combined:
        entry["participant"] = _sanitize_name(entry.get("participant"))
        backers = entry.get("supporters")
        if not isinstance(backers, list):
            continue
        # dict.fromkeys removes repeats while preserving insertion order.
        entry["supporters"] = list(
            dict.fromkeys(_sanitize_name(backer) for backer in backers)
        )
    return combined
def merge_mentions(existing: list[dict[str, Any]], new_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Merge new @mentions into the stored ones.

    Entries are keyed by "<canonical to>|<canonical context>"; afterwards the
    "from" and "to" fields of every entry are replaced by sanitized names.
    """
    def mention_key(entry: dict[str, Any]) -> str:
        target = _canonical_text(entry.get("to"))
        context = _canonical_text(entry.get("context"))
        return f"{target}|{context}"

    combined = _merge_items_by_key(existing, new_items, mention_key)
    for entry in combined:
        entry["from"] = _sanitize_name(entry.get("from"))
        entry["to"] = _sanitize_name(entry.get("to"))
    return combined
def format_votes_section(votes: Mapping[str, str]) -> str:
    """
    Format the VOTES section content.

    Renders the header, a READY/CHANGES/REJECT tally over the vote values,
    and one "- name: vote" line per participant sorted by mapping key (or a
    placeholder line when there are no votes).
    """
    # Tally only the three recognised values; anything else is listed per
    # participant below but does not contribute to the counts.
    tally = {"READY": 0, "CHANGES": 0, "REJECT": 0}
    for choice in votes.values():
        if choice in tally:
            tally[choice] += 1

    header = "## Votes (latest per participant)"
    summary = f"READY: {tally['READY']} • CHANGES: {tally['CHANGES']} • REJECT: {tally['REJECT']}"

    if not votes:
        return "\n".join([header, summary, "- (no votes yet)"])

    rows = [
        f"- {_sanitize_name(who)}: {choice}"
        for who, choice in sorted(votes.items())
    ]
    return "\n".join([header, summary, *rows])
def format_participants_section(participants: list[dict[str, Any]]) -> str:
    """
    Format the PARTICIPANTS section content.

    Args:
        participants: Entries read via the keys "name", "is_agent", "vote"
            and "last_comment"; all of them are optional.

    Returns:
        Markdown with a totals line (humans vs. agents), then one
        "- name: vote" line per participant ("- name" when there is no vote)
        and, when a last comment exists, a " Last: <preview>" line.
    """
    lines = ["## Participants"]
    if not participants:
        lines.append("- (none yet)")
        return "\n".join(lines)

    total = len(participants)
    agents = sum(1 for p in participants if p.get("is_agent"))
    humans = total - agents
    lines.append(f"Total: {total} (Humans: {humans} • Agents: {agents})")

    for participant in participants:
        name = _sanitize_name(participant.get("name"))
        vote = participant.get("vote") or ""
        # Separate name and vote the same way format_votes_section does;
        # concatenating them directly produced lines such as "- aliceREADY".
        lines.append(f"- {name}: {vote}" if vote else f"- {name}")

        # `or ""` also covers an explicit None, which .get's default does not.
        last_comment = (participant.get("last_comment") or "").strip()
        if last_comment:
            preview = _preview_text(last_comment)
            lines.append(f" Last: {preview}")
    return "\n".join(lines)
def format_tasks_section(tasks: list[dict[str, Any]]) -> str:
    """
    Format the TASKS section content.

    Args:
        tasks: Task entries. An entry containing "progress_summary" supplies
            the totals ("total"/"completed") and is not listed as a task; the
            last such entry wins. Other entries are read via "text", "done",
            "owner" and "refs".

    Returns:
        Markdown with an optional progress line followed by one checkbox line
        per task, or a "(none yet)" placeholder when there are no tasks.
    """
    lines = ["## Tasks"]
    progress = None
    filtered: list[dict[str, Any]] = []
    for task in tasks or []:
        if "progress_summary" in task:
            progress = task["progress_summary"]
        else:
            filtered.append(task)

    if progress:
        # `or 0` covers keys that are present but null.
        total = max(int(progress.get("total") or 0), 0)
        completed = max(int(progress.get("completed") or 0), 0)
        remaining = max(total - completed, 0)
        percent = 0 if total == 0 else int(round((completed / total) * 100))
        lines.append(f"Progress: {completed}/{total} complete ({percent}%) • Remaining: {remaining}")

    if not filtered:
        lines.append("- (none yet)")
        return "\n".join(lines)

    if progress:
        # Blank line between the progress line and the task list.
        lines.append("")

    for task in filtered:
        # `or ""` covers a "text" key that is present but null.
        text = (task.get("text") or "").strip()
        done = bool(task.get("done"))
        owner = task.get("owner")
        refs = task.get("refs") or []

        entry_text = text if text else "(unnamed task)"
        entry = f"- [{'x' if done else ' '}] {entry_text}"
        if owner:
            entry += f" (@{_sanitize_name(owner)})"
        if refs:
            formatted_refs = []
            for ref in sorted(refs, key=str.lower):
                # "#..." refs are rendered as PRs; anything else is treated
                # as a commit id and shortened to 7 characters.
                if ref.startswith("#"):
                    formatted_refs.append(f"PR {ref}")
                else:
                    formatted_refs.append(f"commit {ref[:7]}")
            entry += f" (refs: {', '.join(formatted_refs)})"
        lines.append(entry)
    return "\n".join(lines)
def format_questions_section(questions: list[dict[str, Any]]) -> str:
    """
    Format the OPEN_QUESTIONS section content.

    Args:
        questions: Entries read via "participant", "question", "status" and
            "answer". A missing or empty status counts as "OPEN" and the
            status is compared case-insensitively.

    Returns:
        Markdown listing OPEN questions first, then PARTIAL ones with their
        partial answer; a placeholder line when neither group has entries.
    """
    lines = ["## Open Questions"]
    if not questions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    def status_of(question: dict[str, Any]) -> str:
        # Same normalisation merge_questions applies, so an entry that
        # survives the merge filter is never silently hidden here.
        return str(question.get("status") or "OPEN").upper()

    # Split questions by status so OPEN items stay at the top and partial
    # answers can be rendered with their follow-up context.
    open_questions = [q for q in questions if status_of(q) == "OPEN"]
    partial_questions = [q for q in questions if status_of(q) == "PARTIAL"]

    for q in open_questions:
        participant = _sanitize_name(q.get("participant"))
        question = q.get("question", "")
        lines.append(f"- @{participant}: {question}")

    if partial_questions:
        lines.append("\n### Partially Answered:")
        for q in partial_questions:
            participant = _sanitize_name(q.get("participant"))
            question = q.get("question", "")
            answer = q.get("answer", "")
            lines.append(f"- @{participant}: {question}")
            lines.append(f" - Partial answer: {answer}")

    if not open_questions and not partial_questions:
        lines.append("- (all questions answered)")
    return "\n".join(lines)
def format_action_items_section(items: list[dict[str, Any]]) -> str:
    """
    Format the ACTION_ITEMS section content.

    Args:
        items: Entries read via "action", "status", "participant", "assignee"
            and "completed_by". Status is compared case-insensitively and a
            missing or empty status is treated as "TODO"; entries with any
            other unrecognised status are not rendered.

    Returns:
        Markdown with up to three groups (TODO, In Progress, Completed), or a
        "(none yet)" placeholder when nothing is rendered.
    """
    lines = ["## Action Items"]
    if not items:
        lines.append("- (none yet)")
        return "\n".join(lines)

    # Normalize items by lifecycle bucket so the rendered Markdown feels like a
    # kanban snapshot (TODO → In Progress → Completed).
    buckets: dict[str, list[dict[str, Any]]] = {"TODO": [], "ASSIGNED": [], "DONE": []}
    for item in items:
        status = str(item.get("status") or "TODO").upper()
        if status in buckets:
            buckets[status].append(item)

    if buckets["TODO"]:
        lines.append("\n### TODO (unassigned):")
        for item in buckets["TODO"]:
            action = item.get("action", "")
            participant = _sanitize_name(item.get("participant"))
            lines.append(f"- [ ] {action} (suggested by @{participant})")

    if buckets["ASSIGNED"]:
        lines.append("\n### In Progress:")
        for item in buckets["ASSIGNED"]:
            action = item.get("action", "")
            assignee = _sanitize_name(item.get("assignee"))
            lines.append(f"- [ ] {action} (@{assignee})")

    if buckets["DONE"]:
        lines.append("\n### Completed:")
        for item in buckets["DONE"]:
            action = item.get("action", "")
            # Fall back to the assignee when nobody was recorded as finisher.
            completed_by = _sanitize_name(item.get("completed_by") or item.get("assignee"))
            lines.append(f"- [x] {action} (@{completed_by})")

    if len(lines) == 1:
        # Every entry had an unrecognised status; avoid a bare header.
        lines.append("- (none yet)")
    return "\n".join(lines)
def format_decisions_section(decisions: list[dict[str, Any]]) -> str:
    """
    Format the DECISIONS section content (ADR-style).

    Args:
        decisions: Entries read via "decision", "rationale", "participant",
            "supporters", "alternatives" and "status". Status is compared
            case-insensitively; a missing or empty status counts as "ACTIVE".

    Returns:
        Markdown with one numbered "### Decision N" block per active
        decision, or a "(none yet)" placeholder when there is none.
    """
    lines = ["## Decisions (ADR-style)"]
    if not decisions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    # `or "ACTIVE"` also covers a status key that is present but null, which
    # a plain .get default would treat as inactive.
    active_decisions = [
        d for d in decisions
        if str(d.get("status") or "ACTIVE").upper() == "ACTIVE"
    ]
    if not active_decisions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    for idx, decision in enumerate(active_decisions, 1):
        decision_text = decision.get("decision", "")
        rationale = decision.get("rationale", "")
        participant = _sanitize_name(decision.get("participant"))
        supporters = decision.get("supporters", [])

        lines.append(f"\n### Decision {idx}: {decision_text}")
        lines.append(f"- **Proposed by:** @{participant}")
        if supporters:
            supporters_str = ", ".join(f"@{_sanitize_name(s)}" for s in supporters)
            lines.append(f"- **Supported by:** {supporters_str}")
        if rationale:
            lines.append(f"- **Rationale:** {rationale}")

        alternatives = decision.get("alternatives", [])
        if alternatives:
            lines.append("- **Alternatives considered:**")
            for alt in alternatives:
                lines.append(f" - {alt}")
    return "\n".join(lines)
def format_awaiting_section(mentions: list[dict[str, str]]) -> str:
    """
    Format the AWAITING section content (unanswered @mentions).

    Args:
        mentions: Entries read via "to", "from" and "context".

    Returns:
        Markdown with one "### @target" heading per sanitized target name
        (sorted), each followed by its "- @sender: context" lines, or a
        "(none yet)" placeholder when there are no mentions.
    """
    lines = ["## Awaiting Replies"]
    if not mentions:
        lines.append("- (none yet)")
        return "\n".join(lines)

    # Group by the sanitized target. Grouping by the raw value let variants
    # of one name produce duplicate headings, and a None target made the
    # sort below fail when mixed with string keys.
    by_target: dict[str, list[str]] = {}
    for mention in mentions:
        target = _sanitize_name(mention.get("to"))
        from_participant = _sanitize_name(mention.get("from"))
        context = mention.get("context", "")
        by_target.setdefault(target, []).append(f"@{from_participant}: {context}")

    for target, contexts in sorted(by_target.items()):
        lines.append(f"\n### @{target}")
        for ctx in contexts:
            lines.append(f"- {ctx}")
    return "\n".join(lines)
def format_timeline_entry(participant: str, summary: str) -> str:
    """
    Format a single timeline entry.

    The entry is "- <YYYY-MM-DD HH:MM> @<sanitized participant>: <summary>",
    stamped with the current time from datetime.now().
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M}"
    author = _sanitize_name(participant)
    return f"- {stamp} @{author}: {summary}"
def append_timeline_entry(content: str, entry: str) -> str:
    """
    Append a new entry to the timeline section (most recent first).

    Args:
        content: Full summary file content.
        entry: Already formatted timeline line.

    Returns:
        `content` with `entry` inserted directly under the timeline heading,
        above any existing entries. A template placeholder block (text
        starting with "- <YYYY-MM-DD") is replaced rather than kept. When the
        timeline markers are missing, `content` is returned unchanged and a
        note is written to stderr.
    """
    pattern = r"(<!-- SUMMARY:TIMELINE START -->\s*## Timeline \(most recent first\)\s*)(.*?)(<!-- SUMMARY:TIMELINE END -->)"

    def replacer(match):
        # Group 1 swallows the whitespace after the heading. Strip it before
        # re-adding a single newline; otherwise every append would leave one
        # more blank line between the heading and the entries.
        header = match.group(1).rstrip()
        existing = match.group(2).strip()
        footer = match.group(3)

        # Remove placeholder if present
        if existing.startswith("- <YYYY-MM-DD"):
            existing = ""

        # Add new entry at the top
        new_timeline = f"{entry}\n{existing}" if existing else entry
        return f"{header}\n{new_timeline}\n{footer}"

    updated, replacements = re.subn(pattern, replacer, content, flags=re.DOTALL)
    if replacements == 0:
        sys.stderr.write(
            "[summary] note: markers for TIMELINE not found "
            "(summary file likely not initialized yet)\n"
        )
    return updated
def update_summary_file(
    summary_path: Path,
    votes: Mapping[str, str] | None = None,
    questions: list[dict[str, Any]] | None = None,
    action_items: list[dict[str, Any]] | None = None,
    decisions: list[dict[str, Any]] | None = None,
    mentions: list[dict[str, str]] | None = None,
    participants: list[dict[str, Any]] | None = None,
    tasks: list[dict[str, Any]] | None = None,
    timeline_entry: str | None = None
) -> bool:
    """
    Update a summary file with extracted information.

    Args:
        summary_path: Path of the existing .sum.md file.
        votes, participants, tasks: Rendered into their marker blocks only
            when not None.
        questions, action_items, decisions, mentions: Merged into the state
            stored in the file when not None. Their sections are re-rendered
            from the merged state on every call.
        timeline_entry: Formatted line to put at the top of the timeline.

    Returns True if successful, False otherwise (file missing, unreadable,
    not valid UTF-8, or not writable); failures are reported on stderr.
    """
    if not summary_path.exists():
        sys.stderr.write(f"[summary] warning: {summary_path} does not exist\n")
        return False

    try:
        content = summary_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # listed explicitly to honour the "return False on failure" contract.
        sys.stderr.write(f"[summary] error reading {summary_path}: {e}\n")
        return False

    # Merge incoming structured data with the state persisted in the file.
    state, content = load_summary_state(content)
    if questions is not None:
        state["questions"] = merge_questions(state.get("questions", []), questions)
    if action_items is not None:
        state["action_items"] = merge_action_items(state.get("action_items", []), action_items)
    if decisions is not None:
        state["decisions"] = merge_decisions(state.get("decisions", []), decisions)
    if mentions is not None:
        state["mentions"] = merge_mentions(state.get("mentions", []), mentions)
    content = save_summary_state(content, state)

    # Update each section that has new data
    if votes is not None:
        new_votes = format_votes_section(votes)
        content = update_marker_block(content, "VOTES", new_votes)
    if participants is not None:
        new_participants = format_participants_section(participants)
        content = update_marker_block(content, "PARTICIPANTS", new_participants)
    if tasks is not None:
        new_tasks = format_tasks_section(tasks)
        content = update_marker_block(content, "TASKS", new_tasks)

    # State-backed sections are always rendered from the merged state.
    new_questions = format_questions_section(state.get("questions", []))
    content = update_marker_block(content, "OPEN_QUESTIONS", new_questions)
    new_items = format_action_items_section(state.get("action_items", []))
    content = update_marker_block(content, "ACTION_ITEMS", new_items)
    new_decisions = format_decisions_section(state.get("decisions", []))
    content = update_marker_block(content, "DECISIONS", new_decisions)
    new_awaiting = format_awaiting_section(state.get("mentions", []))
    content = update_marker_block(content, "AWAITING", new_awaiting)

    if timeline_entry is not None:
        content = append_timeline_entry(content, timeline_entry)

    # Write back
    try:
        summary_path.write_text(content, encoding="utf-8")
    except OSError as e:
        sys.stderr.write(f"[summary] error writing {summary_path}: {e}\n")
        return False
    return True