CmdForge/src/cmdforge/ui_urwid/__init__.py

1903 lines
73 KiB
Python

"""BIOS-style TUI for CmdForge using urwid (with mouse support)."""
import urwid
from typing import Optional, Callable
from ..tool import (
Tool, ToolArgument, PromptStep, CodeStep,
list_tools, load_tool, save_tool, delete_tool, tool_exists, validate_tool_name,
get_tools_dir, DEFAULT_CATEGORIES
)
from ..providers import Provider, load_providers, add_provider, delete_provider, get_provider
from ..registry_client import RegistryClient, RegistryError
from .palette import PALETTE
from .widgets import (
SelectableText, Button3D, Button3DCompact, ClickableButton,
SelectableToolItem, ToolListBox, TabCyclePile, TabPassEdit,
UndoableEdit, DOSScrollBar, ToolBuilderLayout, Dialog
)
class CmdForgeUI:
"""Urwid-based UI for CmdForge with mouse support."""
def __init__(self):
self.loop = None
self.main_widget = None
self.overlay_stack = []
def run(self):
"""Run the UI."""
self.show_main_menu()
self.loop = urwid.MainLoop(
self.main_widget,
palette=PALETTE,
unhandled_input=self.handle_input,
handle_mouse=True # Enable mouse support!
)
self.loop.run()
def handle_input(self, key):
"""Handle global key input."""
if key in ('q', 'Q', 'esc'):
if self.overlay_stack:
self.close_overlay()
else:
raise urwid.ExitMainLoop()
def refresh(self):
"""Refresh the display."""
if self.loop:
self.loop.draw_screen()
def set_main(self, widget):
"""Set the main widget."""
self.main_widget = urwid.AttrMap(widget, 'body')
if self.loop:
self.loop.widget = self.main_widget
def show_overlay(self, dialog, width=60, height=20):
"""Show a dialog overlay."""
overlay = urwid.Overlay(
dialog,
self.main_widget,
align='center', width=width,
valign='middle', height=height,
)
self.overlay_stack.append(self.main_widget)
self.main_widget = overlay
if self.loop:
self.loop.widget = self.main_widget
def close_overlay(self):
"""Close the current overlay."""
if self.overlay_stack:
self.main_widget = self.overlay_stack.pop()
if self.loop:
self.loop.widget = self.main_widget
def message_box(self, title: str, message: str, callback=None):
"""Show a message box."""
def on_ok(_):
self.close_overlay()
if callback:
callback()
body = urwid.Text(message)
dialog = Dialog(title, body, [("OK", on_ok)], width=50)
self.show_overlay(dialog, width=52, height=min(10 + message.count('\n'), 20))
def yes_no(self, title: str, message: str, on_yes=None, on_no=None):
"""Show a yes/no dialog."""
def handle_yes(_):
self.close_overlay()
if on_yes:
on_yes()
def handle_no(_):
self.close_overlay()
if on_no:
on_no()
body = urwid.Text(message)
dialog = Dialog(title, body, [("Yes", handle_yes), ("No", handle_no)], width=50)
self.show_overlay(dialog, width=52, height=10)
def input_dialog(self, title: str, prompt: str, initial: str, callback: Callable[[str], None]):
"""Show an input dialog."""
edit = urwid.Edit(('label', f"{prompt}: "), initial)
edit = urwid.AttrMap(edit, 'edit', 'edit_focus')
def on_ok(_):
value = edit.base_widget.edit_text
self.close_overlay()
callback(value)
def on_cancel(_):
self.close_overlay()
body = urwid.Filler(edit, valign='top')
dialog = Dialog(title, body, [("OK", on_ok), ("Cancel", on_cancel)], width=50)
self.show_overlay(dialog, width=52, height=8)
# ==================== Main Menu ====================
def show_main_menu(self):
"""Show the main menu with tool list and info panel."""
self._selected_tool_name = None
self._refresh_main_menu()
def _refresh_main_menu(self):
"""Refresh the main menu display."""
from collections import defaultdict
tools = list_tools()
self._tools_list = tools
# Group tools by category
tools_by_category = defaultdict(list)
for name in tools:
tool = load_tool(name)
category = tool.category if tool else "Other"
tools_by_category[category].append(name)
# Build tool list with category headers
tool_items = []
# Show categories in defined order, then any custom ones
all_categories = list(DEFAULT_CATEGORIES)
for cat in tools_by_category:
if cat not in all_categories:
all_categories.append(cat)
for category in all_categories:
if category in tools_by_category and tools_by_category[category]:
# Category header (non-selectable)
header = urwid.AttrMap(
urwid.Text(f"─── {category} ───"),
'label'
)
tool_items.append(header)
# Tools in this category
for name in sorted(tools_by_category[category]):
item = SelectableToolItem(name, on_select=self._on_tool_select)
tool_items.append(item)
if not tools:
tool_items.append(urwid.Text(('label', " (no tools - click Create to add one) ")))
self._tool_walker = urwid.SimpleFocusListWalker(tool_items)
tool_listbox = ToolListBox(self._tool_walker, on_focus_change=self._on_tool_focus)
tool_box = urwid.LineBox(tool_listbox, title='Tools')
# Action buttons - Tab navigates here from tool list (3D style)
create_btn = Button3DCompact("Create", lambda _: self._create_tool_before_selected())
edit_btn = Button3DCompact("Edit", lambda _: self._edit_selected_tool())
delete_btn = Button3DCompact("Delete", lambda _: self._delete_selected_tool())
test_btn = Button3DCompact("Test", lambda _: self._test_selected_tool())
publish_btn = Button3DCompact("Publish", lambda _: self._publish_selected_tool())
registry_btn = Button3DCompact("Registry", lambda _: self.browse_registry())
providers_btn = Button3DCompact("Providers", lambda _: self.manage_providers())
buttons_row = urwid.Columns([
('pack', create_btn),
('pack', urwid.Text(" ")),
('pack', edit_btn),
('pack', urwid.Text(" ")),
('pack', delete_btn),
('pack', urwid.Text(" ")),
('pack', test_btn),
('pack', urwid.Text(" ")),
('pack', publish_btn),
('pack', urwid.Text(" ")),
('pack', registry_btn),
('pack', urwid.Text(" ")),
('pack', providers_btn),
])
buttons_padded = urwid.Padding(buttons_row, align='left', left=1)
# Info panel - shows details of selected tool (not focusable)
self._info_name = urwid.Text("")
self._info_desc = urwid.Text("")
self._info_args = urwid.Text("")
self._info_steps = urwid.Text("")
self._info_output = urwid.Text("")
self._info_source = urwid.Text("")
info_content = urwid.Pile([
self._info_name,
self._info_desc,
urwid.Divider(),
self._info_args,
urwid.Divider(),
self._info_steps,
urwid.Divider(),
self._info_output,
urwid.Divider(),
self._info_source,
])
info_filler = urwid.Filler(info_content, valign='top')
info_box = urwid.LineBox(info_filler, title='Tool Info')
# Exit button at bottom (3D style)
exit_btn = Button3DCompact("EXIT", lambda _: self.exit_app())
exit_centered = urwid.Padding(exit_btn, align='center', width=12)
# Use a custom Pile that handles Tab to cycle between tool list and buttons
self._main_pile = TabCyclePile([
('weight', 1, tool_box),
('pack', buttons_padded),
('pack', urwid.Divider('')),
('weight', 2, info_box),
('pack', urwid.Divider()),
('pack', exit_centered),
], tab_positions=[0, 1, 5]) # Tool list, buttons row, exit button
# Header
header = urwid.Text(('header', ' CmdForge Manager '), align='center')
# Footer
footer = urwid.Text(('footer', ' Arrow:Navigate list | Tab:Jump to buttons | Enter/Click:Select | Q:Quit '), align='center')
frame = urwid.Frame(self._main_pile, header=header, footer=footer)
self.set_main(frame)
# Update info for first tool if any
if tools:
self._on_tool_focus(tools[0])
def _create_tool_before_selected(self):
"""Create a new tool (will appear in list based on name sorting)."""
self.create_tool()
def _on_tool_focus(self, name):
"""Called when a tool is focused/highlighted."""
self._selected_tool_name = name
# Update selection state on all tool items
if hasattr(self, '_tool_walker'):
for item in self._tool_walker:
if isinstance(item, SelectableToolItem):
item.set_selected(item.name == name)
tool = load_tool(name)
if tool:
self._info_name.set_text(('label', f"Name: {tool.name}"))
self._info_desc.set_text(f"Description: {tool.description or '(none)'}")
if tool.arguments:
args_text = "Arguments:\n"
for arg in tool.arguments:
default = f" = {arg.default}" if arg.default else ""
args_text += f" {arg.flag} -> {{{arg.variable}}}{default}\n"
else:
args_text = "Arguments: (none)"
self._info_args.set_text(args_text.rstrip())
if tool.steps:
steps_text = "Execution Steps:\n"
for i, step in enumerate(tool.steps):
if isinstance(step, PromptStep):
steps_text += f" {i+1}. PROMPT [{step.provider}] -> {{{step.output_var}}}\n"
else:
steps_text += f" {i+1}. CODE -> {{{step.output_var}}}\n"
else:
steps_text = "Execution Steps: (none)"
self._info_steps.set_text(steps_text.rstrip())
self._info_output.set_text(f"Output: {tool.output}")
# Display source attribution if present
if tool.source:
source_text = "Source:\n"
source_text += f" Type: {tool.source.type}\n"
if tool.source.author:
source_text += f" Author: {tool.source.author}\n"
if tool.source.license:
source_text += f" License: {tool.source.license}\n"
if tool.source.url:
source_text += f" URL: {tool.source.url}\n"
if tool.source.original_tool:
source_text += f" Original: {tool.source.original_tool}\n"
self._info_source.set_text(source_text.rstrip())
else:
self._info_source.set_text("")
else:
self._info_name.set_text("")
self._info_desc.set_text("")
self._info_args.set_text("")
self._info_steps.set_text("")
self._info_output.set_text("")
self._info_source.set_text("")
def _on_tool_select(self, name):
"""Called when a tool is selected (Enter/double-click)."""
# Edit the tool on select
tool = load_tool(name)
if tool:
self.tool_builder(tool)
def _edit_selected_tool(self):
"""Edit the currently selected tool."""
if self._selected_tool_name:
tool = load_tool(self._selected_tool_name)
if tool:
self.tool_builder(tool)
else:
self.message_box("Edit", "No tool selected.")
def _delete_selected_tool(self):
"""Delete the currently selected tool."""
if self._selected_tool_name:
name = self._selected_tool_name
def do_delete():
delete_tool(name)
self._selected_tool_name = None
self.message_box("Deleted", f"Tool '{name}' deleted.", self._refresh_main_menu)
self.yes_no("Confirm", f"Delete tool '{name}'?", on_yes=do_delete)
else:
self.message_box("Delete", "No tool selected.")
def _test_selected_tool(self):
"""Test the currently selected tool."""
if self._selected_tool_name:
tool = load_tool(self._selected_tool_name)
if tool:
self._test_tool(tool)
else:
self.message_box("Test", "No tool selected.")
def _publish_selected_tool(self):
"""Publish the currently selected tool to the registry."""
if not self._selected_tool_name:
self.message_box("Publish", "No tool selected.")
return
tool = load_tool(self._selected_tool_name)
if not tool:
self.message_box("Publish", "Could not load tool.")
return
# Check for version
tool_dir = get_tools_dir() / self._selected_tool_name
config_path = tool_dir / "config.yaml"
if not config_path.exists():
self.message_box("Publish", "Tool config not found.")
return
import yaml
config_text = config_path.read_text()
config_data = yaml.safe_load(config_text)
version = config_data.get("version", "")
if not version:
# Prompt for version
self._prompt_for_version_and_publish(tool, config_path, config_data, config_text)
else:
self._do_publish(tool, version)
def _prompt_for_version_and_publish(self, tool, config_path, config_data, config_text):
"""Prompt user for version and then publish."""
def on_version(version):
version = version.strip()
if not version:
self.message_box("Publish", "Version is required for publishing.")
return
# Add version to config
import yaml
config_data["version"] = version
config_path.write_text(yaml.dump(config_data, default_flow_style=False, sort_keys=False))
self._do_publish(tool, version)
self.input_dialog(
"Version Required",
"Enter version (e.g., 1.0.0)",
"1.0.0",
on_version
)
def _do_publish(self, tool, version):
"""Perform the actual publish."""
from ..config import load_config, set_registry_token
config = load_config()
if not config.registry.token:
self.message_box(
"Authentication Required",
"No registry token configured.\n\n"
"To publish tools:\n"
"1. Register at cmdforge.brrd.tech/register\n"
"2. Log in and go to Dashboard > Tokens\n"
"3. Generate a token\n"
"4. Run: cmdforge config set-token <token>"
)
return
def do_publish():
try:
client = RegistryClient()
tool_dir = get_tools_dir() / tool.name
config_yaml = (tool_dir / "config.yaml").read_text()
readme_path = tool_dir / "README.md"
readme = readme_path.read_text() if readme_path.exists() else ""
result = client.publish_tool(config_yaml, readme)
status = result.get("status", "")
pr_url = result.get("pr_url", "")
if status == "published" or result.get("version"):
owner = result.get("owner", "unknown")
name = result.get("name", tool.name)
self.message_box("Success", f"Published {owner}/{name}@{version}")
elif pr_url:
self.message_box("Pending Review", f"PR created: {pr_url}\n\nYour tool is pending review.")
else:
self.message_box("Success", "Tool published successfully!")
except RegistryError as e:
if e.code == "UNAUTHORIZED":
self.message_box("Error", "Authentication failed.\nYour token may have expired.")
elif e.code == "VERSION_EXISTS":
self.message_box("Error", f"Version {version} already exists.\nBump the version and try again.")
else:
self.message_box("Error", f"Publish failed: {e.message}")
except Exception as e:
self.message_box("Error", f"Publish failed: {e}")
self.yes_no(
"Publish Tool",
f"Publish {tool.name}@{version} to registry?",
on_yes=do_publish
)
def exit_app(self):
"""Exit the application."""
raise urwid.ExitMainLoop()
# ==================== Tool Builder ====================
def create_tool(self):
"""Create a new tool."""
self.tool_builder(None)
def tool_builder(self, existing: Optional[Tool]):
"""Main tool builder interface."""
is_edit = existing is not None
# Initialize tool
if existing:
tool = Tool(
name=existing.name,
description=existing.description,
arguments=list(existing.arguments),
steps=list(existing.steps),
output=existing.output
)
else:
tool = Tool(name="", description="", arguments=[], steps=[], output="{input}")
# Store references for callbacks
self._current_tool = tool
self._is_edit = is_edit
self._selected_arg_idx = None
self._selected_step_idx = None
self._show_tool_builder()
def _save_tool_fields(self):
"""Save current edit field values to the tool object."""
if not hasattr(self, '_name_edit') or not hasattr(self, '_current_tool'):
return
tool = self._current_tool
# Save name (only if it's an edit widget, not a text label)
if not self._is_edit and hasattr(self._name_edit, 'base_widget'):
name_edit = self._name_edit.base_widget if hasattr(self._name_edit, 'base_widget') else self._name_edit
if hasattr(name_edit, 'edit_text'):
tool.name = name_edit.edit_text.strip()
# Save description
if hasattr(self, '_desc_edit') and hasattr(self._desc_edit, 'base_widget'):
tool.description = self._desc_edit.base_widget.edit_text.strip()
# Save output
if hasattr(self, '_output_edit') and hasattr(self._output_edit, 'base_widget'):
tool.output = self._output_edit.base_widget.edit_text.strip()
def _show_tool_builder(self):
"""Render the tool builder screen."""
tool = self._current_tool
# Create edit widgets
if self._is_edit:
name_widget = urwid.Text(('label', f"Name: {tool.name}"))
else:
name_widget = urwid.AttrMap(urwid.Edit(('label', "Name: "), tool.name), 'edit', 'edit_focus')
self._name_edit = name_widget
self._desc_edit = urwid.AttrMap(urwid.Edit(('label', "Desc: "), tool.description), 'edit', 'edit_focus')
self._output_edit = urwid.AttrMap(urwid.Edit(('label', "Output: "), tool.output), 'edit', 'edit_focus')
# Category selector
self._selected_category = [tool.category or "Other"]
category_btn_text = urwid.Text(self._selected_category[0])
category_btn = urwid.AttrMap(
urwid.Padding(category_btn_text, left=1, right=1),
'edit', 'edit_focus'
)
def show_category_dropdown(_):
"""Show category selection popup."""
def select_category(cat):
def callback(_):
self._selected_category[0] = cat
category_btn_text.set_text(cat)
tool.category = cat
self.close_overlay()
return callback
items = []
for cat in DEFAULT_CATEGORIES:
btn = urwid.Button(cat, on_press=select_category(cat))
items.append(urwid.AttrMap(btn, 'button', 'button_focus'))
listbox = urwid.ListBox(urwid.SimpleFocusListWalker(items))
popup = Dialog("Select Category", listbox, [])
self.show_overlay(popup, width=30, height=len(DEFAULT_CATEGORIES) + 4)
category_select_btn = Button3DCompact("", on_press=show_category_dropdown)
category_row = urwid.Columns([
('pack', urwid.Text(('label', "Category: "))),
('weight', 1, category_btn),
('pack', urwid.Text(" ")),
('pack', category_select_btn),
])
# Left column - fields
left_pile = urwid.Pile([
('pack', name_widget),
('pack', urwid.Divider()),
('pack', self._desc_edit),
('pack', urwid.Divider()),
('pack', category_row),
('pack', urwid.Divider()),
('pack', self._output_edit),
])
left_box = urwid.LineBox(urwid.Filler(left_pile, valign='top'), title='Tool Info')
# Arguments list
arg_items = []
for i, arg in enumerate(tool.arguments):
text = f"{arg.flag} -> {{{arg.variable}}}"
item = SelectableToolItem(text, on_select=lambda n, idx=i: self._on_arg_activate(idx))
item.name = i # Store index
arg_items.append(item)
if not arg_items:
arg_items.append(urwid.Text(('label', " (none) ")))
self._arg_walker = urwid.SimpleFocusListWalker(arg_items)
args_listbox = ToolListBox(self._arg_walker, on_focus_change=self._on_arg_focus)
args_box = urwid.LineBox(args_listbox, title='Arguments')
# Argument buttons
arg_add_btn = ClickableButton("Add", lambda _: self._add_argument_dialog())
arg_edit_btn = ClickableButton("Edit", lambda _: self._edit_selected_arg())
arg_del_btn = ClickableButton("Delete", lambda _: self._delete_selected_arg())
arg_buttons = urwid.Columns([
('pack', arg_add_btn),
('pack', urwid.Text(" ")),
('pack', arg_edit_btn),
('pack', urwid.Text(" ")),
('pack', arg_del_btn),
])
arg_buttons_padded = urwid.Padding(arg_buttons, align='left', left=1)
# Args section (list + buttons)
args_section = urwid.Pile([
('weight', 1, args_box),
('pack', arg_buttons_padded),
])
# Steps list
step_items = []
for i, step in enumerate(tool.steps):
if isinstance(step, PromptStep):
text = f"P:{step.provider} -> {{{step.output_var}}}"
else:
text = f"C: -> {{{step.output_var}}}"
item = SelectableToolItem(text, on_select=lambda n, idx=i: self._on_step_activate(idx))
item.name = i # Store index
step_items.append(item)
if not step_items:
step_items.append(urwid.Text(('label', " (none) ")))
self._step_walker = urwid.SimpleFocusListWalker(step_items)
steps_listbox = ToolListBox(self._step_walker, on_focus_change=self._on_step_focus)
steps_box = urwid.LineBox(steps_listbox, title='Execution Steps')
# Step buttons
step_add_btn = ClickableButton("Add", lambda _: self._add_step_choice())
step_edit_btn = ClickableButton("Edit", lambda _: self._edit_selected_step())
step_del_btn = ClickableButton("Delete", lambda _: self._delete_selected_step())
step_buttons = urwid.Columns([
('pack', step_add_btn),
('pack', urwid.Text(" ")),
('pack', step_edit_btn),
('pack', urwid.Text(" ")),
('pack', step_del_btn),
])
step_buttons_padded = urwid.Padding(step_buttons, align='left', left=1)
# Steps section (list + buttons)
steps_section = urwid.Pile([
('weight', 1, steps_box),
('pack', step_buttons_padded),
])
# Save/Cancel buttons
save_btn = ClickableButton("Save", self._on_save_tool)
cancel_btn = ClickableButton("Cancel", self._on_cancel_tool)
bottom_buttons = urwid.Columns([
('pack', save_btn),
('pack', urwid.Text(" ")),
('pack', cancel_btn),
], dividechars=1)
bottom_buttons_centered = urwid.Padding(bottom_buttons, align='center', width='pack')
# Use ToolBuilderLayout for proper Tab cycling
# Pass LineBoxes for title highlighting and on_cancel for Escape key
body = ToolBuilderLayout(
left_box, args_box, steps_box,
args_section, steps_section, bottom_buttons_centered,
on_cancel=self._on_cancel_tool
)
# Frame
title = f"Edit Tool: {tool.name}" if self._is_edit and tool.name else "New Tool"
header = urwid.Text(('header', f' {title} '), align='center')
footer = urwid.Text(('footer', ' Arrow:Navigate | Tab:Next section | Enter/Click:Select | Esc:Cancel '), align='center')
frame = urwid.Frame(body, header=header, footer=footer)
self.set_main(frame)
# Set initial selection
if tool.arguments:
self._selected_arg_idx = 0
self._on_arg_focus(0)
if tool.steps:
self._selected_step_idx = 0
self._on_step_focus(0)
def _on_arg_focus(self, idx):
"""Called when an argument is focused."""
if isinstance(idx, int):
self._selected_arg_idx = idx
# Update selection display
if hasattr(self, '_arg_walker'):
for i, item in enumerate(self._arg_walker):
if isinstance(item, SelectableToolItem):
item.set_selected(i == idx)
def _on_arg_activate(self, idx):
"""Called when an argument is activated (Enter/click)."""
self._selected_arg_idx = idx
self._edit_argument_at(idx)
def _on_step_focus(self, idx):
"""Called when a step is focused."""
if isinstance(idx, int):
self._selected_step_idx = idx
# Update selection display
if hasattr(self, '_step_walker'):
for i, item in enumerate(self._step_walker):
if isinstance(item, SelectableToolItem):
item.set_selected(i == idx)
def _on_step_activate(self, idx):
"""Called when a step is activated (Enter/click)."""
self._selected_step_idx = idx
self._edit_step_at(idx)
def _edit_selected_arg(self):
"""Edit the currently selected argument."""
if self._selected_arg_idx is not None and self._selected_arg_idx < len(self._current_tool.arguments):
self._edit_argument_at(self._selected_arg_idx)
else:
self.message_box("Edit", "No argument selected.")
def _delete_selected_arg(self):
"""Delete the currently selected argument."""
if self._selected_arg_idx is not None and self._selected_arg_idx < len(self._current_tool.arguments):
idx = self._selected_arg_idx
arg = self._current_tool.arguments[idx]
def do_delete():
self._save_tool_fields()
self._current_tool.arguments.pop(idx)
self._selected_arg_idx = None
self._show_tool_builder()
self.yes_no("Delete", f"Delete argument {arg.flag}?", on_yes=do_delete)
else:
self.message_box("Delete", "No argument selected.")
def _edit_selected_step(self):
"""Edit the currently selected step."""
if self._selected_step_idx is not None and self._selected_step_idx < len(self._current_tool.steps):
self._edit_step_at(self._selected_step_idx)
else:
self.message_box("Edit", "No step selected.")
def _delete_selected_step(self):
"""Delete the currently selected step."""
if self._selected_step_idx is not None and self._selected_step_idx < len(self._current_tool.steps):
idx = self._selected_step_idx
def do_delete():
self._save_tool_fields()
self._current_tool.steps.pop(idx)
self._selected_step_idx = None
self._show_tool_builder()
self.yes_no("Delete", f"Delete step {idx + 1}?", on_yes=do_delete)
else:
self.message_box("Delete", "No step selected.")
def _edit_argument_at(self, idx):
"""Edit argument at index."""
self._do_edit_argument(idx)
def _edit_step_at(self, idx):
"""Edit step at index - opens the appropriate dialog based on step type."""
# Save current field values before showing dialog
self._save_tool_fields()
step = self._current_tool.steps[idx]
if isinstance(step, PromptStep):
self._add_prompt_dialog(step, idx)
else:
self._add_code_dialog(step, idx)
def _add_argument_dialog(self):
"""Show add argument dialog."""
# Save current field values before showing dialog
self._save_tool_fields()
flag_edit = urwid.Edit(('label', "Flag: "), "--")
var_edit = urwid.Edit(('label', "Variable: "), "")
default_edit = urwid.Edit(('label', "Default: "), "")
def on_ok(_):
flag = flag_edit.edit_text.strip()
var = var_edit.edit_text.strip()
default = default_edit.edit_text.strip() or None
if not flag:
return
if not var:
var = flag.lstrip("-").replace("-", "_")
self._current_tool.arguments.append(ToolArgument(
flag=flag, variable=var, default=default, description=""
))
self.close_overlay()
self._show_tool_builder()
def on_cancel(_):
self.close_overlay()
body = urwid.Pile([
urwid.AttrMap(flag_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(var_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(default_edit, 'edit', 'edit_focus'),
])
dialog = Dialog("Add Argument", body, [("OK", on_ok), ("Cancel", on_cancel)])
self.show_overlay(dialog, width=50, height=14)
def _do_edit_argument(self, idx):
"""Edit an existing argument."""
# Save current field values before showing dialog
self._save_tool_fields()
arg = self._current_tool.arguments[idx]
flag_edit = urwid.Edit(('label', "Flag: "), arg.flag)
var_edit = urwid.Edit(('label', "Variable: "), arg.variable)
default_edit = urwid.Edit(('label', "Default: "), arg.default or "")
def on_ok(_):
arg.flag = flag_edit.edit_text.strip()
arg.variable = var_edit.edit_text.strip()
arg.default = default_edit.edit_text.strip() or None
self.close_overlay()
self._show_tool_builder()
def on_cancel(_):
self.close_overlay()
body = urwid.Pile([
urwid.AttrMap(flag_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(var_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(default_edit, 'edit', 'edit_focus'),
])
dialog = Dialog("Edit Argument", body, [("OK", on_ok), ("Cancel", on_cancel)])
self.show_overlay(dialog, width=50, height=14)
def _add_step_choice(self):
"""Choose step type to add."""
# Save current field values before showing dialog
self._save_tool_fields()
def on_prompt(_):
self.close_overlay()
# Defer dialog opening to avoid overlay rendering issues
if self.loop:
self.loop.set_alarm_in(0, lambda loop, data: self._add_prompt_dialog())
else:
self._add_prompt_dialog()
def on_code(_):
self.close_overlay()
# Defer dialog opening to avoid overlay rendering issues
if self.loop:
self.loop.set_alarm_in(0, lambda loop, data: self._add_code_dialog())
else:
self._add_code_dialog()
def on_cancel(_):
self.close_overlay()
body = urwid.Text("Choose step type:")
dialog = Dialog("Add Step", body, [("Prompt", on_prompt), ("Code", on_code), ("Cancel", on_cancel)])
self.show_overlay(dialog, width=45, height=9)
def _get_available_vars(self, up_to=-1):
"""Get available variables."""
tool = self._current_tool
variables = ["input"]
for arg in tool.arguments:
variables.append(arg.variable)
if up_to == -1:
up_to = len(tool.steps)
for i, step in enumerate(tool.steps):
if i >= up_to:
break
variables.append(step.output_var)
return variables
def _add_prompt_dialog(self, existing=None, idx=-1):
"""Add/edit prompt step with provider dropdown and multiline prompt."""
providers = load_providers()
provider_names = [p.name for p in providers]
if not provider_names:
provider_names = ["mock"]
current_provider = existing.provider if existing else provider_names[0]
# Provider selector state
selected_provider = [current_provider] # Use list to allow mutation in closures
# Provider dropdown button
provider_btn_text = urwid.Text(current_provider)
provider_btn = urwid.AttrMap(
urwid.Padding(provider_btn_text, left=1, right=1),
'edit', 'edit_focus'
)
def show_provider_dropdown(_):
"""Show provider selection popup with descriptions."""
# Build provider lookup for descriptions
provider_lookup = {p.name: p.description for p in providers}
# Description display (updates on focus change)
desc_text = urwid.Text("")
desc_box = urwid.AttrMap(
urwid.Padding(desc_text, left=1, right=1),
'label'
)
def update_description(name):
"""Update the description text for the focused provider."""
desc = provider_lookup.get(name, "")
desc_text.set_text(('label', desc if desc else "No description"))
def select_provider(name):
def callback(_):
selected_provider[0] = name
provider_btn_text.set_text(name)
self.close_overlay()
return callback
# Create focusable buttons that update description on focus
class DescriptiveButton(urwid.Button):
def __init__(self, name, desc_callback):
super().__init__(name, on_press=select_provider(name))
self._name = name
self._desc_callback = desc_callback
def render(self, size, focus=False):
if focus:
self._desc_callback(self._name)
return super().render(size, focus)
items = []
for name in provider_names:
# Show short hint inline: "name | short_desc"
short_desc = provider_lookup.get(name, "")
# Extract just the key info (after the timing)
if "|" in short_desc:
short_desc = short_desc.split("|", 1)[1].strip()[:20]
else:
short_desc = short_desc[:20]
label = f"{name:<18} {short_desc}"
btn = DescriptiveButton(name, update_description)
btn.set_label(label)
items.append(urwid.AttrMap(btn, 'button', 'button_focus'))
# Set initial description
update_description(provider_names[0])
listbox = urwid.ListBox(urwid.SimpleFocusListWalker(items))
# Combine listbox with description footer
body = urwid.Pile([
('weight', 1, listbox),
('pack', urwid.Divider('')),
('pack', desc_box),
])
popup = Dialog("Select Provider", body, [])
self.show_overlay(popup, width=50, height=min(len(provider_names) + 6, 16))
provider_select_btn = Button3DCompact("", on_press=show_provider_dropdown)
# File input for external prompt
default_file = existing.prompt_file if existing and existing.prompt_file else "prompt.txt"
file_edit = urwid.Edit(('label', "File: "), default_file)
# Multiline prompt editor - use TabPassEdit so Tab passes through for navigation
prompt_edit = TabPassEdit(
edit_text=existing.prompt if existing else "{input}",
multiline=True
)
output_edit = urwid.Edit(('label', "Output var: "), existing.output_var if existing else "response")
vars_available = self._get_available_vars(idx)
vars_text = urwid.Text(('label', f"Variables: {', '.join('{'+v+'}' for v in vars_available)}"))
status_text = urwid.Text("")
def do_load():
"""Actually load prompt from file."""
filename = file_edit.edit_text.strip()
tool_dir = get_tools_dir() / self._current_tool.name
prompt_path = tool_dir / filename
try:
prompt_edit.set_edit_text(prompt_path.read_text())
status_text.set_text(('success', f"Loaded from {filename}"))
except Exception as e:
status_text.set_text(('error', f"Load error: {e}"))
def on_load(_):
"""Load prompt from file with confirmation."""
filename = file_edit.edit_text.strip()
if not filename:
status_text.set_text(('error', "Enter a filename first"))
return
tool_dir = get_tools_dir() / self._current_tool.name
prompt_path = tool_dir / filename
if not prompt_path.exists():
status_text.set_text(('error', f"File not found: {filename}"))
return
# Show confirmation dialog
def on_yes(_):
self.close_overlay()
do_load()
def on_no(_):
self.close_overlay()
confirm_body = urwid.Text(f"Load from '{filename}'?\nThis will replace the current prompt.")
confirm_dialog = Dialog("Confirm Load", confirm_body, [("Yes", on_yes), ("No", on_no)])
self.show_overlay(confirm_dialog, width=50, height=8)
def on_ok(_):
provider = selected_provider[0]
prompt = prompt_edit.edit_text.strip()
output_var = output_edit.edit_text.strip() or "response"
prompt_file = file_edit.edit_text.strip() or None
# Auto-save to file if filename is set
if prompt_file:
tool_dir = get_tools_dir() / self._current_tool.name
tool_dir.mkdir(parents=True, exist_ok=True)
prompt_path = tool_dir / prompt_file
try:
prompt_path.write_text(prompt)
except Exception as e:
status_text.set_text(('error', f"Save error: {e}"))
return
step = PromptStep(prompt=prompt, provider=provider, output_var=output_var, prompt_file=prompt_file)
if existing and idx >= 0:
self._current_tool.steps[idx] = step
else:
self._current_tool.steps.append(step)
self.close_overlay()
self._show_tool_builder()
def on_cancel(_):
self.close_overlay()
def on_external_edit(_):
"""Open prompt in external editor ($EDITOR)."""
import os
import subprocess
import tempfile
current_prompt = prompt_edit.edit_text
# Stop the urwid loop temporarily
if self.loop:
self.loop.stop()
try:
# Create temp file with current prompt
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write(current_prompt)
temp_path = f.name
# Get editor from environment
editor = os.environ.get('EDITOR', os.environ.get('VISUAL', 'nano'))
# Run editor
subprocess.run([editor, temp_path], check=True)
# Read back the edited prompt
with open(temp_path, 'r') as f:
new_prompt = f.read()
# Update the prompt editor
prompt_edit.set_edit_text(new_prompt)
status_text.set_text(('success', f"Prompt updated from {editor}"))
# Clean up temp file
os.unlink(temp_path)
except subprocess.CalledProcessError:
status_text.set_text(('error', "Editor exited with error"))
except FileNotFoundError:
status_text.set_text(('error', f"Editor '{editor}' not found"))
except Exception as e:
status_text.set_text(('error', f"Edit error: {e}"))
finally:
# Restart the urwid loop
if self.loop:
self.loop.start()
load_btn = Button3DCompact("Load", on_load)
edit_btn = Button3DCompact("$EDITOR", on_external_edit)
# Prompt editor in a box - use ListBox for proper focus handling and scrolling
# Wrap in DOSScrollBar for DOS-style scrollbar with arrow buttons
prompt_edit_styled = urwid.AttrMap(prompt_edit, 'edit', 'edit_focus')
prompt_walker = urwid.SimpleFocusListWalker([prompt_edit_styled])
prompt_listbox = urwid.ListBox(prompt_walker)
prompt_scrollbar = DOSScrollBar(prompt_listbox)
prompt_box = urwid.LineBox(prompt_scrollbar, title="Prompt")
# Use TabCyclePile so Tab cycles between sections
# Note: All flow widgets must be explicitly wrapped in ('pack', ...) when
# the Pile contains weighted items (urwid 3.x requirement)
body = TabCyclePile([
('pack', vars_text),
('pack', urwid.Divider()),
('pack', urwid.Columns([
('pack', urwid.Text(('label', "Provider: "))),
('weight', 1, provider_btn),
('pack', urwid.Text(" ")),
('pack', provider_select_btn),
])),
('pack', urwid.Divider()),
('pack', urwid.Columns([
('weight', 1, urwid.AttrMap(file_edit, 'edit', 'edit_focus')),
('pack', urwid.Text(" ")),
('pack', load_btn),
('pack', urwid.Text(" ")),
('pack', edit_btn),
])),
('pack', status_text),
('weight', 1, prompt_box),
('pack', urwid.Divider()),
('pack', urwid.AttrMap(output_edit, 'edit', 'edit_focus')),
], tab_positions=[2, 4, 6, 8])
title = "Edit Prompt Step" if existing else "Add Prompt Step"
dialog = Dialog(title, body, [("OK", on_ok), ("Cancel", on_cancel)])
self.show_overlay(dialog, width=75, height=22)
def _add_code_dialog(self, existing=None, idx=-1):
"""Add/edit code step with multiline editor, file support, and AI auto-adjust."""
from ..providers import call_provider
# File name input (default based on output_var)
default_output_var = existing.output_var if existing else "processed"
default_file = existing.code_file if existing and existing.code_file else f"{default_output_var}.py"
file_edit = urwid.Edit(('label', "File: "), default_file)
# Multiline code editor with undo/redo (Alt+U / Alt+R)
default_code = existing.code if existing else f"{default_output_var} = input.upper()"
code_edit = UndoableEdit(
edit_text=default_code,
multiline=True
)
output_edit = urwid.Edit(('label', "Output var: "), existing.output_var if existing else "processed")
vars_available = self._get_available_vars(idx)
vars_text = urwid.Text(('label', f"Variables: {', '.join(vars_available)}"))
status_text = urwid.Text("")
# --- Auto-adjust AI feature ---
providers = load_providers()
provider_names = [p.name for p in providers]
if not provider_names:
provider_names = ["mock"]
selected_ai_provider = [provider_names[0]]
ai_provider_btn_text = urwid.Text(provider_names[0])
ai_provider_btn = urwid.AttrMap(
urwid.Padding(ai_provider_btn_text, left=1, right=1),
'edit', 'edit_focus'
)
def show_ai_provider_dropdown(_):
provider_lookup = {p.name: p.description for p in providers}
desc_text = urwid.Text("")
desc_box = urwid.AttrMap(urwid.Padding(desc_text, left=1, right=1), 'label')
def update_description(name):
desc = provider_lookup.get(name, "")
desc_text.set_text(('label', desc if desc else "No description"))
def select_provider(name):
def callback(_):
selected_ai_provider[0] = name
ai_provider_btn_text.set_text(name)
self.close_overlay()
return callback
class DescriptiveButton(urwid.Button):
def __init__(btn_self, name, desc_callback):
super().__init__(name, on_press=select_provider(name))
btn_self._name = name
btn_self._desc_callback = desc_callback
def render(btn_self, size, focus=False):
if focus:
btn_self._desc_callback(btn_self._name)
return super().render(size, focus)
items = []
for name in provider_names:
short_desc = provider_lookup.get(name, "")
if "|" in short_desc:
short_desc = short_desc.split("|", 1)[1].strip()[:20]
else:
short_desc = short_desc[:20]
label = f"{name:<18} {short_desc}"
btn = DescriptiveButton(name, update_description)
btn.set_label(label)
items.append(urwid.AttrMap(btn, 'button', 'button_focus'))
update_description(provider_names[0])
listbox = urwid.ListBox(urwid.SimpleFocusListWalker(items))
popup_body = urwid.Pile([
('weight', 1, listbox),
('pack', urwid.Divider('')),
('pack', desc_box),
])
popup = Dialog("Select Provider", popup_body, [])
self.show_overlay(popup, width=50, height=min(len(provider_names) + 6, 16))
ai_provider_select_btn = Button3DCompact("", on_press=show_ai_provider_dropdown)
# Default prompt template for AI code generation/adjustment
# Show variables in triple-quote format so the AI follows the pattern
vars_formatted = ', '.join(f'\"\"\"{{{v}}}\"\"\"' for v in vars_available)
default_ai_prompt = f"""Write inline Python code (NOT a function definition) according to my instruction.
The code runs directly with variable substitution. Assign any "Available Variables" used to a new standard variable first, then use that variable in the code. Use triple quotes and curly braces since the substituted content may contain quotes/newlines.
Example:
my_var = \"\"\"{{variable}}\"\"\"
INSTRUCTION: [Describe what you want]
CURRENT CODE:
```python
{{code}}
```
AVAILABLE VARIABLES: {vars_formatted}
IMPORTANT: Return ONLY executable inline code. Do NOT wrap in a function.
No explanations, no markdown fencing, just the code."""
# Multiline editable prompt for AI with DOS-style scrollbar
ai_prompt_edit = TabPassEdit(edit_text=default_ai_prompt, multiline=True)
ai_prompt_styled = urwid.AttrMap(ai_prompt_edit, 'edit', 'edit_focus')
ai_prompt_walker = urwid.SimpleFocusListWalker([ai_prompt_styled])
ai_prompt_listbox = urwid.ListBox(ai_prompt_walker)
ai_prompt_scrollbar = DOSScrollBar(ai_prompt_listbox)
ai_prompt_box = urwid.LineBox(ai_prompt_scrollbar, title="Prompt")
# Output/feedback area for AI responses
ai_output_text = urwid.Text("")
ai_output_walker = urwid.SimpleFocusListWalker([ai_output_text])
ai_output_listbox = urwid.ListBox(ai_output_walker)
ai_output_box = urwid.LineBox(ai_output_listbox, title="Output & Feedback")
def on_auto_adjust(_):
prompt_template = ai_prompt_edit.edit_text.strip()
if not prompt_template:
ai_output_text.set_text(('error', "Enter a prompt for the AI"))
return
current_code = code_edit.edit_text.strip()
# Replace {code} placeholder with actual code
prompt = prompt_template.replace("{code}", current_code)
provider_name = selected_ai_provider[0]
ai_output_text.set_text(('label', f"Calling {provider_name}...\nPlease wait..."))
self.refresh()
result = call_provider(provider_name, prompt)
if result.success:
new_code = result.text.strip()
# Strip markdown code fences if present
if new_code.startswith("```python"):
new_code = new_code[9:]
if new_code.startswith("```"):
new_code = new_code[3:]
if new_code.endswith("```"):
new_code = new_code[:-3]
new_code = new_code.strip()
code_edit.set_edit_text(new_code)
ai_output_text.set_text(('success', f"✓ Code updated successfully!\n\nProvider: {provider_name}\nResponse length: {len(result.text)} chars"))
else:
error_msg = result.error or "Unknown error"
ai_output_text.set_text(('error', f"✗ Error from {provider_name}:\n\n{error_msg}"))
auto_adjust_btn = Button3DCompact("Auto-adjust", on_auto_adjust)
# Build the AI assist box with provider selector, prompt editor, output area, and button
ai_provider_row = urwid.Columns([
('pack', urwid.Text(('label', "Provider: "))),
('pack', ai_provider_btn),
('pack', ai_provider_select_btn),
])
ai_assist_content = urwid.Pile([
('pack', ai_provider_row),
('pack', urwid.Divider()),
('weight', 2, ai_prompt_box),
('weight', 1, ai_output_box),
('pack', urwid.Padding(auto_adjust_btn, align='center', width=16)),
])
ai_assist_box = urwid.LineBox(ai_assist_content, title="AI Assisted Auto-adjust")
# --- End Auto-adjust feature ---
def do_load():
"""Actually load code from file."""
filename = file_edit.edit_text.strip()
tool_dir = get_tools_dir() / self._current_tool.name
code_path = tool_dir / filename
try:
code_edit.set_edit_text(code_path.read_text())
status_text.set_text(('success', f"Loaded from {filename}"))
except Exception as e:
status_text.set_text(('error', f"Load error: {e}"))
def on_load(_):
"""Load code from file with confirmation."""
filename = file_edit.edit_text.strip()
if not filename:
status_text.set_text(('error', "Enter a filename first"))
return
tool_dir = get_tools_dir() / self._current_tool.name
code_path = tool_dir / filename
if not code_path.exists():
status_text.set_text(('error', f"File not found: {filename}"))
return
def on_yes(_):
self.close_overlay()
do_load()
def on_no(_):
self.close_overlay()
confirm_body = urwid.Text(f"Load from '{filename}'?\nThis will replace the current code.")
confirm_dialog = Dialog("Confirm Load", confirm_body, [("Yes", on_yes), ("No", on_no)])
self.show_overlay(confirm_dialog, width=50, height=8)
def on_ok(_):
import ast
code = code_edit.edit_text.strip()
output_var = output_edit.edit_text.strip() or "processed"
code_file = file_edit.edit_text.strip() or None
if code:
try:
ast.parse(code)
except SyntaxError as e:
line_info = f" (line {e.lineno})" if e.lineno else ""
status_text.set_text(('error', f"Syntax error{line_info}: {e.msg}"))
return
if code_file:
tool_dir = get_tools_dir() / self._current_tool.name
tool_dir.mkdir(parents=True, exist_ok=True)
code_path = tool_dir / code_file
try:
code_path.write_text(code)
except Exception as e:
status_text.set_text(('error', f"Save error: {e}"))
return
from ..tool import CodeStep
step = CodeStep(code=code, output_var=output_var, code_file=code_file)
if existing and idx >= 0:
self._current_tool.steps[idx] = step
else:
self._current_tool.steps.append(step)
self.close_overlay()
self._show_tool_builder()
def on_cancel(_):
self.close_overlay()
def on_external_edit(_):
"""Open code in external editor ($EDITOR)."""
import os
import subprocess
import tempfile
current_code = code_edit.edit_text
# Stop the urwid loop temporarily
if self.loop:
self.loop.stop()
try:
# Create temp file with current code
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(current_code)
temp_path = f.name
# Get editor from environment
editor = os.environ.get('EDITOR', os.environ.get('VISUAL', 'nano'))
# Run editor
subprocess.run([editor, temp_path], check=True)
# Read back the edited code
with open(temp_path, 'r') as f:
new_code = f.read()
# Update the code editor
code_edit.set_edit_text(new_code)
status_text.set_text(('success', f"Code updated from {editor}"))
# Clean up temp file
os.unlink(temp_path)
except subprocess.CalledProcessError:
status_text.set_text(('error', "Editor exited with error"))
except FileNotFoundError:
status_text.set_text(('error', f"Editor '{editor}' not found"))
except Exception as e:
status_text.set_text(('error', f"Edit error: {e}"))
finally:
# Restart the urwid loop
if self.loop:
self.loop.start()
load_btn = Button3DCompact("Load", on_load)
edit_btn = Button3DCompact("$EDITOR", on_external_edit)
# Code editor in a box - use ListBox for proper focus handling and scrolling
# Wrap in DOSScrollBar for DOS-style scrollbar with arrow buttons
code_edit_styled = urwid.AttrMap(code_edit, 'edit', 'edit_focus')
code_walker = urwid.SimpleFocusListWalker([code_edit_styled])
code_listbox = urwid.ListBox(code_walker)
code_scrollbar = DOSScrollBar(code_listbox)
code_box = urwid.LineBox(code_scrollbar, title="Code")
# Layout: Code editor on left, AI assist box on right
main_columns = urwid.Columns([
('weight', 1, code_box),
('weight', 1, ai_assist_box),
], dividechars=1)
# Use TabCyclePile so Tab cycles between sections
# Note: All flow widgets must be explicitly wrapped in ('pack', ...) when
# the Pile contains weighted items (urwid 3.x requirement)
body = TabCyclePile([
('pack', vars_text),
('pack', urwid.Divider()),
('pack', urwid.Columns([
('weight', 1, urwid.AttrMap(file_edit, 'edit', 'edit_focus')),
('pack', urwid.Text(" ")),
('pack', load_btn),
('pack', urwid.Text(" ")),
('pack', edit_btn),
])),
('pack', status_text),
('weight', 1, main_columns),
('pack', urwid.Divider()),
('pack', urwid.AttrMap(output_edit, 'edit', 'edit_focus')),
], tab_positions=[2, 4, 6])
title = "Edit Code Step" if existing else "Add Code Step"
dialog = Dialog(title, body, [("OK", on_ok), ("Cancel", on_cancel)])
self.show_overlay(dialog, width=90, height=30)
def _on_save_tool(self, _):
"""Save the tool."""
tool = self._current_tool
# Update from edits - widgets are wrapped in AttrMap, access base_widget
if not self._is_edit:
# Name edit is an AttrMap wrapping an Edit
name_edit = self._name_edit.base_widget if hasattr(self._name_edit, 'base_widget') else self._name_edit
if hasattr(name_edit, 'edit_text'):
tool.name = name_edit.edit_text.strip()
tool.description = self._desc_edit.base_widget.edit_text.strip()
tool.output = self._output_edit.base_widget.edit_text.strip()
if not tool.name:
self.message_box("Error", "Tool name is required.")
return
# Validate tool name
is_valid, error_msg = validate_tool_name(tool.name)
if not is_valid:
self.message_box("Error", error_msg)
return
if not self._is_edit and tool_exists(tool.name):
def on_yes():
save_tool(tool)
self.message_box("Success", f"Tool '{tool.name}' saved!", self.show_main_menu)
self.yes_no("Overwrite?", f"Tool '{tool.name}' exists. Overwrite?", on_yes=on_yes)
else:
save_tool(tool)
self.message_box("Success", f"Tool '{tool.name}' saved!", self.show_main_menu)
def _on_cancel_tool(self, _):
"""Cancel tool editing."""
self.show_main_menu()
def _test_tool(self, tool):
"""Test a tool with mock input."""
def on_input(text):
from ..runner import run_tool
output, code = run_tool(
tool=tool,
input_text=text,
custom_args={},
provider_override="mock",
dry_run=False,
show_prompt=False,
verbose=False
)
result = f"Exit code: {code}\n\nOutput:\n{output[:300]}"
self.message_box("Test Result", result)
self.input_dialog("Test Input", "Enter test input", "Hello world", on_input)
# ==================== Registry Browser ====================
def browse_registry(self):
"""Browse and install tools from the registry."""
self._registry_search_query = ""
self._registry_tools = []
self._selected_registry_tool = None
self._show_registry_browser()
def _show_registry_browser(self, search_query: str = ""):
"""Show the registry browser screen."""
# Search input
search_edit = urwid.Edit(('label', "Search: "), search_query)
search_edit = urwid.AttrMap(search_edit, 'edit', 'edit_focus')
# Status/loading text
status_text = urwid.Text(('label', "Loading..."))
# Tool list (will be populated)
self._registry_walker = urwid.SimpleFocusListWalker([])
tool_listbox = ToolListBox(self._registry_walker, on_focus_change=self._on_registry_tool_focus)
tool_box = urwid.LineBox(tool_listbox, title='Registry Tools')
# Info panel
self._reg_info_name = urwid.Text("")
self._reg_info_desc = urwid.Text("")
self._reg_info_owner = urwid.Text("")
self._reg_info_version = urwid.Text("")
self._reg_info_downloads = urwid.Text("")
self._reg_info_tags = urwid.Text("")
info_content = urwid.Pile([
self._reg_info_name,
self._reg_info_owner,
self._reg_info_version,
urwid.Divider(),
self._reg_info_desc,
urwid.Divider(),
self._reg_info_downloads,
self._reg_info_tags,
])
info_filler = urwid.Filler(info_content, valign='top')
info_box = urwid.LineBox(info_filler, title='Tool Info')
def do_search(_=None):
query = search_edit.base_widget.edit_text.strip()
self._registry_search_query = query
if query:
status_text.set_text(('label', f"Searching for '{query}'..."))
else:
status_text.set_text(('label', "Loading all tools..."))
self.refresh()
try:
client = RegistryClient()
# Use list_tools for browsing, search_tools only when there's a query
if query:
result = client.search_tools(query=query, per_page=50)
else:
result = client.list_tools(per_page=50)
self._registry_tools = result.data
# Update the list
self._registry_walker.clear()
if self._registry_tools:
for tool in self._registry_tools:
display = f"{tool['owner']}/{tool['name']}"
item = SelectableToolItem(display, on_select=lambda n: self._install_registry_tool())
item.tool_data = tool
self._registry_walker.append(item)
# Select first item
self._selected_registry_tool = self._registry_tools[0]
self._on_registry_tool_focus(f"{self._registry_tools[0]['owner']}/{self._registry_tools[0]['name']}")
status_text.set_text(('label', f"Found {len(self._registry_tools)} tools"))
else:
self._registry_walker.append(urwid.Text(('label', " (no results) ")))
status_text.set_text(('label', "No tools found"))
except RegistryError as e:
status_text.set_text(('error', f"Error: {e}"))
self._registry_walker.clear()
self._registry_walker.append(urwid.Text(('error', f" Error: {e} ")))
except Exception as e:
status_text.set_text(('error', f"Error: {e}"))
self._registry_walker.clear()
self._registry_walker.append(urwid.Text(('error', f" Error: {e} ")))
self.refresh()
def on_install(_):
self._install_registry_tool()
def on_close(_):
self.close_overlay()
self._refresh_main_menu()
# Buttons
search_btn = ClickableButton("Search", do_search)
install_btn = ClickableButton("Install", on_install)
close_btn = ClickableButton("Close", on_close)
buttons = urwid.Columns([
('pack', search_btn),
('pack', urwid.Text(" ")),
('pack', install_btn),
('pack', urwid.Text(" ")),
('pack', close_btn),
])
buttons_centered = urwid.Padding(buttons, align='center', width='pack')
# Search row
search_row = urwid.Columns([
('weight', 1, search_edit),
('pack', urwid.Text(" ")),
('pack', search_btn),
])
# Main layout with columns for list and info
list_and_info = urwid.Columns([
('weight', 1, tool_box),
('weight', 1, info_box),
])
body = urwid.Pile([
('pack', search_row),
('pack', status_text),
('pack', urwid.Divider()),
('weight', 1, list_and_info),
('pack', urwid.Divider()),
('pack', buttons_centered),
])
# Wrap in frame
header = urwid.Text(('header', ' Browse Registry '), align='center')
footer = urwid.Text(('footer', ' Enter: Install | Tab: Navigate | Esc: Close '), align='center')
frame = urwid.Frame(body, header=header, footer=footer)
frame = urwid.LineBox(frame)
frame = urwid.AttrMap(frame, 'dialog')
self.show_overlay(frame, width=80, height=24)
# Do initial search
do_search()
def _on_registry_tool_focus(self, name):
"""Called when a registry tool is focused."""
# Find the tool data
for tool in self._registry_tools:
if f"{tool['owner']}/{tool['name']}" == name:
self._selected_registry_tool = tool
break
if self._selected_registry_tool:
tool = self._selected_registry_tool
self._reg_info_name.set_text(('label', f"Name: {tool['name']}"))
self._reg_info_owner.set_text(f"Publisher: {tool['owner']}")
self._reg_info_version.set_text(f"Version: {tool.get('version', 'unknown')}")
self._reg_info_desc.set_text(f"Description: {tool.get('description', '(none)')}")
self._reg_info_downloads.set_text(f"Downloads: {tool.get('downloads', 0)}")
tags = tool.get('tags', [])
if tags:
self._reg_info_tags.set_text(f"Tags: {', '.join(tags)}")
else:
self._reg_info_tags.set_text("Tags: (none)")
# Update selection state
if hasattr(self, '_registry_walker'):
for item in self._registry_walker:
if isinstance(item, SelectableToolItem):
item.set_selected(item.name == name)
def _install_registry_tool(self):
"""Install the selected registry tool."""
if not self._selected_registry_tool:
self.message_box("Install", "No tool selected.")
return
tool = self._selected_registry_tool
tool_name = f"{tool['owner']}/{tool['name']}"
def do_install():
try:
client = RegistryClient()
client.install_tool(tool_name)
self.message_box("Success", f"Installed {tool_name}\n\nRun 'cmdforge refresh' to create wrapper script.")
except RegistryError as e:
self.message_box("Error", f"Failed to install: {e}")
except Exception as e:
self.message_box("Error", f"Failed to install: {e}")
self.yes_no(
"Install Tool",
f"Install {tool_name}?",
on_yes=do_install
)
# ==================== Provider Management ====================
def manage_providers(self):
"""Manage providers."""
self._show_providers_menu()
def _show_providers_menu(self):
"""Show providers management menu."""
providers = load_providers()
self._selected_provider_name = None
self._provider_walker = None
def on_provider_focus(name):
"""Called when a provider is focused."""
self._selected_provider_name = name
# Update selection state on all items
if self._provider_walker:
for item in self._provider_walker:
if isinstance(item, SelectableToolItem):
item.set_selected(item.name == name)
def on_provider_activate(name):
"""Called when Enter is pressed on a provider."""
self.close_overlay()
self._edit_provider_menu(name)
def on_add(_):
self.close_overlay()
self._add_provider_dialog()
def on_edit(_):
if self._selected_provider_name:
self.close_overlay()
self._edit_provider_menu(self._selected_provider_name)
else:
self.message_box("Edit", "No provider selected.")
def on_cancel(_):
self.close_overlay()
# Build provider list
items = []
for p in providers:
item = SelectableToolItem(f"{p.name}: {p.command}", on_select=on_provider_activate)
item.name = p.name # Store the actual provider name
items.append(item)
if not items:
items.append(urwid.Text(('label', " (no providers) ")))
self._provider_walker = urwid.SimpleFocusListWalker(items)
listbox = ToolListBox(self._provider_walker, on_focus_change=on_provider_focus)
listbox_box = urwid.LineBox(listbox, title='Providers')
# Buttons row
add_btn = ClickableButton("Add", on_add)
edit_btn = ClickableButton("Edit", on_edit)
cancel_btn = ClickableButton("Cancel", on_cancel)
buttons = urwid.Columns([
('pack', add_btn),
('pack', urwid.Text(" ")),
('pack', edit_btn),
('pack', urwid.Text(" ")),
('pack', cancel_btn),
])
buttons_centered = urwid.Padding(buttons, align='center', width='pack')
# Layout
body = urwid.Pile([
('weight', 1, listbox_box),
('pack', urwid.Divider()),
('pack', buttons_centered),
])
# Wrap in a frame with title
header = urwid.Text(('header', ' Manage Providers '), align='center')
frame = urwid.Frame(body, header=header)
frame = urwid.LineBox(frame)
frame = urwid.AttrMap(frame, 'dialog')
height = min(len(providers) + 10, 18)
self.show_overlay(frame, width=55, height=height)
# Set initial selection
if providers:
self._selected_provider_name = providers[0].name
on_provider_focus(providers[0].name)
def _add_provider_dialog(self):
"""Add a new provider."""
name_edit = urwid.Edit(('label', "Name: "), "")
cmd_edit = urwid.Edit(('label', "Command: "), "")
desc_edit = urwid.Edit(('label', "Description: "), "")
def on_ok(_):
name = name_edit.edit_text.strip()
cmd = cmd_edit.edit_text.strip()
desc = desc_edit.edit_text.strip()
if name and cmd:
add_provider(Provider(name=name, command=cmd, description=desc))
self.close_overlay()
self.message_box("Success", f"Provider '{name}' added.", self._show_providers_menu)
else:
self.message_box("Error", "Name and command are required.")
def on_cancel(_):
self.close_overlay()
self._show_providers_menu()
body = urwid.Pile([
urwid.AttrMap(name_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(cmd_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(desc_edit, 'edit', 'edit_focus'),
])
dialog = Dialog("Add Provider", body, [("OK", on_ok), ("Cancel", on_cancel)])
self.show_overlay(dialog, width=55, height=14)
def _edit_provider_menu(self, name):
"""Edit a provider."""
provider = get_provider(name)
if not provider:
return
name_edit = urwid.Edit(('label', "Name: "), provider.name)
cmd_edit = urwid.Edit(('label', "Command: "), provider.command)
desc_edit = urwid.Edit(('label', "Description: "), provider.description or "")
def on_save(_):
new_name = name_edit.edit_text.strip()
cmd = cmd_edit.edit_text.strip()
desc = desc_edit.edit_text.strip()
if new_name and cmd:
# Delete old provider if name changed
if new_name != name:
delete_provider(name)
# Save with new/same name
add_provider(Provider(name=new_name, command=cmd, description=desc))
self.close_overlay()
self.message_box("Success", f"Provider '{new_name}' saved.", self._show_providers_menu)
else:
self.message_box("Error", "Name and command are required.")
def on_delete(_):
self.close_overlay()
def do_delete():
delete_provider(name)
self.message_box("Deleted", f"Provider '{name}' deleted.", self._show_providers_menu)
self.yes_no("Confirm", f"Delete provider '{name}'?", on_yes=do_delete, on_no=self._show_providers_menu)
def on_cancel(_):
self.close_overlay()
self._show_providers_menu()
body = urwid.Pile([
urwid.AttrMap(name_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(cmd_edit, 'edit', 'edit_focus'),
urwid.Divider(),
urwid.AttrMap(desc_edit, 'edit', 'edit_focus'),
])
dialog = Dialog("Edit Provider", body, [("Save", on_save), ("Delete", on_delete), ("Cancel", on_cancel)])
self.show_overlay(dialog, width=55, height=16)
def run_ui():
"""Entry point for the urwid UI."""
ui = CmdForgeUI()
ui.run()