387 lines
11 KiB
Python
387 lines
11 KiB
Python
"""Interactive fuzzy tool picker for CmdForge - inline dropdown style."""
|
|
|
|
import codecs
import os
import select
import sys
import termios
import tty
from dataclasses import dataclass
from typing import List, Tuple, Optional

from ..tool import list_tools, load_tool
|
|
|
|
|
|
@dataclass
class PickerResult:
    """Result from the picker.

    Describes the tool the user chose and the argument values to run it with.
    """
    # Name of the selected tool.
    tool_name: str
    # Mapping of argument flag -> value; empty when no arguments were configured.
    arguments: dict
|
|
|
|
|
|
# ANSI escape codes
CLEAR_LINE = "\033[2K"  # erase the whole current line
MOVE_UP = "\033[A"  # move the cursor up one row
HIDE_CURSOR = "\033[?25l"
SHOW_CURSOR = "\033[?25h"
BOLD = "\033[1m"
DIM = "\033[2m"
CYAN = "\033[36m"
YELLOW = "\033[33m"
GREEN = "\033[32m"
RESET = "\033[0m"  # reset all text attributes

MAX_VISIBLE = 8  # Show at most 8 items

# Output stream for UI (stderr when stdout is piped, stdout otherwise).
# Stays None until main() assigns it; _write() uses it unconditionally.
_ui_out = None
|
|
|
|
|
|
def _write(text: str):
    """Emit *text* on the UI stream and flush so it shows up immediately.

    The stream is the module-level ``_ui_out``, which must have been set
    before this is called.
    """
    stream = _ui_out
    stream.write(text)
    stream.flush()
|
|
|
|
|
|
def fuzzy_match(query: str, text: str) -> Tuple[bool, int]:
    """Case-insensitively match *query* against *text* and score the hit.

    Returns ``(matched, score)``:

    * an empty query matches everything with score 0;
    * a substring hit scores ``1000 + len(query)`` when it is a prefix of
      *text*, otherwise ``500 + len(query)``;
    * otherwise the query characters must appear in order in *text*; each
      matched character is worth 10 at the start of the text or right after
      a space, ``-`` or ``_``, and 1 anywhere else;
    * no match yields ``(False, 0)``.
    """
    if not query:
        return True, 0

    needle = query.lower()
    haystack = text.lower()

    # Contiguous hit: prefix matches outrank matches in the middle.
    if needle in haystack:
        base = 1000 if haystack.startswith(needle) else 500
        return True, base + len(needle)

    # Subsequence hit: walk the text once, consuming query characters in order.
    matched = 0
    total = 0
    previous = None
    for current in haystack:
        if matched == len(needle):
            break
        if current == needle[matched]:
            at_word_start = previous is None or previous in ' -_'
            total += 10 if at_word_start else 1
            matched += 1
        previous = current

    if matched < len(needle):
        return False, 0
    return True, total
|
|
|
|
|
|
def get_tools() -> List[dict]:
    """Collect display info for every loadable tool, sorted by name.

    Each entry is a dict with the tool's ``name``, its description cut to
    50 characters (``desc``), and a list of ``args`` dicts holding each
    argument's ``flag``, ``default`` and ``desc`` (missing values become "").
    Names for which ``load_tool`` returns a falsy value are skipped.
    """
    catalog = []
    for tool_name in list_tools():
        loaded = load_tool(tool_name)
        if not loaded:
            continue

        summary = (loaded.description or "")[:50]
        arg_specs = [
            {
                "flag": spec.flag,
                "default": spec.default or "",
                "desc": spec.description or "",
            }
            for spec in loaded.arguments
        ]
        catalog.append({"name": tool_name, "desc": summary, "args": arg_specs})

    catalog.sort(key=lambda entry: entry["name"])
    return catalog
