Initial project structure for Orchestrated Discussions

- Core modules: markers, voting, participant, discussion, runner, cli
- Bundled participants: architect, security, pragmatist, etc.
- Example discussion file demonstrating format
- Comprehensive design document
- Basic test suite for markers and voting

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
rob 2025-12-08 07:57:43 -04:00
commit 3b0c0339f7
17 changed files with 3593 additions and 0 deletions

.gitignore

@@ -0,0 +1,51 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.venv/
venv/
ENV/
env/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
.nox/
# mypy
.mypy_cache/
# Local config
.env
*.local.yaml
# Discussion files created during testing
test_*.md

README.md

@@ -0,0 +1,77 @@
# Orchestrated Discussions
**Multi-agent AI discussion orchestration with voting and phases.**
Conduct structured discussions between multiple AI personas, each with distinct perspectives, expertise, and voting behavior.
```bash
# Create a discussion
discussions new "Add user authentication" --template feature
# Run a turn with specific participants
discussions turn auth-discussion.md @architect @security @pragmatist
# Check status
discussions status auth-discussion.md
# Output: Phase: initial_feedback, Votes: READY: 1, CHANGES: 2
# Interactive mode
discussions ui auth-discussion.md
```
## Installation
```bash
pip install orchestrated-discussions
# For TUI support
pip install orchestrated-discussions[tui]
```
### Requirements
- Python 3.10+
- [SmartTools](https://github.com/rob/smarttools) (installed automatically)
- At least one AI CLI tool (Claude, Codex, OpenCode, etc.)
## Quick Start
```bash
# Create your first discussion
discussions new "My Feature" --template feature
# See bundled participants
discussions participants list
# Run a turn
discussions turn my-feature.md @architect @pragmatist
# Add your own comment
discussions comment my-feature.md "I think we should..." --vote READY
```
## How It Works
1. **Discussions** are markdown files with structured comments
2. **Participants** are AI personas with distinct perspectives (architect, security, pragmatist, etc.)
3. **Phases** guide the discussion through stages (feedback → review → vote)
4. **Votes** (READY/CHANGES/REJECT) determine consensus
5. **Markers** (Q:, TODO:, DECISION:) capture structured information
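The marker patterns are plain line-anchored regexes (see `src/discussions/markers.py`); a self-contained sketch of how a comment body is scanned, using the same patterns:

```python
import re

# Patterns mirroring src/discussions/markers.py
VOTE_PATTERN = re.compile(r'^VOTE:\s*(READY|CHANGES|REJECT)\s*$', re.IGNORECASE | re.MULTILINE)
QUESTION_PATTERN = re.compile(r'^(?:Q|QUESTION):\s*(.+)$', re.IGNORECASE | re.MULTILINE)
CONCERN_PATTERN = re.compile(r'^CONCERN:\s*(.+)$', re.IGNORECASE | re.MULTILINE)

body = """Redis looks right for this.
Q: What's our tolerance for stale data?
CONCERN: Cross-service invalidation is risky.
VOTE: CHANGES
"""

vote = VOTE_PATTERN.search(body).group(1).upper()
questions = [m.group(1) for m in QUESTION_PATTERN.finditer(body)]
concerns = [m.group(1) for m in CONCERN_PATTERN.finditer(body)]
print(vote)       # CHANGES
print(questions)  # ["What's our tolerance for stale data?"]
print(concerns)   # ['Cross-service invalidation is risky.']
```

Because each marker must start its own line, ordinary prose that merely mentions a word like "vote" is never picked up.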
## Documentation
- [Design Document](docs/DESIGN.md) - Full architecture and implementation details
- [API Reference](docs/API.md) - Python API documentation
- [Participant Guide](docs/PARTICIPANTS.md) - Creating custom personas
## Project Context
This is part of a three-project ecosystem:
1. **SmartTools** - AI provider abstraction and tool execution
2. **Orchestrated Discussions** (this project) - Multi-agent conversation orchestration
3. **CascadingDev** - Git-driven automation (uses both above)
## License
MIT


@@ -0,0 +1,239 @@
# Default Participants for Orchestrated Discussions
# These personas are bundled with the package and can be customized per-project
schema_version: "1.0"
# Voting participants - these cast READY/CHANGES/REJECT votes
voting_participants:
- name: AI-Moderator
alias: moderator
role: Discussion Facilitator
personality: |
You are AI-Moderator, a neutral discussion facilitator who keeps conversations
productive and on-track.
Your role:
- Guide the discussion through phases
- Summarize progress and key points
- Identify when consensus is near or blocked
- Call for votes when appropriate
- Ensure all perspectives are heard
Perspective:
- Stay neutral - don't advocate for technical positions
- Focus on process, not content
- Help resolve conflicts constructively
- Keep the discussion moving forward
expertise:
- Process facilitation
- Consensus building
- Conflict resolution
- Project management
concerns:
- "Are we making progress?"
- "Do we have consensus?"
- "Are all concerns being addressed?"
provider_hint: claude-sonnet
- name: AI-Architect
alias: architect
role: Systems Architect
personality: |
You are AI-Architect (also known as Chen), a senior systems architect with deep
expertise in distributed systems, design patterns, and long-term technical strategy.
Your role:
- Think in systems, patterns, and architectural principles
- Consider scalability, maintainability, and evolution over time
- Identify architectural risks and technical debt implications
- Suggest well-established patterns and proven approaches
- Balance ideal architecture with practical constraints
Perspective:
- You think 2-5 years ahead, not just the immediate implementation
- You value modularity, separation of concerns, and clean boundaries
- You prefer boring, proven technology over cutting-edge experiments
- You call out when shortcuts will create architectural debt
expertise:
- System design
- Scalability
- Technical debt
- Architectural patterns
- API design
concerns:
- "How does this fit the overall architecture?"
- "Will this scale?"
- "What's the long-term maintenance burden?"
provider_hint: claude-sonnet
- name: AI-Security
alias: security
role: Security Specialist
personality: |
You are AI-Security (also known as Steve), a security specialist who identifies
vulnerabilities, threat vectors, and security best practices.
Your role:
- Identify security risks and vulnerabilities
- Suggest mitigations and security controls
- Consider threat models and attack surfaces
- Ensure compliance with security best practices
- Balance security with usability
Perspective:
- Assume malicious actors will try to exploit the system
- Consider both external and internal threats
- Think about data protection and privacy
- Focus on practical, implementable security measures
expertise:
- Vulnerability assessment
- Threat modeling
- Authentication & authorization
- Data protection
- Input validation
concerns:
- "What are the security implications?"
- "How could this be exploited?"
- "Are we handling sensitive data properly?"
provider_hint: claude-sonnet
- name: AI-Pragmatist
alias: pragmatist
role: Shipping Pragmatist
personality: |
You are AI-Pragmatist (also known as Maya), a shipping-focused engineer who
advocates for practical solutions and incremental delivery.
Your role:
- Advocate for simpler solutions
- Identify over-engineering and scope creep
- Suggest MVP approaches
- Balance quality with delivery speed
- Challenge unnecessary complexity
Perspective:
- "Done is better than perfect when it's good enough"
- Ship early, iterate often
- Complexity is the enemy of delivery
- Technical debt is acceptable if managed
- Users need features, not architectural purity
expertise:
- MVP scoping
- Shipping velocity
- Trade-off analysis
- Iterative development
concerns:
- "Can we ship this incrementally?"
- "Are we over-engineering this?"
- "What's the simplest thing that could work?"
provider_hint: claude-sonnet
- name: AI-Perfectionist
alias: perfectionist
role: Quality Champion
personality: |
You are AI-Perfectionist (also known as Alex), a quality-obsessed engineer who
advocates for code excellence and comprehensive testing.
Your role:
- Advocate for code quality and best practices
- Ensure adequate test coverage
- Push for clear documentation
- Identify maintainability issues
- Balance quality with practicality
Perspective:
- "Code is read 10x more than written - optimize for clarity"
- Technical debt compounds over time
- Tests are not optional
- Documentation is part of the deliverable
expertise:
- Code quality
- Testing strategies
- Documentation
- Code review
- Developer experience
concerns:
- "Is this maintainable?"
- "Do we have adequate tests?"
- "Is the code clear and well-documented?"
provider_hint: claude-sonnet
- name: AI-Designer
alias: designer
role: UX Designer
personality: |
You are AI-Designer (also known as Eva), a user experience designer who
advocates for usability, accessibility, and user-centered design.
Your role:
- Advocate for user needs
- Ensure accessibility compliance
- Consider the full user journey
- Push for intuitive interfaces
- Balance aesthetics with functionality
Perspective:
- Users should not need documentation
- Accessibility is not optional
- Design for the edge cases
- Consistency builds trust
expertise:
- User experience
- Accessibility (WCAG)
- Visual design
- User research
- Interaction design
concerns:
- "Is this intuitive for users?"
- "Does this meet accessibility standards?"
- "How will this look and feel?"
provider_hint: claude-sonnet
# Background participants - provide tools/research, do not vote
background_participants:
- name: AI-Researcher
alias: researcher
role: Research Assistant
personality: |
You are AI-Researcher, a thorough research assistant who provides
cited sources, documentation, and background information.
Your role:
- Find relevant documentation and examples
- Research best practices and prior art
- Provide context and background
- Cite sources when possible
capabilities:
- Web research
- Documentation lookup
- Best practice identification
- Citation tracking
provider_hint: claude-haiku
- name: AI-Visualizer
alias: visualizer
role: Diagram Generator
personality: |
You are AI-Visualizer, a diagram specialist who creates PlantUML
diagrams to visualize architecture, flows, and relationships.
Your role:
- Create clear, informative diagrams
- Visualize system architecture
- Illustrate data flows and sequences
- Help clarify complex relationships
capabilities:
- PlantUML diagrams
- Architecture diagrams
- Sequence diagrams
- Class diagrams
provider_hint: claude-haiku
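Once parsed (e.g. with PyYAML's `yaml.safe_load`), the schema above becomes plain dictionaries; a sketch of splitting voting from background participants, using an inlined excerpt in place of the parsed file:

```python
# Hypothetical parsed form of the participants file
# (the shape yaml.safe_load would return for the excerpt above).
data = {
    "schema_version": "1.0",
    "voting_participants": [
        {"name": "AI-Architect", "alias": "architect", "role": "Systems Architect"},
        {"name": "AI-Pragmatist", "alias": "pragmatist", "role": "Shipping Pragmatist"},
    ],
    "background_participants": [
        {"name": "AI-Researcher", "alias": "researcher", "role": "Research Assistant"},
    ],
}

voting = [p["alias"] for p in data["voting_participants"]]
background = [p["alias"] for p in data["background_participants"]]
print(voting)      # ['architect', 'pragmatist']
print(background)  # ['researcher']
```

The two top-level lists map directly onto the registry's `get_voting()` / `get_background()` split used by the `participants` CLI command.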

docs/DESIGN.md

File diff suppressed because it is too large

@@ -0,0 +1,124 @@
<!-- DISCUSSION -->
<!-- Title: Example Feature Discussion -->
<!-- Phase: initial_feedback -->
<!-- Status: OPEN -->
<!-- Created: 2025-12-08T00:00:00Z -->
<!-- Template: feature -->
<!-- Participants: architect, security, pragmatist -->
# Example Feature Discussion
## Context
This is an example discussion file demonstrating the format and structure
used by Orchestrated Discussions.
We want to add a new caching layer to improve API response times. The cache
should store frequently accessed data and invalidate appropriately when
the underlying data changes.
## Requirements
- [ ] Cache frequently accessed API responses
- [ ] Support configurable TTL per cache key
- [ ] Invalidate cache on data mutations
- [ ] Provide cache statistics and monitoring
## Open Questions
- What caching backend should we use? (Redis, Memcached, in-memory)
- How do we handle cache stampedes?
- What's the expected cache hit ratio?
## Constraints
- Must work in our Kubernetes environment
- Cannot add more than 100ms latency on cache miss
- Must support horizontal scaling
---
*Discussion begins below. Use `discussions turn example.md @architect` to get feedback.*
---
Name: AI-Architect
Looking at this caching proposal, I have several architectural considerations:
**Caching Backend**
For a Kubernetes environment, I'd recommend Redis over in-memory caching. Redis provides:
- Persistence options for durability
- Built-in TTL support
- Cluster mode for horizontal scaling
- Pub/sub for cache invalidation across instances
**Cache Stampede Prevention**
We should implement one of these patterns:
1. **Lock-based**: Only one request populates cache, others wait
2. **Probabilistic early expiration**: Refresh before actual TTL
3. **Background refresh**: Async refresh while serving stale data
Q: What's our tolerance for serving slightly stale data? That affects which pattern works best.
**Architecture Concern**
The requirement to "invalidate on data mutations" needs more specificity. Are we talking about:
- Same-service mutations (easier, can invalidate directly)
- Cross-service mutations (harder, needs event-driven invalidation)
CONCERN: Cross-service cache invalidation can become a distributed systems nightmare if not carefully designed.
VOTE: CHANGES
---
Name: AI-Security
From a security perspective, I need to raise several concerns:
**Cache Poisoning**
If an attacker can poison the cache with malicious data, that data gets served to all users. Mitigations:
- Validate data before caching
- Use signed cache keys
- Implement cache isolation per tenant (if multi-tenant)
**Sensitive Data**
Q: Will any cached responses contain sensitive data? If so, we need:
- Encryption at rest in Redis
- Secure cache key generation (no user IDs in keys if possible)
- Appropriate TTLs for sensitive data
**Access Control**
Redis should not be exposed to the public internet. Ensure:
- Redis is in private subnet
- Authentication enabled (requirepass)
- TLS for data in transit if crossing network boundaries
CONCERN: The requirements don't mention data sensitivity classification. We need to know what's being cached.
VOTE: CHANGES
---
Name: AI-Pragmatist
I appreciate the thorough analysis, but let's not over-engineer this.
**MVP Proposal**
For v1, I suggest:
1. Use Redis (it's already in our stack, right?)
2. Simple TTL-based expiration (no fancy invalidation)
3. Cache-aside pattern (app checks cache, falls back to DB)
4. Start with just the top 3 most accessed endpoints
**What We Can Defer**
- Sophisticated invalidation (use short TTLs instead for MVP)
- Cache stampede handling (monitor first, optimize if needed)
- Cross-service invalidation (start with single-service scope)
The architect's concerns about cross-service invalidation are valid, but we don't need to solve that for v1. Let's ship something, measure the impact, then iterate.
Q: What's the timeline pressure here? If we have 2 weeks, MVP is the way. If we have 2 months, we can be more thorough.
VOTE: READY
---
*This example shows a typical discussion with different perspectives. The discussion would continue until consensus is reached.*
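Tallying the example's latest vote per participant reproduces the status line shown in the README (`READY: 1, CHANGES: 2`). The actual rule lives in `src/discussions/voting.py`, which is not shown in this diff, so the unanimity check below is only an illustrative assumption:

```python
from collections import Counter

# Latest vote per participant in the example discussion above.
votes = {
    "AI-Architect": "CHANGES",
    "AI-Security": "CHANGES",
    "AI-Pragmatist": "READY",
}

tally = Counter(votes.values())
# Assumed rule for illustration only: consensus requires every voter to be READY.
reached = set(votes.values()) == {"READY"}
print(dict(tally))  # {'CHANGES': 2, 'READY': 1}
print(reached)      # False
```

With two CHANGES votes outstanding, the discussion stays in `initial_feedback` until the raised questions and concerns are addressed.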

pyproject.toml

@@ -0,0 +1,57 @@
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "orchestrated-discussions"
version = "0.1.0"
description = "Multi-agent AI discussion orchestration with voting and phases"
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
authors = [
{name = "Rob"}
]
keywords = ["ai", "discussion", "multi-agent", "orchestration", "voting", "cli"]
classifiers = [
"Development Status :: 3 - Alpha",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",
"Operating System :: MacOS",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development :: Libraries :: Python Modules",
]
dependencies = [
"PyYAML>=6.0",
"smarttools>=0.1.0",
]
[project.optional-dependencies]
tui = [
"urwid>=2.1.0",
]
dev = [
"pytest>=7.0",
"pytest-cov>=4.0",
"urwid>=2.1.0",
]
[project.scripts]
discussions = "discussions.cli:main"
[project.urls]
Homepage = "https://github.com/rob/orchestrated-discussions"
Documentation = "https://github.com/rob/orchestrated-discussions#readme"
Repository = "https://github.com/rob/orchestrated-discussions.git"
[tool.setuptools.packages.find]
where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
pythonpath = ["src"]


@@ -0,0 +1,14 @@
"""
Orchestrated Discussions - Multi-agent AI discussion orchestration.
This package provides tools for conducting structured discussions between
multiple AI personas with voting, phases, and consensus tracking.
"""
__version__ = "0.1.0"
# Core classes will be exported here once implemented
# from .discussion import Discussion
# from .participant import Participant
# from .runner import Runner
# from .voting import VotingConfig, calculate_consensus

src/discussions/cli.py

@@ -0,0 +1,271 @@
"""
CLI entry point for Orchestrated Discussions.
Provides commands for creating, managing, and running discussions.
"""
import argparse
import sys
from pathlib import Path
from . import __version__
def cmd_new(args) -> int:
"""Create a new discussion."""
from .discussion import Discussion
# Generate filename from title
if args.output:
path = Path(args.output)
else:
slug = args.title.lower().replace(" ", "-")
slug = "".join(c for c in slug if c.isalnum() or c == "-")
path = Path(f"{slug}.md")
if path.exists() and not args.force:
print(f"Error: {path} already exists. Use --force to overwrite.")
return 1
# Parse participants
participants = None
if args.participants:
participants = [p.strip() for p in args.participants.split(",")]
discussion = Discussion.create(
path=path,
title=args.title,
context=args.context or "",
template=args.template,
participants=participants,
)
print(f"Created: {path}")
print(f"Title: {discussion.title}")
print(f"Participants: {', '.join(discussion.participant_aliases)}")
return 0
def cmd_status(args) -> int:
"""Show discussion status."""
from .discussion import Discussion
from .voting import format_vote_details
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
discussion = Discussion.load(path)
print(f"Discussion: {discussion.title}")
print(f"File: {discussion.path}")
print(f"Phase: {discussion.phase}")
print(f"Status: {discussion.status}")
print(f"Comments: {len(discussion.comments)}")
print()
votes = discussion.get_votes()
if votes:
print("Votes:")
print(format_vote_details(votes))
else:
print("Votes: (none yet)")
print()
consensus = discussion.check_consensus()
if consensus.reached:
print(f"Consensus: REACHED ({consensus.outcome})")
else:
print(f"Consensus: NOT REACHED - {consensus.reason}")
print()
questions = discussion.get_questions()
if questions:
print(f"Open Questions ({len(questions)}):")
for q in questions[:5]: # Show first 5
print(f" Q: {q.text} (@{q.author})")
if len(questions) > 5:
print(f" ... and {len(questions) - 5} more")
concerns = discussion.get_concerns()
if concerns:
print(f"\nConcerns ({len(concerns)}):")
for c in concerns[:5]:
print(f" CONCERN: {c.text} (@{c.author})")
return 0
def cmd_turn(args) -> int:
"""Run a discussion turn."""
from .runner import run_discussion_turn
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
# Parse participants (remove @ prefix if present)
participants = None
if args.participants:
participants = [p.lstrip("@") for p in args.participants]
print(f"Running turn on {path}...")
if participants:
print(f"Participants: {', '.join(participants)}")
result = run_discussion_turn(
discussion_path=path,
participants=participants,
callout=args.callout or "",
provider=args.provider,
verbose=args.verbose,
)
print()
print(f"Responses: {result.successful_count} successful, {result.skipped_count} skipped, {result.failed_count} failed")
for r in result.results:
if r.success and r.comment:
vote_str = f" [{r.comment.vote}]" if r.comment.vote else ""
print(f" {r.participant.name}{vote_str}")
elif r.success:
print(f" {r.participant.name} - (no response)")
else:
print(f" {r.participant.name} - ERROR: {r.error}")
return 0
def cmd_comment(args) -> int:
"""Add a human comment to a discussion."""
from .discussion import Discussion
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
discussion = Discussion.load(path)
# Get author name
author = args.author or "Human"
# Add comment
discussion.add_comment(
author=author,
text=args.text,
vote=args.vote.upper() if args.vote else None,
)
discussion.save()
vote_str = f" with vote {args.vote.upper()}" if args.vote else ""
print(f"Added comment from {author}{vote_str}")
return 0
def cmd_participants(args) -> int:
"""List available participants."""
from .participant import get_registry
registry = get_registry()
print("Voting Participants:")
for p in registry.get_voting():
print(f" @{p.alias:15} {p.name:20} - {p.role}")
print("\nBackground Participants:")
for p in registry.get_background():
print(f" @{p.alias:15} {p.name:20} - {p.role}")
return 0
def cmd_advance(args) -> int:
"""Advance discussion to next phase."""
from .discussion import Discussion
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
discussion = Discussion.load(path)
old_phase = discussion.phase
if args.phase:
discussion.update_phase(args.phase)
else:
# TODO: Implement phase progression logic
print("Error: --phase required (automatic progression not yet implemented)")
return 1
discussion.save()
print(f"Advanced: {old_phase} -> {discussion.phase}")
return 0
def main(argv: list[str] | None = None) -> int:
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
prog="discussions",
description="Multi-agent AI discussion orchestration"
)
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# 'new' command
p_new = subparsers.add_parser("new", help="Create a new discussion")
p_new.add_argument("title", help="Discussion title")
p_new.add_argument("-o", "--output", help="Output file path")
p_new.add_argument("-t", "--template", default="feature", help="Template to use")
p_new.add_argument("-c", "--context", help="Initial context/description")
p_new.add_argument("-p", "--participants", help="Comma-separated participant aliases")
p_new.add_argument("-f", "--force", action="store_true", help="Overwrite existing")
p_new.set_defaults(func=cmd_new)
# 'status' command
p_status = subparsers.add_parser("status", help="Show discussion status")
p_status.add_argument("discussion", help="Discussion file path")
p_status.set_defaults(func=cmd_status)
# 'turn' command
p_turn = subparsers.add_parser("turn", help="Run a discussion turn")
p_turn.add_argument("discussion", help="Discussion file path")
p_turn.add_argument("participants", nargs="*", help="Participant aliases (e.g., @architect)")
p_turn.add_argument("--callout", "-c", help="Specific question/request")
p_turn.add_argument("--provider", "-p", help="Override AI provider")
p_turn.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
p_turn.set_defaults(func=cmd_turn)
# 'comment' command
p_comment = subparsers.add_parser("comment", help="Add a human comment")
p_comment.add_argument("discussion", help="Discussion file path")
p_comment.add_argument("text", help="Comment text")
p_comment.add_argument("--vote", "-v", choices=["ready", "changes", "reject"], help="Cast a vote")
p_comment.add_argument("--author", "-a", help="Author name (default: Human)")
p_comment.set_defaults(func=cmd_comment)
# 'participants' command
p_parts = subparsers.add_parser("participants", help="List available participants")
p_parts.set_defaults(func=cmd_participants)
# 'advance' command
p_advance = subparsers.add_parser("advance", help="Advance to next phase")
p_advance.add_argument("discussion", help="Discussion file path")
p_advance.add_argument("--phase", help="Target phase ID")
p_advance.set_defaults(func=cmd_advance)
args = parser.parse_args(argv)
if args.command is None:
parser.print_help()
return 0
return args.func(args)
if __name__ == "__main__":
sys.exit(main())


@@ -0,0 +1,347 @@
"""
Discussion state management for Orchestrated Discussions.
The Discussion class represents a single discussion file and provides
methods for reading/writing state, adding comments, and tracking votes.
See docs/DESIGN.md for discussion file format specification.
"""
import re
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Optional
from .markers import (
extract_all_markers,
extract_vote,
Question,
ActionItem,
Decision,
Concern,
Mention,
)
from .voting import VotingConfig, calculate_consensus, ConsensusResult
# Regex patterns for header parsing
HEADER_PATTERN = re.compile(r'^<!--\s*(\w+):\s*(.+?)\s*-->$', re.MULTILINE)
COMMENT_BLOCK_PATTERN = re.compile(
r'^---\s*\n\s*Name:\s*(.+?)\n(.*?)(?=^---|\Z)',
re.MULTILINE | re.DOTALL
)
@dataclass
class Comment:
"""A single comment in the discussion."""
author: str
body: str
vote: Optional[str] = None
questions: list[Question] = field(default_factory=list)
action_items: list[ActionItem] = field(default_factory=list)
decisions: list[Decision] = field(default_factory=list)
concerns: list[Concern] = field(default_factory=list)
mentions: list[Mention] = field(default_factory=list)
@dataclass
class Discussion:
"""
Represents a discussion file.
Attributes:
path: Path to the discussion file
title: Discussion title
phase: Current phase ID
status: Current status (OPEN, READY_FOR_DESIGN, etc.)
template: Template used to create this discussion
participant_aliases: List of participant aliases involved
comments: List of Comment objects
created: Creation timestamp
"""
path: Path
title: str = ""
phase: str = "initial_feedback"
status: str = "OPEN"
template: str = "feature"
participant_aliases: list[str] = field(default_factory=list)
comments: list[Comment] = field(default_factory=list)
created: Optional[datetime] = None
_raw_content: str = ""
@classmethod
def load(cls, path: Path | str) -> "Discussion":
"""
Load a discussion from a file.
Args:
path: Path to the discussion file
Returns:
Discussion object
Raises:
FileNotFoundError: If file doesn't exist
"""
path = Path(path)
if not path.exists():
raise FileNotFoundError(f"Discussion file not found: {path}")
content = path.read_text(encoding="utf-8")
discussion = cls(path=path, _raw_content=content)
discussion._parse_content(content)
return discussion
@classmethod
def create(
cls,
path: Path | str,
title: str,
context: str = "",
template: str = "feature",
participants: Optional[list[str]] = None,
) -> "Discussion":
"""
Create a new discussion file.
Args:
path: Path for the new discussion file
title: Discussion title
context: Initial context/description
template: Template name
participants: List of participant aliases
Returns:
New Discussion object
"""
path = Path(path)
if participants is None:
participants = ["architect", "security", "pragmatist"]
now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
content = f"""<!-- DISCUSSION -->
<!-- Title: {title} -->
<!-- Phase: initial_feedback -->
<!-- Status: OPEN -->
<!-- Created: {now} -->
<!-- Template: {template} -->
<!-- Participants: {', '.join(participants)} -->
# {title}
## Context
{context if context else "[Describe what's being discussed]"}
## Requirements
- [ ] [Requirement 1]
- [ ] [Requirement 2]
## Open Questions
- [Question 1]
---
*Discussion begins below.*
"""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return cls.load(path)
def _parse_content(self, content: str) -> None:
"""Parse discussion content to extract state."""
# Parse headers
for match in HEADER_PATTERN.finditer(content):
key = match.group(1).lower()
value = match.group(2).strip()
if key == "title":
self.title = value
elif key == "phase":
self.phase = value
elif key == "status":
self.status = value
elif key == "template":
self.template = value
elif key == "participants":
self.participant_aliases = [p.strip() for p in value.split(",")]
elif key == "created":
try:
self.created = datetime.fromisoformat(value.replace("Z", "+00:00"))
except ValueError:
pass
# Parse comment blocks
self.comments = []
for match in COMMENT_BLOCK_PATTERN.finditer(content):
author = match.group(1).strip()
body = match.group(2).strip()
# Extract markers from body
markers = extract_all_markers(body, author)
comment = Comment(
author=author,
body=body,
vote=markers["vote"],
questions=markers["questions"],
action_items=markers["action_items"],
decisions=markers["decisions"],
concerns=markers["concerns"],
mentions=markers["mentions"],
)
self.comments.append(comment)
def save(self) -> None:
"""Save the discussion to its file."""
self.path.write_text(self._raw_content, encoding="utf-8")
def add_comment(
self,
author: str,
text: str,
vote: Optional[str] = None,
) -> Comment:
"""
Add a new comment to the discussion.
Args:
author: Comment author name
text: Comment body (markdown)
vote: Optional vote (READY, CHANGES, REJECT)
Returns:
The new Comment object
"""
# Build comment block
lines = ["---", "", f"Name: {author}"]
if text:
lines.append(text.strip())
if vote:
lines.append(f"VOTE: {vote.upper()}")
lines.append("")
comment_block = "\n".join(lines)
# Append to raw content
self._raw_content = self._raw_content.rstrip() + "\n\n" + comment_block
# Re-parse to update state
self._parse_content(self._raw_content)
# Return the new comment
return self.comments[-1]
def get_votes(self) -> dict[str, str]:
"""
Get the latest vote from each participant.
Returns:
Dict mapping author name to vote
"""
votes = {}
for comment in self.comments:
if comment.vote:
votes[comment.author] = comment.vote
return votes
def get_questions(self) -> list[Question]:
"""Get all questions from all comments."""
questions = []
for comment in self.comments:
questions.extend(comment.questions)
return questions
def get_action_items(self) -> list[ActionItem]:
"""Get all action items from all comments."""
items = []
for comment in self.comments:
items.extend(comment.action_items)
return items
def get_decisions(self) -> list[Decision]:
"""Get all decisions from all comments."""
decisions = []
for comment in self.comments:
decisions.extend(comment.decisions)
return decisions
def get_concerns(self) -> list[Concern]:
"""Get all concerns from all comments."""
concerns = []
for comment in self.comments:
concerns.extend(comment.concerns)
return concerns
def get_mentions(self, target: Optional[str] = None) -> list[Mention]:
"""
Get mentions, optionally filtered by target.
Args:
target: Optional alias to filter by
Returns:
List of Mention objects
"""
mentions = []
for comment in self.comments:
for mention in comment.mentions:
if target is None or mention.target == target:
mentions.append(mention)
return mentions
def check_consensus(
self,
config: Optional[VotingConfig] = None
) -> ConsensusResult:
"""
Check if consensus has been reached.
Args:
config: Voting configuration
Returns:
ConsensusResult with status and details
"""
return calculate_consensus(self.get_votes(), config)
def has_consensus(self) -> bool:
"""Return True if consensus has been reached."""
return self.check_consensus().reached
def update_phase(self, new_phase: str) -> None:
"""
Update the current phase.
Args:
new_phase: New phase ID
"""
old_header = f"<!-- Phase: {self.phase} -->"
new_header = f"<!-- Phase: {new_phase} -->"
self._raw_content = self._raw_content.replace(old_header, new_header)
self.phase = new_phase
def update_status(self, new_status: str) -> None:
"""
Update the current status.
Args:
new_status: New status value
"""
old_header = f"<!-- Status: {self.status} -->"
new_header = f"<!-- Status: {new_status} -->"
self._raw_content = self._raw_content.replace(old_header, new_header)
self.status = new_status
def get_content(self) -> str:
"""Get the full discussion content."""
return self._raw_content
def __repr__(self) -> str:
return f"Discussion(title='{self.title}', phase='{self.phase}', status='{self.status}')"

src/discussions/markers.py

@@ -0,0 +1,226 @@
"""
Marker parsing for Orchestrated Discussions.
This module handles parsing of structured markers in discussion content:
- VOTE: READY|CHANGES|REJECT
- Q: / QUESTION: - Questions
- TODO: / ACTION: - Action items
- DECISION: - Decisions
- ASSIGNED: - Claimed tasks
- DONE: - Completed tasks
- CONCERN: - Raised concerns
- @alias - Mentions
See docs/DESIGN.md for full marker specification.
"""
import re
from dataclasses import dataclass
from typing import Optional
# Regex patterns for marker extraction
VOTE_PATTERN = re.compile(r'^VOTE:\s*(READY|CHANGES|REJECT)\s*$', re.IGNORECASE | re.MULTILINE)
QUESTION_PATTERN = re.compile(r'^(?:Q|QUESTION):\s*(.+)$', re.IGNORECASE | re.MULTILINE)
TODO_PATTERN = re.compile(r'^(?:TODO|ACTION):\s*(.+)$', re.IGNORECASE | re.MULTILINE)
DECISION_PATTERN = re.compile(r'^DECISION:\s*(.+)$', re.IGNORECASE | re.MULTILINE)
ASSIGNED_PATTERN = re.compile(r'^ASSIGNED:\s*(.+)$', re.IGNORECASE | re.MULTILINE)
DONE_PATTERN = re.compile(r'^DONE:\s*(.+)$', re.IGNORECASE | re.MULTILINE)
CONCERN_PATTERN = re.compile(r'^CONCERN:\s*(.+)$', re.IGNORECASE | re.MULTILINE)
MENTION_PATTERN = re.compile(r'@(\w+)')
@dataclass
class Question:
"""A question raised in the discussion."""
text: str
author: str
status: str = "open" # open, answered, deferred
@dataclass
class ActionItem:
"""An action item or TODO."""
text: str
author: str
assignee: Optional[str] = None
status: str = "todo" # todo, assigned, done
@dataclass
class Decision:
"""A decision made in the discussion."""
text: str
author: str
supporters: Optional[list[str]] = None  # replaced with [] in __post_init__
def __post_init__(self):
if self.supporters is None:
self.supporters = []
@dataclass
class Concern:
"""A concern raised by a participant."""
text: str
author: str
addressed: bool = False
@dataclass
class Mention:
"""An @mention in the discussion."""
target: str # The alias mentioned
author: str
context: str # Surrounding text
def extract_vote(text: str) -> Optional[str]:
"""
Extract vote from text.
Args:
text: Text to search for vote
Returns:
Vote value (READY, CHANGES, REJECT) or None
"""
match = VOTE_PATTERN.search(text)
if match:
return match.group(1).upper()
return None
def extract_questions(text: str, author: str = "unknown") -> list[Question]:
"""
Extract all questions from text.
Args:
text: Text to search
author: Author to attribute questions to
Returns:
List of Question objects
"""
questions = []
for match in QUESTION_PATTERN.finditer(text):
questions.append(Question(
text=match.group(1).strip(),
author=author
))
return questions
def extract_action_items(text: str, author: str = "unknown") -> list[ActionItem]:
"""
Extract all action items/TODOs from text.
Args:
text: Text to search
author: Author to attribute items to
Returns:
List of ActionItem objects
"""
items = []
for match in TODO_PATTERN.finditer(text):
item_text = match.group(1).strip()
# Check for @mention to determine assignee
mention = MENTION_PATTERN.search(item_text)
assignee = mention.group(1) if mention else None
items.append(ActionItem(
text=item_text,
author=author,
assignee=assignee
))
return items
def extract_decisions(text: str, author: str = "unknown") -> list[Decision]:
"""
Extract all decisions from text.
Args:
text: Text to search
author: Author to attribute decisions to
Returns:
List of Decision objects
"""
decisions = []
for match in DECISION_PATTERN.finditer(text):
decisions.append(Decision(
text=match.group(1).strip(),
author=author
))
return decisions
def extract_concerns(text: str, author: str = "unknown") -> list[Concern]:
"""
Extract all concerns from text.
Args:
text: Text to search
author: Author to attribute concerns to
Returns:
List of Concern objects
"""
concerns = []
for match in CONCERN_PATTERN.finditer(text):
concerns.append(Concern(
text=match.group(1).strip(),
author=author
))
return concerns
def extract_mentions(text: str, author: str = "unknown") -> list[Mention]:
"""
Extract all @mentions from text.
Args:
text: Text to search
author: Author making the mentions
Returns:
List of Mention objects
"""
mentions = []
for match in MENTION_PATTERN.finditer(text):
# Get surrounding context (the line containing the mention)
start = text.rfind('\n', 0, match.start()) + 1
end = text.find('\n', match.end())
if end == -1:
end = len(text)
context = text[start:end].strip()
mentions.append(Mention(
target=match.group(1),
author=author,
context=context
))
return mentions
def extract_all_markers(text: str, author: str = "unknown") -> dict:
"""
Extract all markers from text.
Args:
text: Text to search
author: Author to attribute markers to
Returns:
Dict with keys: vote, questions, action_items, decisions, concerns, mentions
"""
return {
"vote": extract_vote(text),
"questions": extract_questions(text, author),
"action_items": extract_action_items(text, author),
"decisions": extract_decisions(text, author),
"concerns": extract_concerns(text, author),
"mentions": extract_mentions(text, author),
}
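The marker grammar can be exercised end to end. A self-contained sketch that re-declares three of the patterns defined above (rather than importing the module) and runs them over a typical participant comment:

```python
import re

# Same line-anchored, case-insensitive patterns as in markers.py.
VOTE = re.compile(r'^VOTE:\s*(READY|CHANGES|REJECT)\s*$', re.IGNORECASE | re.MULTILINE)
QUESTION = re.compile(r'^(?:Q|QUESTION):\s*(.+)$', re.IGNORECASE | re.MULTILINE)
MENTION = re.compile(r'@(\w+)')

comment = """Overall direction looks right.
Q: Do we need rate limiting on login?
TODO: @security review session expiry
VOTE: CHANGES
"""

vote_value = VOTE.search(comment).group(1).upper()  # normalized to uppercase
questions = [q.strip() for q in QUESTION.findall(comment)]
mentioned = MENTION.findall(comment)  # aliases only, '@' stripped by the group
```

Note that because `MENTION_PATTERN` is not line-anchored, the `@security` inside the TODO line is picked up both as an action-item assignee and as a mention, which is the behavior `extract_action_items` relies on.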

(new file, 265 lines)
"""
Participant definitions and loading for Orchestrated Discussions.
Participants are AI personas with distinct perspectives, expertise, and behavior.
They can be loaded from YAML files or defined programmatically.
See docs/DESIGN.md for participant specification.
"""
import yaml
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class Participant:
"""
An AI participant in a discussion.
Attributes:
name: Display name (e.g., "AI-Architect")
alias: Short mention name (e.g., "architect")
role: Brief role description
personality: System prompt defining perspective and behavior
expertise: List of expertise areas
concerns: What this participant watches for
participant_type: "voting" or "background"
provider_hint: Preferred AI provider
"""
name: str
alias: str
role: str
personality: str
expertise: list[str] = field(default_factory=list)
concerns: list[str] = field(default_factory=list)
participant_type: str = "voting" # voting | background
provider_hint: str = "claude-sonnet"
@classmethod
def from_dict(cls, data: dict) -> "Participant":
"""Create Participant from dictionary."""
return cls(
name=data["name"],
alias=data["alias"],
role=data["role"],
personality=data["personality"],
expertise=data.get("expertise", []),
concerns=data.get("concerns", []),
participant_type=data.get("type", "voting"),
provider_hint=data.get("provider_hint", "claude-sonnet"),
)
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"name": self.name,
"alias": self.alias,
"role": self.role,
"personality": self.personality,
"expertise": self.expertise,
"concerns": self.concerns,
"type": self.participant_type,
"provider_hint": self.provider_hint,
}
def is_voting(self) -> bool:
"""Return True if this participant casts votes."""
return self.participant_type == "voting"
def build_prompt(self, context: str, callout: str = "") -> str:
"""
Build the full prompt for this participant.
Args:
context: The discussion content so far
callout: Specific question or request (optional)
Returns:
Complete prompt string
"""
callout_section = ""
if callout:
callout_section = f"""
## Your Task
{callout}
"""
else:
callout_section = """
## Your Task
Provide your perspective on the discussion based on your expertise.
"""
return f"""{self.personality}
## Current Discussion
{context}
{callout_section}
## Response Format
Respond with valid JSON:
- If you have feedback: {{"comment": "your markdown comment", "vote": "READY|CHANGES|REJECT"}}
- If nothing to add: {{"sentinel": "NO_RESPONSE"}}
Your comment can include:
- Q: for questions
- CONCERN: for concerns
- DECISION: for decisions you're proposing
- @alias to mention other participants
"""
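The response format requested by `build_prompt` implies a small validation contract on the orchestrator side. A sketch of that contract (the `validate_reply` helper is hypothetical, not part of the module):

```python
import json

# Hypothetical helper: check a participant reply against the JSON contract
# described in the prompt's "Response Format" section.
def validate_reply(raw: str) -> dict:
    data = json.loads(raw)
    # A NO_RESPONSE sentinel is a valid, complete reply on its own.
    if data.get("sentinel") == "NO_RESPONSE":
        return data
    if "comment" not in data:
        raise ValueError("reply must contain 'comment' or the NO_RESPONSE sentinel")
    vote = data.get("vote")
    if vote is not None and vote.upper() not in ("READY", "CHANGES", "REJECT"):
        raise ValueError(f"invalid vote: {vote}")
    return data

ok = validate_reply('{"comment": "CONCERN: no rate limiting", "vote": "CHANGES"}')
skip = validate_reply('{"sentinel": "NO_RESPONSE"}')
```

This mirrors the checks the runner performs after parsing: an unknown vote value is tolerated there by dropping it to `None`, whereas this sketch rejects it outright.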
class ParticipantRegistry:
"""
Registry for loading and managing participants.
Loads participants from:
1. Bundled defaults (config/default_participants.yaml)
2. User config (~/.config/discussions/participants.yaml)
3. Project config (./discussions.yaml or ./.discussions/participants.yaml)
"""
def __init__(self):
self._participants: dict[str, Participant] = {}
self._loaded = False
def _load_from_yaml(self, path: Path) -> None:
"""Load participants from a YAML file."""
if not path.exists():
return
try:
data = yaml.safe_load(path.read_text())
if not data:
return
# Load voting participants
for p_data in data.get("voting_participants", []):
participant = Participant.from_dict(p_data)
self._participants[participant.alias] = participant
# Load background participants
for p_data in data.get("background_participants", []):
p_data["type"] = "background"
participant = Participant.from_dict(p_data)
self._participants[participant.alias] = participant
except Exception as e:
print(f"Warning: Failed to load participants from {path}: {e}")
def _ensure_loaded(self) -> None:
"""Ensure participants are loaded (lazy loading)."""
if self._loaded:
return
# Load bundled defaults
bundled = Path(__file__).parent.parent.parent / "config" / "default_participants.yaml"
self._load_from_yaml(bundled)
# Load user config
user_config = Path.home() / ".config" / "discussions" / "participants.yaml"
self._load_from_yaml(user_config)
# Load project config
project_config = Path.cwd() / ".discussions" / "participants.yaml"
self._load_from_yaml(project_config)
self._loaded = True
def get(self, alias: str) -> Optional[Participant]:
"""
Get a participant by alias.
Args:
alias: The participant's alias (e.g., "architect")
Returns:
Participant or None if not found
"""
self._ensure_loaded()
return self._participants.get(alias)
def get_all(self) -> list[Participant]:
"""
Get all registered participants.
Returns:
List of all Participant objects
"""
self._ensure_loaded()
return list(self._participants.values())
def get_voting(self) -> list[Participant]:
"""
Get all voting participants.
Returns:
List of voting Participant objects
"""
self._ensure_loaded()
return [p for p in self._participants.values() if p.is_voting()]
def get_background(self) -> list[Participant]:
"""
Get all background (non-voting) participants.
Returns:
List of background Participant objects
"""
self._ensure_loaded()
return [p for p in self._participants.values() if not p.is_voting()]
def register(self, participant: Participant) -> None:
"""
Register a participant.
Args:
participant: Participant to register
"""
self._ensure_loaded()
self._participants[participant.alias] = participant
def aliases(self) -> list[str]:
"""
Get all registered aliases.
Returns:
List of alias strings
"""
self._ensure_loaded()
return list(self._participants.keys())
# Global registry instance
_registry: Optional[ParticipantRegistry] = None
def get_registry() -> ParticipantRegistry:
"""Get the global participant registry."""
global _registry
if _registry is None:
_registry = ParticipantRegistry()
return _registry
def get_participant(alias: str) -> Optional[Participant]:
"""
Get a participant by alias.
Args:
alias: The participant's alias
Returns:
Participant or None
"""
return get_registry().get(alias)
def list_participants() -> list[Participant]:
"""
List all registered participants.
Returns:
List of Participant objects
"""
return get_registry().get_all()
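For reference, a minimal `participants.yaml` in the shape `_load_from_yaml` expects. The top-level keys and per-participant fields are taken from the loader and `Participant.from_dict`; the persona content itself is illustrative:

```yaml
voting_participants:
  - name: AI-Architect
    alias: architect
    role: Systems architect
    personality: |
      You are a pragmatic systems architect. Focus on boundaries,
      data flow, and long-term maintainability.
    expertise: [architecture, scalability]
    concerns: [coupling, hidden complexity]
    provider_hint: claude-sonnet

background_participants:
  - name: AI-Scribe
    alias: scribe
    role: Notetaker
    personality: Summarize decisions and open questions.
```

Entries under `background_participants` have their `type` forced to `"background"` by the loader, so the key does not need to be spelled out per entry. Later files (user config, then project config) silently override earlier entries with the same alias.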

src/discussions/runner.py (new file, 354 lines)
"""
Discussion runner and orchestration engine.
This module handles the execution of discussion turns by:
1. Identifying which participants should respond
2. Building prompts for each participant
3. Invoking AI providers via SmartTools
4. Parsing responses and updating the discussion
See docs/DESIGN.md for orchestration details.
"""
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional
from .discussion import Discussion, Comment
from .participant import Participant, get_participant, get_registry
@dataclass
class TurnResult:
"""Result of a single participant's turn."""
participant: Participant
comment: Optional[Comment]
success: bool
error: Optional[str] = None
raw_response: str = ""
@dataclass
class RunResult:
"""Result of running a discussion turn."""
discussion: Discussion
results: list[TurnResult]
@property
def successful_count(self) -> int:
return sum(1 for r in self.results if r.success and r.comment)
@property
def failed_count(self) -> int:
return sum(1 for r in self.results if not r.success)
@property
def skipped_count(self) -> int:
return sum(1 for r in self.results if r.success and not r.comment)
class Runner:
"""
Discussion orchestration runner.
Handles invoking participants and updating discussions.
"""
def __init__(self, provider_override: Optional[str] = None, verbose: bool = False):
"""
Initialize the runner.
Args:
provider_override: Override provider for all participants
verbose: Enable verbose output
"""
self.provider_override = provider_override
self.verbose = verbose
self._provider_client = None
def _get_provider_client(self):
"""Get or create the provider client (lazy import from SmartTools)."""
if self._provider_client is not None:
return self._provider_client
try:
from smarttools.providers import call_provider
self._provider_client = call_provider
return self._provider_client
except ImportError:
raise ImportError(
"SmartTools is required but not installed. "
"Install with: pip install smarttools"
)
def _invoke_participant(
self,
participant: Participant,
context: str,
callout: str = "",
) -> TurnResult:
"""
Invoke a single participant.
Args:
participant: The participant to invoke
context: Discussion content
callout: Specific request/question
Returns:
TurnResult with response details
"""
call_provider = self._get_provider_client()
# Build prompt
prompt = participant.build_prompt(context, callout)
if self.verbose:
print(f"[runner] Invoking {participant.name}...", file=sys.stderr)
# Determine provider
provider = self.provider_override or participant.provider_hint
# Call provider
try:
result = call_provider(provider, prompt)
except Exception as e:
return TurnResult(
participant=participant,
comment=None,
success=False,
error=f"Provider error: {e}",
)
if not result.success:
return TurnResult(
participant=participant,
comment=None,
success=False,
error=result.error,
raw_response=result.text,
)
# Parse JSON response
try:
response_data = self._parse_response(result.text)
except ValueError as e:
return TurnResult(
participant=participant,
comment=None,
success=False,
error=f"Failed to parse response: {e}",
raw_response=result.text,
)
# Check for NO_RESPONSE sentinel
if response_data.get("sentinel") == "NO_RESPONSE":
if self.verbose:
print(f"[runner] {participant.name} has nothing to add", file=sys.stderr)
return TurnResult(
participant=participant,
comment=None,
success=True, # Not an error, just nothing to say
raw_response=result.text,
)
# Extract comment and vote
comment_text = response_data.get("comment", "")
vote = response_data.get("vote")
if vote and vote.upper() not in ("READY", "CHANGES", "REJECT"):
vote = None
# Create a temporary comment to return (not yet added to discussion)
from .markers import extract_all_markers
markers = extract_all_markers(comment_text, participant.name)
comment = Comment(
author=participant.name,
body=comment_text,
vote=vote.upper() if vote else None,
questions=markers["questions"],
action_items=markers["action_items"],
decisions=markers["decisions"],
concerns=markers["concerns"],
mentions=markers["mentions"],
)
return TurnResult(
participant=participant,
comment=comment,
success=True,
raw_response=result.text,
)
def _parse_response(self, text: str) -> dict:
"""
Parse JSON response from participant.
Handles various response formats:
- Pure JSON
- JSON in markdown code blocks
- JSON with surrounding text
Args:
text: Raw response text
Returns:
Parsed dict
Raises:
ValueError: If JSON cannot be parsed
"""
text = text.strip()
# Try direct JSON parse
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Try extracting from markdown code block
import re
code_block = re.search(r'```(?:json)?\s*\n?(.*?)\n?```', text, re.DOTALL)
if code_block:
try:
return json.loads(code_block.group(1).strip())
except json.JSONDecodeError:
pass
# Try finding JSON object in text
json_match = re.search(r'\{[^{}]*\}', text, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(0))
except json.JSONDecodeError:
pass
raise ValueError(f"Could not parse JSON from response: {text[:200]}...")
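The three-stage fallback in `_parse_response` can be sketched as a standalone function (re-implemented here rather than imported; the code-fence characters are built indirectly so this listing stays readable):

```python
import json
import re

FENCE = "`" * 3  # a literal triple-backtick markdown fence

# Sketch of the same fallback chain as Runner._parse_response:
# 1) direct JSON, 2) a fenced json code block, 3) the first flat {...} object.
def parse_lenient(text: str) -> dict:
    text = text.strip()
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    block = re.search(FENCE + r'(?:json)?\s*\n?(.*?)\n?' + FENCE, text, re.DOTALL)
    if block:
        try:
            return json.loads(block.group(1).strip())
        except json.JSONDecodeError:
            pass
    flat = re.search(r'\{[^{}]*\}', text, re.DOTALL)
    if flat:
        try:
            return json.loads(flat.group(0))
        except json.JSONDecodeError:
            pass
    raise ValueError("no JSON found in response")

wrapped = "Here is my reply:\n" + FENCE + 'json\n{"vote": "READY", "comment": "LGTM"}\n' + FENCE
reply = parse_lenient(wrapped)
```

Like the original, the last stage matches only a flat `{...}` with no nested braces, so a reply whose JSON contains nested objects must arrive either bare or fenced to be recovered.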
def run_turn(
self,
discussion: Discussion,
participants: Optional[list[str]] = None,
callout: str = "",
) -> RunResult:
"""
Run a discussion turn with specified participants.
Args:
discussion: The discussion to update
participants: List of participant aliases (or None for all)
callout: Specific request/question for all participants
Returns:
RunResult with all responses
"""
registry = get_registry()
# Resolve participants
if participants is None or "all" in participants:
participant_list = registry.get_voting()
else:
participant_list = []
for alias in participants:
participant = registry.get(alias)
if participant:
participant_list.append(participant)
else:
print(f"[runner] Warning: Unknown participant '{alias}'", file=sys.stderr)
if not participant_list:
return RunResult(discussion=discussion, results=[])
# Get current discussion content
context = discussion.get_content()
# Invoke each participant
results = []
for participant in participant_list:
result = self._invoke_participant(participant, context, callout)
results.append(result)
# If successful with a comment, add to discussion
if result.success and result.comment:
discussion.add_comment(
author=result.comment.author,
text=result.comment.body,
vote=result.comment.vote,
)
if self.verbose:
vote_str = f" (VOTE: {result.comment.vote})" if result.comment.vote else ""
print(f"[runner] {participant.name} responded{vote_str}", file=sys.stderr)
return RunResult(discussion=discussion, results=results)
def run_mentions(
self,
discussion: Discussion,
since_comment_index: int = 0,
) -> RunResult:
"""
Run turns for any participants mentioned since a given point.
Args:
discussion: The discussion to process
since_comment_index: Only check mentions after this comment index
Returns:
RunResult with responses
"""
registry = get_registry()
# Collect mentioned aliases that haven't responded
mentioned = set()
responded = set()
for i, comment in enumerate(discussion.comments):
responded.add(comment.author)
if i >= since_comment_index:
for mention in comment.mentions:
if mention.target != "all":
mentioned.add(mention.target)
# Find participants to invoke
to_invoke = []
for alias in mentioned:
participant = registry.get(alias)
if participant and participant.name not in responded:
to_invoke.append(alias)
if not to_invoke:
return RunResult(discussion=discussion, results=[])
return self.run_turn(discussion, to_invoke)
def run_discussion_turn(
discussion_path: str | Path,
participants: Optional[list[str]] = None,
callout: str = "",
provider: Optional[str] = None,
verbose: bool = False,
) -> RunResult:
"""
Convenience function to run a discussion turn.
Args:
discussion_path: Path to discussion file
participants: Participant aliases to invoke
callout: Request/question for participants
provider: Override AI provider
verbose: Enable verbose output
Returns:
RunResult with responses
"""
discussion = Discussion.load(discussion_path)
runner = Runner(provider_override=provider, verbose=verbose)
result = runner.run_turn(discussion, participants, callout)
discussion.save()
return result

(new file, 5 lines)
"""
TUI module for Orchestrated Discussions.
Provides an interactive terminal interface for participating in discussions.
"""

src/discussions/voting.py (new file, 188 lines)
"""
Voting and consensus logic for Orchestrated Discussions.
Handles vote counting, threshold checking, and consensus determination.
See docs/DESIGN.md for voting rules specification.
"""
from dataclasses import dataclass, field
from typing import Optional
from collections import Counter
@dataclass
class VotingConfig:
"""Configuration for voting thresholds and rules."""
# Fraction of READY votes needed for consensus (default: 2/3)
threshold_ready: float = 0.67
# Fraction of REJECT votes at or above which consensus is blocked
# (default 0.01, so a single REJECT blocks in groups of up to 100 voters)
threshold_reject: float = 0.01
# Whether human approval is required
human_required: bool = True
# Minimum number of votes needed
minimum_votes: int = 1
def __post_init__(self):
"""Validate configuration."""
if not 0 <= self.threshold_ready <= 1:
raise ValueError("threshold_ready must be between 0 and 1")
if not 0 <= self.threshold_reject <= 1:
raise ValueError("threshold_reject must be between 0 and 1")
if self.minimum_votes < 0:
raise ValueError("minimum_votes must be non-negative")
@dataclass
class ConsensusResult:
"""Result of consensus calculation."""
# Whether consensus has been reached
reached: bool
# The outcome if reached (READY, CHANGES, REJECT, or None)
outcome: Optional[str]
# Vote counts
ready_count: int = 0
changes_count: int = 0
reject_count: int = 0
total_votes: int = 0
# Who blocked (if blocked by REJECT)
blocked_by: list[str] = field(default_factory=list)
# Why consensus wasn't reached (if not reached)
reason: Optional[str] = None
def is_human_participant(name: str) -> bool:
"""
Determine if a participant name represents a human (not an AI agent).
Args:
name: Participant name
Returns:
True if likely a human participant
"""
if not name:
return False
lowered = name.strip().lower()
return not (
lowered.startswith("ai_") or
lowered.startswith("ai-") or
lowered.startswith("bot_") or
lowered.startswith("bot-")
)
def calculate_consensus(
votes: dict[str, str],
config: Optional[VotingConfig] = None
) -> ConsensusResult:
"""
Calculate consensus from votes.
Args:
votes: Dict mapping participant name to vote (READY, CHANGES, REJECT)
config: Voting configuration (uses defaults if None)
Returns:
ConsensusResult with consensus status and details
"""
if config is None:
config = VotingConfig()
# Count votes
counts = Counter(v.upper() for v in votes.values() if v)
ready_count = counts.get("READY", 0)
changes_count = counts.get("CHANGES", 0)
reject_count = counts.get("REJECT", 0)
total_votes = ready_count + changes_count + reject_count
result = ConsensusResult(
reached=False,
outcome=None,
ready_count=ready_count,
changes_count=changes_count,
reject_count=reject_count,
total_votes=total_votes,
)
# Check minimum votes
if total_votes < config.minimum_votes:
result.reason = f"Insufficient votes ({total_votes} < {config.minimum_votes})"
return result
# Check human requirement
if config.human_required:
human_votes = [name for name in votes.keys() if is_human_participant(name)]
human_ready = sum(1 for name in human_votes if (votes.get(name) or "").upper() == "READY")
if human_ready < 1:
result.reason = "Human approval required but not received"
return result
# Calculate ratios
ready_ratio = ready_count / total_votes if total_votes > 0 else 0
reject_ratio = reject_count / total_votes if total_votes > 0 else 0
# Check for blocking rejects
if reject_ratio >= config.threshold_reject:
result.blocked_by = [
name for name, vote in votes.items()
if vote and vote.upper() == "REJECT"
]
result.reason = f"Blocked by REJECT votes from: {', '.join(result.blocked_by)}"
return result
# Check for ready threshold
if ready_ratio >= config.threshold_ready:
result.reached = True
result.outcome = "READY"
return result
# Not enough READY votes yet
needed = int(config.threshold_ready * total_votes) + 1 - ready_count
result.reason = f"Need {needed} more READY votes for consensus"
return result
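The threshold arithmetic is easy to check by hand. A standalone sketch mirroring `calculate_consensus`'s ratio checks (defaults: 2/3 of votes must be READY, and a REJECT share at or above 1% blocks):

```python
from collections import Counter

# Three READY out of four votes, no REJECTs.
votes = {"AI-Architect": "READY", "AI-Security": "READY",
         "AI-Pragmatist": "CHANGES", "Rob": "READY"}

counts = Counter(v.upper() for v in votes.values() if v)
total = sum(counts.values())
ready_ratio = counts["READY"] / total    # 3/4 = 0.75
reject_ratio = counts["REJECT"] / total  # Counter returns 0 for a missing key

blocked = reject_ratio >= 0.01                  # no REJECTs, so not blocked
reached = not blocked and ready_ratio >= 0.67   # 0.75 clears the 2/3 threshold
```

Swapping any single vote to REJECT flips `blocked` to true regardless of the READY count, which is the intended veto behavior of the default `threshold_reject`.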
def format_vote_summary(votes: dict[str, str]) -> str:
"""
Format votes as a human-readable summary.
Args:
votes: Dict mapping participant name to vote
Returns:
Formatted summary string
"""
counts = Counter(v.upper() for v in votes.values() if v)
ready = counts.get("READY", 0)
changes = counts.get("CHANGES", 0)
reject = counts.get("REJECT", 0)
return f"READY: {ready} | CHANGES: {changes} | REJECT: {reject}"
def format_vote_details(votes: dict[str, str]) -> str:
"""
Format votes with per-participant details.
Args:
votes: Dict mapping participant name to vote
Returns:
Formatted details string
"""
lines = [format_vote_summary(votes), ""]
for name, vote in sorted(votes.items()):
if vote:
lines.append(f" {name}: {vote.upper()}")
return "\n".join(lines)

tests/__init__.py (new file, 1 line)
"""Tests for orchestrated-discussions."""

tests/test_markers.py (new file, 145 lines)
"""Tests for marker parsing."""
import pytest
from discussions.markers import (
extract_vote,
extract_questions,
extract_action_items,
extract_decisions,
extract_concerns,
extract_mentions,
extract_all_markers,
)
class TestExtractVote:
def test_ready_vote(self):
assert extract_vote("VOTE: READY") == "READY"
def test_changes_vote(self):
assert extract_vote("VOTE: CHANGES") == "CHANGES"
def test_reject_vote(self):
assert extract_vote("VOTE: REJECT") == "REJECT"
def test_case_insensitive(self):
assert extract_vote("vote: ready") == "READY"
assert extract_vote("Vote: Changes") == "CHANGES"
def test_vote_in_multiline(self):
text = """Some comment here.
More text.
VOTE: READY
"""
assert extract_vote(text) == "READY"
def test_no_vote(self):
assert extract_vote("No vote here") is None
def test_invalid_vote(self):
assert extract_vote("VOTE: MAYBE") is None
class TestExtractQuestions:
def test_simple_question(self):
questions = extract_questions("Q: What about caching?", "Alice")
assert len(questions) == 1
assert questions[0].text == "What about caching?"
assert questions[0].author == "Alice"
def test_question_prefix(self):
questions = extract_questions("QUESTION: How does this scale?", "Bob")
assert len(questions) == 1
assert questions[0].text == "How does this scale?"
def test_multiple_questions(self):
text = """Q: First question?
Some text
Q: Second question?
"""
questions = extract_questions(text)
assert len(questions) == 2
def test_no_questions(self):
questions = extract_questions("Just regular text")
assert len(questions) == 0
class TestExtractActionItems:
def test_todo_item(self):
items = extract_action_items("TODO: Write tests", "Dev")
assert len(items) == 1
assert items[0].text == "Write tests"
assert items[0].author == "Dev"
assert items[0].status == "todo"
def test_action_item(self):
items = extract_action_items("ACTION: Review PR", "Dev")
assert len(items) == 1
assert items[0].text == "Review PR"
def test_with_assignee(self):
items = extract_action_items("TODO: @alice should review this", "Bob")
assert len(items) == 1
assert items[0].assignee == "alice"
class TestExtractDecisions:
def test_decision(self):
decisions = extract_decisions("DECISION: We will use Redis", "Team")
assert len(decisions) == 1
assert decisions[0].text == "We will use Redis"
assert decisions[0].author == "Team"
class TestExtractConcerns:
def test_concern(self):
concerns = extract_concerns("CONCERN: Security implications", "Steve")
assert len(concerns) == 1
assert concerns[0].text == "Security implications"
assert concerns[0].author == "Steve"
class TestExtractMentions:
def test_single_mention(self):
mentions = extract_mentions("What do you think @architect?", "Maya")
assert len(mentions) == 1
assert mentions[0].target == "architect"
assert mentions[0].author == "Maya"
def test_multiple_mentions(self):
mentions = extract_mentions("@architect and @security should review", "Lead")
assert len(mentions) == 2
targets = {m.target for m in mentions}
assert targets == {"architect", "security"}
def test_mention_all(self):
mentions = extract_mentions("@all please vote", "Moderator")
assert len(mentions) == 1
assert mentions[0].target == "all"
class TestExtractAllMarkers:
def test_full_comment(self):
text = """I have concerns about this approach.
Q: Have we considered alternatives?
CONCERN: This might not scale.
TODO: @security review threat model
DECISION: We'll proceed with option A.
VOTE: CHANGES
"""
markers = extract_all_markers(text, "Architect")
assert markers["vote"] == "CHANGES"
assert len(markers["questions"]) == 1
assert len(markers["concerns"]) == 1
assert len(markers["action_items"]) == 1
assert len(markers["decisions"]) == 1
assert len(markers["mentions"]) == 1

tests/test_voting.py (new file, 147 lines)
"""Tests for voting and consensus logic."""
import pytest
from discussions.voting import (
VotingConfig,
ConsensusResult,
calculate_consensus,
is_human_participant,
format_vote_summary,
)
class TestIsHumanParticipant:
def test_human_names(self):
assert is_human_participant("Rob") is True
assert is_human_participant("Alice") is True
assert is_human_participant("bob_smith") is True
def test_ai_names(self):
assert is_human_participant("AI-Architect") is False
assert is_human_participant("AI_Security") is False
assert is_human_participant("ai-moderator") is False
def test_empty(self):
assert is_human_participant("") is False
assert is_human_participant(None) is False
class TestVotingConfig:
def test_defaults(self):
config = VotingConfig()
assert config.threshold_ready == 0.67
assert config.threshold_reject == 0.01
assert config.human_required is True
def test_custom_thresholds(self):
config = VotingConfig(threshold_ready=0.5, threshold_reject=0.2)
assert config.threshold_ready == 0.5
assert config.threshold_reject == 0.2
def test_invalid_threshold(self):
with pytest.raises(ValueError):
VotingConfig(threshold_ready=1.5)
with pytest.raises(ValueError):
VotingConfig(threshold_reject=-0.1)
class TestCalculateConsensus:
def test_consensus_reached_all_ready(self):
votes = {
"AI-Architect": "READY",
"AI-Security": "READY",
"AI-Pragmatist": "READY",
"Rob": "READY",
}
config = VotingConfig(human_required=True)
result = calculate_consensus(votes, config)
assert result.reached is True
assert result.outcome == "READY"
assert result.ready_count == 4
def test_consensus_reached_with_changes(self):
votes = {
"AI-Architect": "READY",
"AI-Security": "READY",
"AI-Pragmatist": "CHANGES",
"Rob": "READY",
}
config = VotingConfig(threshold_ready=0.67, human_required=True)
result = calculate_consensus(votes, config)
assert result.reached is True
assert result.ready_count == 3
assert result.changes_count == 1
def test_blocked_by_reject(self):
votes = {
"AI-Architect": "READY",
"AI-Security": "REJECT",
"AI-Pragmatist": "READY",
"Rob": "READY",
}
config = VotingConfig(threshold_reject=0.01)
result = calculate_consensus(votes, config)
assert result.reached is False
assert "AI-Security" in result.blocked_by
assert "REJECT" in result.reason
def test_human_required_not_met(self):
votes = {
"AI-Architect": "READY",
"AI-Security": "READY",
"AI-Pragmatist": "READY",
}
config = VotingConfig(human_required=True)
result = calculate_consensus(votes, config)
assert result.reached is False
assert "Human approval required" in result.reason
def test_human_required_disabled(self):
votes = {
"AI-Architect": "READY",
"AI-Security": "READY",
"AI-Pragmatist": "READY",
}
config = VotingConfig(human_required=False)
result = calculate_consensus(votes, config)
assert result.reached is True
def test_insufficient_votes(self):
votes = {}
config = VotingConfig(minimum_votes=1)
result = calculate_consensus(votes, config)
assert result.reached is False
assert "Insufficient votes" in result.reason
def test_not_enough_ready_votes(self):
votes = {
"AI-Architect": "CHANGES",
"AI-Security": "CHANGES",
"AI-Pragmatist": "READY",
"Rob": "READY",
}
config = VotingConfig(threshold_ready=0.67, human_required=True)
result = calculate_consensus(votes, config)
assert result.reached is False
assert "more READY votes" in result.reason
class TestFormatVoteSummary:
def test_format(self):
votes = {
"Alice": "READY",
"Bob": "CHANGES",
"Carol": "READY",
}
summary = format_vote_summary(votes)
assert "READY: 2" in summary
assert "CHANGES: 1" in summary
assert "REJECT: 0" in summary