orchestrated-discussions/src/discussions/cli.py

474 lines
15 KiB
Python

"""
CLI entry point for Orchestrated Discussions.
Provides commands for creating, managing, and running discussions.
"""
import argparse
import sys
from pathlib import Path
from . import __version__
def cmd_new(args) -> int:
"""Create a new discussion."""
from .discussion import Discussion
# Generate filename from title with .discussion.md extension
if args.output:
path = Path(args.output)
# Ensure .discussion.md extension
if not path.name.endswith('.discussion.md'):
if path.suffix == '.md':
path = path.with_suffix('.discussion.md')
else:
path = Path(str(path) + '.discussion.md')
else:
slug = args.title.lower().replace(" ", "-")
slug = "".join(c for c in slug if c.isalnum() or c == "-")
path = Path(f"{slug}.discussion.md")
if path.exists() and not args.force:
print(f"Error: {path} already exists. Use --force to overwrite.")
return 1
# Parse participants
participants = None
if args.participants:
participants = [p.strip() for p in args.participants.split(",")]
discussion = Discussion.create(
path=path,
title=args.title,
context=args.context or "",
template=args.template,
participants=participants,
)
print(f"Created: {path}")
print(f"Title: {discussion.title}")
print(f"Participants: {', '.join(discussion.participant_aliases)}")
return 0
def cmd_status(args) -> int:
"""Show discussion status."""
from .discussion import Discussion
from .voting import format_vote_details
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
discussion = Discussion.load(path)
print(f"Discussion: {discussion.title}")
print(f"File: {discussion.path}")
print(f"Phase: {discussion.phase}")
print(f"Status: {discussion.status}")
print(f"Comments: {len(discussion.comments)}")
print()
votes = discussion.get_votes()
if votes:
print("Votes:")
print(format_vote_details(votes))
else:
print("Votes: (none yet)")
print()
consensus = discussion.check_consensus()
if consensus.reached:
print(f"Consensus: REACHED ({consensus.outcome})")
else:
print(f"Consensus: NOT REACHED - {consensus.reason}")
print()
questions = discussion.get_questions()
if questions:
print(f"Open Questions ({len(questions)}):")
for q in questions[:5]: # Show first 5
print(f" Q: {q.text} (@{q.author})")
if len(questions) > 5:
print(f" ... and {len(questions) - 5} more")
concerns = discussion.get_concerns()
if concerns:
print(f"\nConcerns ({len(concerns)}):")
for c in concerns[:5]:
print(f" CONCERN: {c.text} (@{c.author})")
return 0
def cmd_turn(args) -> int:
"""Run a discussion turn."""
from .runner import run_pipeline_turn
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
# Parse participants (remove @ prefix if present)
participants = None
if args.participants:
participants = [p.lstrip("@") for p in args.participants]
print(f"Running turn on {path}...")
if participants:
print(f"Participants: {', '.join(participants)}")
result = run_pipeline_turn(
discussion_path=path,
participants=participants,
callout=args.callout or "",
provider=args.provider,
verbose=args.verbose,
templates_dir=Path(args.templates_dir) if hasattr(args, 'templates_dir') and args.templates_dir else None,
)
print()
print(f"Responses: {result.successful_count} successful, {result.skipped_count} skipped, {result.failed_count} failed")
for r in result.responses:
if r.success and r.comment:
vote_str = f" [{r.vote}]" if r.vote else ""
print(f" {r.name}{vote_str}")
elif r.success:
print(f" {r.name} - (no response)")
else:
print(f" {r.name} - ERROR: {r.error}")
# Show vote summary if available
if result.vote_summary:
print()
print(f"Votes: READY={result.vote_summary.get('READY', 0)} "
f"CHANGES={result.vote_summary.get('CHANGES', 0)} "
f"REJECT={result.vote_summary.get('REJECT', 0)}")
if result.consensus_reached:
print(f"Consensus: REACHED")
elif result.consensus_reason:
print(f"Consensus: {result.consensus_reason}")
if result.status_promoted:
print(f"Status promoted to: {result.new_status}")
if result.phase_advanced:
print(f"Phase advanced to: {result.new_phase}")
return 0
def cmd_comment(args) -> int:
"""Add a human comment to a discussion."""
from .discussion import Discussion
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
discussion = Discussion.load(path)
# Get author name
author = args.author or "Human"
# Add comment
discussion.add_comment(
author=author,
text=args.text,
vote=args.vote.upper() if args.vote else None,
)
discussion.save()
vote_str = f" with vote {args.vote.upper()}" if args.vote else ""
print(f"Added comment from {author}{vote_str}")
return 0
def cmd_participants(args) -> int:
"""List available participants (discovered from ~/.smarttools/)."""
from .participant import get_registry
registry = get_registry()
voting = registry.get_voting()
background = registry.get_background()
if not voting and not background:
print("No participants found in ~/.smarttools/")
print("Install discussion SmartTools or create participants with:")
print(" discussions participants add <alias>")
return 0
if voting:
print("Voting Participants:")
for p in voting:
desc = f" - {p.description}" if p.description else ""
print(f" @{p.alias:15} {p.name:20}{desc}")
if background:
print("\nBackground Participants:")
for p in background:
desc = f" - {p.description}" if p.description else ""
print(f" @{p.alias:15} {p.name:20}{desc}")
return 0
def cmd_advance(args) -> int:
"""Advance discussion to next phase."""
from .discussion import Discussion
path = Path(args.discussion)
if not path.exists():
print(f"Error: Discussion not found: {path}")
return 1
discussion = Discussion.load(path)
old_phase = discussion.phase
if args.phase:
discussion.update_phase(args.phase)
else:
# TODO: Implement phase progression logic
print("Error: --phase required (automatic progression not yet implemented)")
return 1
discussion.save()
print(f"Advanced: {old_phase} -> {discussion.phase}")
return 0
def cmd_cleanup(args) -> int:
"""Find and optionally delete orphaned diagrams."""
from .discussion import Discussion
from .markers import extract_diagrams
import re
directory = Path(args.directory) if args.directory else Path.cwd()
if not directory.exists():
print(f"Error: Directory not found: {directory}")
return 1
# Find all discussion files
discussions = list(directory.glob("**/*.discussion.md"))
# Collect all referenced diagrams from all discussions
referenced = set()
for disc_path in discussions:
try:
d = Discussion.load(disc_path)
for comment in d.comments:
diagrams = extract_diagrams(comment.body)
for diagram in diagrams:
# Resolve relative to discussion file
abs_path = (disc_path.parent / diagram.path).resolve()
referenced.add(abs_path)
except Exception as e:
print(f"Warning: Could not parse {disc_path}: {e}")
# Find all diagram directories
diagram_dirs = set()
# Check for diagrams folders alongside discussions
for disc_path in discussions:
diagrams_dir = disc_path.parent / "diagrams"
if diagrams_dir.exists():
diagram_dirs.add(diagrams_dir)
# Also check top-level diagrams folder
top_diagrams = directory / "diagrams"
if top_diagrams.exists():
diagram_dirs.add(top_diagrams)
# Scan for any diagrams/ folders recursively (catches orphaned folders with no discussions)
for diagrams_dir in directory.glob("**/diagrams"):
if diagrams_dir.is_dir():
diagram_dirs.add(diagrams_dir)
if not diagram_dirs:
print("No diagram directories found")
return 0
# Find all diagram files
diagram_extensions = {'.puml', '.plantuml', '.mmd', '.mermaid', '.svg', '.png', '.scad'}
all_diagrams = set()
for diagrams_dir in diagram_dirs:
for ext in diagram_extensions:
for f in diagrams_dir.glob(f"*{ext}"):
all_diagrams.add(f.resolve())
# Find orphans
orphans = all_diagrams - referenced
if not orphans:
print(f"No orphaned diagrams found ({len(all_diagrams)} diagrams, all referenced)")
return 0
print(f"Found {len(orphans)} orphaned diagram(s) out of {len(all_diagrams)} total:\n")
# Group by directory for display
by_dir = {}
for orphan in sorted(orphans):
parent = orphan.parent
if parent not in by_dir:
by_dir[parent] = []
by_dir[parent].append(orphan)
for parent, files in sorted(by_dir.items()):
try:
rel_parent = parent.relative_to(directory)
except ValueError:
rel_parent = parent
print(f" {rel_parent}/")
for f in files:
size = f.stat().st_size
size_str = f"{size:,} bytes" if size < 1024 else f"{size/1024:.1f} KB"
print(f" {f.name} ({size_str})")
if args.delete:
print()
confirm = input(f"Delete {len(orphans)} orphaned file(s)? [y/N] ")
if confirm.lower() == 'y':
deleted = 0
for orphan in orphans:
try:
orphan.unlink()
deleted += 1
except Exception as e:
print(f" Error deleting {orphan.name}: {e}")
print(f"Deleted {deleted} file(s)")
else:
print("Cancelled")
else:
print(f"\nRun with --delete to remove these files")
return 0
def cmd_ui(args) -> int:
"""Launch the interactive UI (GUI by default, TUI with --tui)."""
# Determine if path is a file or directory
path = args.path
open_file = None
directory = None
if path:
from pathlib import Path
p = Path(path)
if p.is_file() and (p.name.endswith('.discussion.md') or p.suffix == '.md'):
# Open specific discussion file
open_file = str(p.resolve())
directory = str(p.parent)
else:
directory = path
if args.tui:
# Launch TUI (urwid-based)
try:
from .ui.tui import main as tui_main
except ImportError as e:
print("Error: TUI requires urwid. Install with: pip install urwid")
print(f"Details: {e}")
return 1
tui_main(directory, open_file=open_file)
else:
# Launch GUI (Dear PyGui-based)
try:
from .ui.gui import main as gui_main
except ImportError as e:
print(f"GUI unavailable ({e}), falling back to TUI")
print("Install Dear PyGui with: pip install dearpygui")
try:
from .ui.tui import main as tui_main
tui_main(directory, open_file=open_file)
except ImportError as e2:
print(f"TUI also unavailable: {e2}")
return 1
return 0
gui_main(directory, open_file=open_file)
return 0
def main(argv: list[str] = None) -> int:
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
prog="discussions",
description="Multi-agent AI discussion orchestration"
)
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# 'new' command
p_new = subparsers.add_parser("new", help="Create a new discussion")
p_new.add_argument("title", help="Discussion title")
p_new.add_argument("-o", "--output", help="Output file path")
p_new.add_argument("-t", "--template", default="feature", help="Template to use")
p_new.add_argument("-c", "--context", help="Initial context/description")
p_new.add_argument("-p", "--participants", help="Comma-separated participant aliases")
p_new.add_argument("-f", "--force", action="store_true", help="Overwrite existing")
p_new.set_defaults(func=cmd_new)
# 'status' command
p_status = subparsers.add_parser("status", help="Show discussion status")
p_status.add_argument("discussion", help="Discussion file path")
p_status.set_defaults(func=cmd_status)
# 'turn' command
p_turn = subparsers.add_parser("turn", help="Run a discussion turn")
p_turn.add_argument("discussion", help="Discussion file path")
p_turn.add_argument("participants", nargs="*", help="Participant aliases (e.g., @architect)")
p_turn.add_argument("--callout", "-c", help="Specific question/request")
p_turn.add_argument("--provider", "-p", help="Override AI provider")
p_turn.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
p_turn.set_defaults(func=cmd_turn)
# 'comment' command
p_comment = subparsers.add_parser("comment", help="Add a human comment")
p_comment.add_argument("discussion", help="Discussion file path")
p_comment.add_argument("text", help="Comment text")
p_comment.add_argument("--vote", "-v", choices=["ready", "changes", "reject"], help="Cast a vote")
p_comment.add_argument("--author", "-a", help="Author name (default: Human)")
p_comment.set_defaults(func=cmd_comment)
# 'participants' command
p_parts = subparsers.add_parser("participants", help="List available participants")
p_parts.set_defaults(func=cmd_participants)
# 'advance' command
p_advance = subparsers.add_parser("advance", help="Advance to next phase")
p_advance.add_argument("discussion", help="Discussion file path")
p_advance.add_argument("--phase", help="Target phase ID")
p_advance.set_defaults(func=cmd_advance)
# 'cleanup' command
p_cleanup = subparsers.add_parser("cleanup", help="Find orphaned diagrams")
p_cleanup.add_argument("directory", nargs="?", help="Directory to scan (default: current)")
p_cleanup.add_argument("--delete", "-d", action="store_true", help="Delete orphaned files (with confirmation)")
p_cleanup.set_defaults(func=cmd_cleanup)
# 'ui' command
p_ui = subparsers.add_parser("ui", help="Launch interactive UI")
p_ui.add_argument("path", nargs="?", help="Discussion file (.md) or directory to browse")
p_ui.add_argument("--tui", action="store_true", help="Use terminal UI instead of graphical UI")
p_ui.set_defaults(func=cmd_ui)
args = parser.parse_args(argv)
if args.command is None:
parser.print_help()
return 0
return args.func(args)
if __name__ == "__main__":
sys.exit(main())