|
|
|
|
|
|
class TTYInput:
    """Read from /dev/tty for keyboard input, even when stdin is piped.

    Use as a context manager: entering opens ``/dev/tty`` and remembers the
    terminal settings, leaving restores them and closes the device.

    Key bytes are read straight from the file descriptor with ``os.read``.
    Reading through the buffered text file would pull a whole escape
    sequence into Python's buffer at once, after which ``select`` on the
    descriptor reports "nothing pending" and an arrow key is mistaken for a
    bare Escape.
    """

    # Seconds to wait for the remainder of an escape sequence before
    # deciding that a lone Escape key was pressed.
    _ESC_TIMEOUT = 0.05

    # Final byte of a CSI sequence ("ESC [ x") -> symbolic key name.
    _ARROWS = {'A': 'UP', 'B': 'DOWN', 'C': 'RIGHT', 'D': 'LEFT'}

    def __init__(self):
        self.tty = None
        self.fd = None
        self.old_settings = None

    def __enter__(self):
        self.tty = open('/dev/tty', 'r')
        try:
            self.fd = self.tty.fileno()
            self.old_settings = termios.tcgetattr(self.fd)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so release the
            # device here instead of leaking the open file.
            self.tty.close()
            self.tty = None
            self.fd = None
            raise
        return self

    def __exit__(self, *args):
        if self.old_settings is not None:
            termios.tcsetattr(self.fd, termios.TCSANOW, self.old_settings)
        if self.tty:
            self.tty.close()

    def _pending(self) -> bool:
        """Return True if more input arrives within the escape timeout."""
        ready, _, _ = select.select([self.fd], [], [], self._ESC_TIMEOUT)
        return bool(ready)

    def _read_char(self) -> str:
        """Read exactly one decoded character from the descriptor.

        Bytes are fetched one at a time and fed to an incremental decoder so
        multi-byte characters are returned whole. Returns '' on end of file
        (or the replacement character if EOF cuts a character short).
        """
        encoding = getattr(self.tty, 'encoding', None) or 'utf-8'
        decoder = codecs.getincrementaldecoder(encoding)(errors='replace')
        while True:
            raw = os.read(self.fd, 1)
            if not raw:
                return decoder.decode(b'', final=True)
            text = decoder.decode(raw)
            if text:
                return text

    def _read_key(self) -> str:
        """Read one key; arrow-key sequences become 'UP'/'DOWN'/'RIGHT'/'LEFT'.

        Any other input starting with Escape is reported as '\\x1b'.
        """
        ch = self._read_char()
        if ch != '\x1b':
            return ch
        # Nothing following within the timeout: it was just Escape.
        if not self._pending():
            return ch
        if self._read_char() != '[':
            return ch
        if not self._pending():
            return ch
        return self._ARROWS.get(self._read_char(), ch)

    def getch(self):
        """Read a single key press.

        Returns the typed character, '' on end of file, '\\x1b' for Escape,
        or one of 'UP', 'DOWN', 'RIGHT', 'LEFT' for the arrow keys. The
        terminal is in raw mode only for the duration of the read.
        """
        # TCSANOW rather than setraw's default TCSAFLUSH: flushing would
        # discard keys typed between two getch() calls (fast typing, paste).
        tty.setraw(self.fd, termios.TCSANOW)
        try:
            return self._read_key()
        finally:
            termios.tcsetattr(self.fd, termios.TCSANOW, self.old_settings)
|
|
|
|
|
|
def clear_dropdown(n_lines: int):
    """Erase the last *n_lines* rows drawn and park the cursor at column 0.

    For each row the cursor is moved up one line and that line is cleared;
    a final carriage return is written afterwards.
    """
    erase_row = MOVE_UP + CLEAR_LINE
    remaining = n_lines
    while remaining > 0:
        _write(erase_row)
        remaining -= 1
    _write('\r')
|
|
|
|
|
|
def run_picker(tty_input: TTYInput) -> Optional[PickerResult]:
    """Run the inline tool picker and return the user's choice.

    Draws a query line followed by up to ``MAX_VISIBLE`` matching tools and
    redraws after every key press.

    Keys:
        printable character / Backspace -- edit the query (resets selection)
        Up / Down -- move the selection
        Enter -- pick the selected tool with no arguments
        Tab -- pick the selected tool, configuring its arguments first
               when it declares any
        Esc / Ctrl+C -- cancel

    Args:
        tty_input: key source; only its ``getch()`` method is used here.

    Returns:
        A ``PickerResult`` for the chosen tool, or ``None`` when there are no
        tools, the user cancels, the argument picker is cancelled, or input
        reaches end of file.
    """
    tools = get_tools()
    if not tools:
        _write("No tools. Create one: cmdforge\n")
        return None

    query = ""
    selected = 0
    scroll = 0
    last_drawn = 0

    _write(HIDE_CURSOR)

    # try/finally so the cursor is made visible again on every exit path,
    # including KeyboardInterrupt, which `except Exception` would miss.
    try:
        while True:
            # Filter: match on the name first; a description-only match
            # counts for half as much.
            matches = []
            for t in tools:
                ok, score = fuzzy_match(query, t["name"])
                if not ok:
                    ok, score = fuzzy_match(query, t["desc"])
                    score = score // 2
                if ok:
                    matches.append((t, score))
            matches.sort(key=lambda x: -x[1])
            filtered = [m[0] for m in matches]

            # Keep the selection inside the (possibly empty) result list.
            selected = max(0, min(selected, len(filtered) - 1))

            # Adjust scroll to keep selection visible
            if selected < scroll:
                scroll = selected
            elif selected >= scroll + MAX_VISIBLE:
                scroll = selected - MAX_VISIBLE + 1

            # Clear previous
            if last_drawn:
                clear_dropdown(last_drawn)

            # Draw
            visible = filtered[scroll:scroll + MAX_VISIBLE]

            # Query line
            lines = [f"{DIM}>{RESET} {YELLOW}{query}{RESET}▌"]

            # Items
            for i, t in enumerate(visible):
                actual_idx = scroll + i
                if actual_idx == selected:
                    prefix = f"{CYAN}{BOLD}▸ {t['name']}{RESET}"
                else:
                    prefix = f" {t['name']}"

                # Add arg indicator
                if t['args']:
                    prefix += f" {GREEN}⚙{RESET}"

                # Add description
                if t['desc']:
                    prefix += f" {DIM}- {t['desc']}{RESET}"

                lines.append(prefix)

            # Print
            _write('\n'.join(lines) + '\n')
            last_drawn = len(lines)

            # Input
            ch = tty_input.getch()

            if ch == '':
                # End of input on the terminal. Without this check the loop
                # would spin forever, because ''.isprintable() is True.
                clear_dropdown(last_drawn)
                return None

            if ch in ('\r', '\n'):  # Enter - run
                if filtered:
                    clear_dropdown(last_drawn)
                    return PickerResult(filtered[selected]["name"], {})

            elif ch == '\t':  # Tab - configure args or run
                if filtered:
                    tool = filtered[selected]
                    clear_dropdown(last_drawn)
                    if tool['args']:
                        args = pick_args(tty_input, tool)
                        return PickerResult(tool["name"], args) if args is not None else None
                    return PickerResult(tool["name"], {})

            elif ch == '\x1b' or ch == '\x03':  # Esc or Ctrl+C
                clear_dropdown(last_drawn)
                return None

            elif ch == 'UP':
                selected = max(0, selected - 1)
            elif ch == 'DOWN':
                # max(0, ...) keeps the index from becoming -1 when the
                # result list is empty.
                selected = max(0, min(len(filtered) - 1, selected + 1))

            elif ch == '\x7f' or ch == '\b':  # Backspace
                query = query[:-1]
                selected = 0
                scroll = 0

            elif len(ch) == 1 and ch.isprintable():
                # The length check rejects symbolic key names such as
                # 'LEFT'/'RIGHT', which are printable strings and would
                # otherwise be typed into the query verbatim.
                query += ch
                selected = 0
                scroll = 0
    finally:
        _write(SHOW_CURSOR)
|
|
|
|
|
|
def pick_args(tty_input: TTYInput, tool: dict) -> Optional[dict]:
    """Inline argument picker.

    Shows one row per argument of *tool* (``tool['args']``, each a dict with
    ``flag``, ``default`` and ``desc``) and lets the user edit the values.

    Keys while browsing:
        Up / Down -- move the selection
        Tab -- start editing the selected value
        Enter -- accept and return the values
        Esc / Ctrl+C -- cancel
    Keys while editing:
        printable character / Backspace -- edit the buffer
        Enter -- store the buffer as the argument's value
        Esc -- discard the edit

    Args:
        tty_input: key source; only its ``getch()`` method is used here.
        tool: tool description dict providing ``name`` and ``args``.

    Returns:
        A dict mapping each flag to its value, or ``None`` when the user
        cancels or input reaches end of file.
    """
    args = tool['args']
    values = {a['flag']: a['default'] for a in args}
    selected = 0
    editing = None  # index of the argument being edited, or None
    edit_buf = ""
    last_drawn = 0
    empty_marker = f"{DIM}(empty){RESET}"

    _write(HIDE_CURSOR)

    # try/finally so the cursor is restored on every exit path, including
    # KeyboardInterrupt, which `except Exception` would miss.
    try:
        while True:
            if last_drawn:
                clear_dropdown(last_drawn)

            lines = [f"{BOLD}{tool['name']}{RESET} arguments:"]

            for i, arg in enumerate(args):
                flag = arg['flag']
                val = edit_buf if editing == i else values[flag]

                if i == selected:
                    if editing == i:
                        line = f"{CYAN}▸ {flag}: {YELLOW}{val}▌{RESET}"
                    else:
                        line = f"{CYAN}{BOLD}▸ {flag}:{RESET} {val or empty_marker}"
                else:
                    line = f" {flag}: {val or empty_marker}"

                # The description is hidden on the row being edited.
                if arg['desc'] and editing != i:
                    line += f" {DIM}# {arg['desc'][:30]}{RESET}"

                lines.append(line)

            if editing is None:
                lines.append(f"{DIM}Tab:edit Enter:run Esc:back{RESET}")

            _write('\n'.join(lines) + '\n')
            last_drawn = len(lines)

            ch = tty_input.getch()

            if ch == '':
                # End of input on the terminal. Without this check the loop
                # would spin forever, because no branch below consumes ''.
                clear_dropdown(last_drawn)
                return None

            if editing is not None:
                if ch in ('\r', '\n'):  # Save
                    values[args[editing]['flag']] = edit_buf
                    editing = None
                    edit_buf = ""
                elif ch == '\x1b':  # Cancel edit
                    editing = None
                    edit_buf = ""
                elif ch == '\x7f' or ch == '\b':
                    edit_buf = edit_buf[:-1]
                elif len(ch) == 1 and ch.isprintable():
                    # The length check rejects symbolic key names such as
                    # 'UP' or 'LEFT', which are printable strings and would
                    # otherwise be typed into the value verbatim.
                    edit_buf += ch
            else:
                if ch in ('\r', '\n'):  # Enter - run
                    clear_dropdown(last_drawn)
                    return values
                elif ch == '\t':  # Tab - edit
                    editing = selected
                    edit_buf = values[args[selected]['flag']]
                elif ch == '\x1b' or ch == '\x03':  # Back
                    clear_dropdown(last_drawn)
                    return None
                elif ch == 'UP':
                    selected = max(0, selected - 1)
                elif ch == 'DOWN':
                    selected = min(len(args) - 1, selected + 1)
    finally:
        _write(SHOW_CURSOR)
|
|
|
|
|
|
def main():
    """Entry point for cf command.

    Reads any piped stdin, runs the interactive picker on the controlling
    terminal, then runs the chosen tool with the chosen arguments.

    The tool is started from ``~/.local/bin/<name>`` when that path exists,
    otherwise through ``-m cmdforge.runner`` on the interpreter running this
    process. With piped input the tool runs as a subprocess fed that input
    and this process exits with its return code; without piped input the
    current process is replaced by the tool. Exits with status 1 when no
    terminal is available; returns normally when nothing was picked.
    """
    global _ui_out
    import subprocess

    # Read piped input if stdin is not a tty
    piped_input = None
    if not sys.stdin.isatty():
        piped_input = sys.stdin.read()

    # Check if we have a terminal available
    if not os.path.exists('/dev/tty'):
        print("cf requires a terminal", file=sys.stderr)
        sys.exit(1)

    # Use stderr for UI if stdout is piped, so tool output stays clean
    _ui_out = sys.stderr if not sys.stdout.isatty() else sys.stdout

    try:
        with TTYInput() as tty_input:
            result = run_picker(tty_input)
    except OSError as e:
        print(f"cf requires a terminal: {e}", file=sys.stderr)
        sys.exit(1)

    if result is None:
        return

    # Build command; empty values are left out. str() guards against
    # non-string values, which neither ' '.join() nor exec accept.
    cmd = [result.tool_name]
    for flag, val in result.arguments.items():
        if val:
            cmd.extend([flag, str(val)])

    # Show what we're running and flush immediately
    sys.stderr.write(f"{BOLD}{' '.join(cmd)}{RESET}\n")
    sys.stderr.flush()

    tool_path = os.path.expanduser(f"~/.local/bin/{result.tool_name}")
    has_wrapper = os.path.exists(tool_path)

    # Fall back to the interpreter running this process rather than whatever
    # "python" happens to be on PATH (it may be missing or lack cmdforge).
    interpreter = sys.executable or "python"
    runner_cmd = [interpreter, "-m", "cmdforge.runner"] + cmd

    # Run it - use subprocess if we have piped input, exec otherwise
    if piped_input is not None:
        # Use subprocess so we can pass stdin
        argv = [tool_path] + cmd[1:] if has_wrapper else runner_cmd
        proc = subprocess.run(argv, input=piped_input, text=True)
        sys.exit(proc.returncode)

    # No piped input - use exec for efficiency
    if has_wrapper:
        os.execv(tool_path, cmd)
    else:
        os.execvp(interpreter, runner_cmd)
|
|
|
|
|
|
# Run the picker when this module is executed directly as a script.
if __name__ == "__main__":
    main()
|