Refactor CLI and UI into modular packages, add registry scrutiny

Major refactoring:
- Split cli.py (1465 lines) into cli/ package with 6 modules
- Split ui_urwid.py (2313 lines) into ui_urwid/ package with 4 modules
- Maintain backwards compatibility via thin wrapper modules

New features:
- Add tool scrutiny system for registry publishing (honesty, transparency, scope, efficiency checks)
- Add optimization suggestions for AI calls that could be pure code

Bug fixes:
- Fix variable substitution escaping ({{literal}} now works)
- Fix provider command parsing with shlex for quoted paths
- Add error logging even without --verbose flag

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rob 2026-01-01 02:44:39 -04:00
parent 4fb229858f
commit 97f9de52e6
27 changed files with 5195 additions and 3781 deletions

View File

@ -554,7 +554,16 @@ smarttools providers test claude
## Links
### User Documentation
- [Installation Guide](docs/INSTALL.md)
- [Provider Setup](docs/PROVIDERS.md)
- [Example Tools](docs/EXAMPLES.md)
- [User Wiki](wiki/Home.md)
### Developer Documentation
- [Project Overview](docs/PROJECT.md) - Start here to understand the codebase
- [Design Document](docs/DESIGN.md)
- [Registry API Design](docs/REGISTRY.md)
- [Web UI Design](docs/WEB_UI.md)
- [Deployment Guide](docs/DEPLOYMENT.md)
- [Architecture Diagrams](docs/diagrams/)

264
docs/DEPLOYMENT.md Normal file
View File

@ -0,0 +1,264 @@
# SmartTools Deployment Guide
This guide covers deploying the SmartTools Registry web application.
## Quick Start (Development)
```bash
# Clone and install
git clone https://gitea.brrd.tech/rob/SmartTools.git
cd SmartTools
pip install -e ".[dev]"
# Initialize database
cd src/smarttools/web
python -c "from app import create_app; create_app()"
# Run development server
flask run --port 5050 --host 0.0.0.0
```
Access at `http://localhost:5050`
## Production Deployment
### Prerequisites
- Python 3.8+
- pip/pipx
- nginx (recommended for reverse proxy)
- systemd (for process management)
### 1. Install Application
```bash
# Create application directory
sudo mkdir -p /opt/smarttools
sudo chown $USER:$USER /opt/smarttools
cd /opt/smarttools
# Clone repository
git clone https://gitea.brrd.tech/rob/SmartTools.git .
# Create virtual environment
python3 -m venv venv
source venv/bin/activate
# Install dependencies
pip install -e ".[prod]"
pip install gunicorn
```
### 2. Configure Environment
Create `/opt/smarttools/.env`:
```bash
# Required
FLASK_ENV=production
SECRET_KEY=your-secret-key-here # Generate with: python -c "import secrets; print(secrets.token_hex(32))"
# Optional
SENTRY_DSN=https://xxx@sentry.io/xxx # Error monitoring
SITE_URL=https://registry.smarttools.dev # For sitemap/SEO
```
### 3. Initialize Database
```bash
cd /opt/smarttools/src/smarttools/web
python -c "from app import create_app; create_app()"
```
Database will be created at `data/smarttools.db`.
### 4. systemd Service
Create `/etc/systemd/system/smarttools-web.service`:
```ini
[Unit]
Description=SmartTools Registry Web Application
After=network.target
[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/opt/smarttools
Environment="PATH=/opt/smarttools/venv/bin"
EnvironmentFile=/opt/smarttools/.env
ExecStart=/opt/smarttools/venv/bin/gunicorn \
--workers 4 \
--bind 127.0.0.1:5050 \
--access-logfile /var/log/smarttools/access.log \
--error-logfile /var/log/smarttools/error.log \
'smarttools.web.app:create_app()'
Restart=always
RestartSec=5
[Install]
WantedBy=multi-user.target
```
Enable and start:
```bash
# Create log directory
sudo mkdir -p /var/log/smarttools
sudo chown www-data:www-data /var/log/smarttools
# Enable service
sudo systemctl daemon-reload
sudo systemctl enable smarttools-web
sudo systemctl start smarttools-web
# Check status
sudo systemctl status smarttools-web
```
### 5. nginx Configuration
Create `/etc/nginx/sites-available/smarttools`:
```nginx
server {
listen 80;
server_name registry.smarttools.dev;
# Redirect HTTP to HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name registry.smarttools.dev;
# SSL certificates (use certbot for Let's Encrypt)
ssl_certificate /etc/letsencrypt/live/registry.smarttools.dev/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/registry.smarttools.dev/privkey.pem;
# SSL settings
ssl_protocols TLSv1.2 TLSv1.3;
ssl_prefer_server_ciphers on;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
# Static files (optional - Flask can serve these)
location /static/ {
alias /opt/smarttools/src/smarttools/web/static/;
expires 1d;
add_header Cache-Control "public, immutable";
}
# Proxy to gunicorn
location / {
proxy_pass http://127.0.0.1:5050;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
```
Enable site:
```bash
sudo ln -s /etc/nginx/sites-available/smarttools /etc/nginx/sites-enabled/
sudo nginx -t
sudo systemctl reload nginx
```
### 6. SSL with Let's Encrypt
```bash
sudo apt install certbot python3-certbot-nginx
sudo certbot --nginx -d registry.smarttools.dev
```
## Environment Variables
| Variable | Required | Description |
|----------|----------|-------------|
| `FLASK_ENV` | Yes | `production` or `development` |
| `SECRET_KEY` | Yes | Secret key for sessions (32+ chars) |
| `SENTRY_DSN` | No | Sentry error monitoring DSN |
| `SITE_URL` | No | Public URL for sitemap generation |
| `DATABASE_PATH` | No | Override database path (default: `data/smarttools.db`) |
## Database Backup
```bash
# Backup
cp /opt/smarttools/data/smarttools.db /backup/smarttools-$(date +%Y%m%d).db
# Or use SQLite backup command for consistency
sqlite3 /opt/smarttools/data/smarttools.db ".backup '/backup/smarttools.db'"
```
## Monitoring
### Logs
```bash
# Application logs
tail -f /var/log/smarttools/error.log
tail -f /var/log/smarttools/access.log
# systemd logs
journalctl -u smarttools-web -f
```
### Sentry Integration
The application includes Sentry integration for error monitoring. Set `SENTRY_DSN` environment variable to enable.
### Health Check
```bash
curl http://localhost:5050/api/v1/tools
```
## Troubleshooting
### Service won't start
```bash
# Check logs
journalctl -u smarttools-web -n 50
# Verify Python path
/opt/smarttools/venv/bin/python -c "import smarttools"
# Test gunicorn manually
cd /opt/smarttools
source venv/bin/activate
gunicorn --bind 127.0.0.1:5050 'smarttools.web.app:create_app()'
```
### Database errors
```bash
# Check database exists and is readable
ls -la /opt/smarttools/data/smarttools.db
# Verify permissions
chown www-data:www-data /opt/smarttools/data/
chown www-data:www-data /opt/smarttools/data/smarttools.db
```
### Static files not loading
```bash
# Rebuild Tailwind CSS
cd /opt/smarttools
npm install
npm run css:build
# Or use development CSS
# (static/css/output.css should exist)
```
## Architecture Diagram
See `docs/diagrams/deployment-architecture.svg` for visual representation.

180
docs/PROJECT.md Normal file
View File

@ -0,0 +1,180 @@
# SmartTools Project Overview
This document provides a comprehensive overview of the SmartTools project structure, components, and how everything fits together.
## What is SmartTools?
SmartTools is a personal tool builder for AI-powered CLI commands. It consists of three main components:
1. **CLI Tool** (`smarttools`) - Create, manage, and run AI-powered command-line tools
2. **Registry API** - REST API for publishing and discovering tools
3. **Web UI** - Browser interface for the registry with docs, search, and user dashboard
## Quick Reference
| Component | Location | Purpose |
|-----------|----------|---------|
| CLI | `src/smarttools/cli.py` | Main entry point for `smarttools` command |
| Registry API | `src/smarttools/registry/` | REST API for tool registry |
| Web UI | `src/smarttools/web/` | Flask web application |
| Tool Storage | `~/.smarttools/` | User's installed tools |
| Database | `data/smarttools.db` | SQLite with FTS5 search |
## Architecture Diagrams
See `docs/diagrams/` for visual representations:
- **system-architecture.svg** - How components interact
- **deployment-architecture.svg** - Production deployment setup
- **source-organization.svg** - Code structure
## Source Code Structure
```
src/smarttools/
├── cli.py # Entry point, subcommand routing
├── tool.py # Tool dataclasses, YAML loading
├── runner.py # Step execution engine
├── providers.py # AI provider abstraction
├── config.py # Global configuration
├── manifest.py # Tool manifest handling
├── registry/ # Registry API Server
│ ├── app.py # Flask API application
│ ├── db.py # Database operations, FTS5 search
│ ├── rate_limit.py # API rate limiting
│ ├── sync.py # Git sync for tool submissions
│ ├── categorize.py # Auto-categorization
│ └── similarity.py # Similar tool recommendations
├── web/ # Web UI Application
│ ├── app.py # Flask web app, Sentry integration
│ ├── routes.py # Page routes and handlers
│ ├── auth.py # Password hashing, login/register
│ ├── sessions.py # Session management
│ ├── docs_content.py # Documentation text content
│ ├── seo.py # Sitemap, robots.txt
│ ├── filters.py # Jinja template filters
│ ├── templates/ # Jinja HTML templates
│ └── static/ # CSS, JS, images
├── registry_client.py # CLI client for registry API
├── resolver.py # Dependency resolution
├── ui.py # TUI dispatcher
├── ui_urwid.py # urwid TUI implementation
└── ui_snack.py # newt/snack TUI fallback
```
## Database Schema
SQLite database at `data/smarttools.db`:
| Table | Purpose |
|-------|---------|
| `publishers` | User accounts (username, email, password_hash) |
| `tools` | Published tools (name, owner, description, config) |
| `api_tokens` | Authentication tokens for API access |
| `download_stats` | Tool download tracking |
| `pageviews` | Analytics for tool detail pages |
| `tools_fts` | Full-text search index (FTS5) |
## API Endpoints
Base URL: `/api/v1/`
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/tools` | GET | List/search tools |
| `/tools/<owner>/<name>` | GET | Get tool details |
| `/tools/<owner>/<name>` | POST | Publish tool (auth required) |
| `/tools/<owner>/<name>/download` | POST | Record download |
| `/me` | GET | Current user info |
| `/tokens` | GET/POST | Manage API tokens |
## Web Routes
| Route | Purpose |
|-------|---------|
| `/` | Homepage with featured tools |
| `/tools` | Tool browser with search |
| `/tools/<owner>/<name>` | Tool detail page |
| `/docs/*` | Documentation pages |
| `/login`, `/register` | Authentication |
| `/dashboard/*` | User dashboard, token management |
## Key Files to Know
### Configuration
- `~/.smarttools/providers.yaml` - AI provider configurations
- `~/.smarttools/config.yaml` - Global settings
- `~/.smarttools/<tool>/config.yaml` - Individual tool configs
### Development
- `pyproject.toml` - Package configuration
- `pytest.ini` - Test configuration
- `tailwind.config.js` - Tailwind CSS configuration
### Documentation
- `docs/REGISTRY.md` - Registry design document (comprehensive)
- `docs/WEB_UI.md` - Web UI design document (comprehensive)
- `wiki/Home.md` - User-facing wiki documentation
- `CLAUDE.md` - Development guidance for AI assistants
## Running the Project
### Development
```bash
# Install with dev dependencies
pip install -e ".[dev]"
# Run CLI
python -m smarttools.cli
# Run web server (development)
cd src/smarttools/web
flask run --port 5050
```
### Production
```bash
# Using gunicorn
gunicorn -w 4 -b 0.0.0.0:5050 'smarttools.web.app:create_app()'
# Environment variables needed:
# FLASK_ENV=production
# SECRET_KEY=<your-secret-key>
# SENTRY_DSN=<optional-sentry-dsn>
# SITE_URL=https://your-domain.com
```
### systemd Service
See `docs/DEPLOYMENT.md` for systemd service configuration.
## Testing
```bash
# Run all tests
pytest
# Run specific test
pytest tests/test_name.py::test_function
# Run with coverage
pytest --cov=smarttools
```
## Design Documents
For detailed design decisions and specifications:
- **docs/REGISTRY.md** - Complete registry API design (Phases 1-8)
- **docs/WEB_UI.md** - Complete web UI design (Phase 7)
- **wiki/Home.md** - User documentation for CLI usage
## Archive
Old development discussions and diagrams are preserved in `archive/discussions/` for historical reference.

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,233 @@
"""CLI entry point for SmartTools."""
import argparse
import sys
from .. import __version__
from .tool_commands import (
cmd_list, cmd_create, cmd_edit, cmd_delete, cmd_test, cmd_run,
cmd_ui, cmd_refresh, cmd_docs
)
from .provider_commands import cmd_providers
from .registry_commands import cmd_registry
from .project_commands import cmd_deps, cmd_install_deps, cmd_add, cmd_init
from .config_commands import cmd_config
def main():
"""Main CLI entry point."""
parser = argparse.ArgumentParser(
prog="smarttools",
description="A lightweight personal tool builder for AI-powered CLI commands"
)
parser.add_argument("--version", action="version", version=f"%(prog)s {__version__}")
subparsers = parser.add_subparsers(dest="command", help="Available commands")
# No command = launch UI
# 'list' command
p_list = subparsers.add_parser("list", help="List all tools")
p_list.set_defaults(func=cmd_list)
# 'create' command
p_create = subparsers.add_parser("create", help="Create a new tool")
p_create.add_argument("name", help="Tool name")
p_create.add_argument("-d", "--description", help="Tool description")
p_create.add_argument("-p", "--prompt", help="Prompt template")
p_create.add_argument("--provider", help="AI provider (default: mock)")
p_create.add_argument("-f", "--force", action="store_true", help="Overwrite existing")
p_create.set_defaults(func=cmd_create)
# 'edit' command
p_edit = subparsers.add_parser("edit", help="Edit a tool config")
p_edit.add_argument("name", help="Tool name")
p_edit.set_defaults(func=cmd_edit)
# 'delete' command
p_delete = subparsers.add_parser("delete", help="Delete a tool")
p_delete.add_argument("name", help="Tool name")
p_delete.add_argument("-f", "--force", action="store_true", help="Skip confirmation")
p_delete.set_defaults(func=cmd_delete)
# 'test' command
p_test = subparsers.add_parser("test", help="Test a tool with mock provider")
p_test.add_argument("name", help="Tool name")
p_test.add_argument("-i", "--input", help="Input file for testing")
p_test.add_argument("--dry-run", action="store_true", help="Show prompt only")
p_test.set_defaults(func=cmd_test)
# 'run' command
p_run = subparsers.add_parser("run", help="Run a tool")
p_run.add_argument("name", help="Tool name")
p_run.add_argument("-i", "--input", help="Input file (reads from stdin if piped)")
p_run.add_argument("-o", "--output", help="Output file (writes to stdout if omitted)")
p_run.add_argument("--stdin", action="store_true", help="Read input interactively (type then Ctrl+D)")
p_run.add_argument("-p", "--provider", help="Override provider")
p_run.add_argument("--dry-run", action="store_true", help="Show what would happen without executing")
p_run.add_argument("--show-prompt", action="store_true", help="Show prompts in addition to output")
p_run.add_argument("-v", "--verbose", action="store_true", help="Show debug information")
p_run.add_argument("tool_args", nargs="*", help="Additional tool-specific arguments")
p_run.set_defaults(func=cmd_run)
# 'ui' command (explicit)
p_ui = subparsers.add_parser("ui", help="Launch interactive UI")
p_ui.set_defaults(func=cmd_ui)
# 'refresh' command
p_refresh = subparsers.add_parser("refresh", help="Refresh all wrapper scripts")
p_refresh.set_defaults(func=cmd_refresh)
# 'docs' command
p_docs = subparsers.add_parser("docs", help="View or edit tool documentation")
p_docs.add_argument("name", help="Tool name")
p_docs.add_argument("-e", "--edit", action="store_true", help="Edit/create README in $EDITOR")
p_docs.set_defaults(func=cmd_docs)
# 'providers' command
p_providers = subparsers.add_parser("providers", help="Manage AI providers")
providers_sub = p_providers.add_subparsers(dest="providers_cmd", help="Provider commands")
# providers list
p_prov_list = providers_sub.add_parser("list", help="List all providers and their status")
p_prov_list.set_defaults(func=cmd_providers)
# providers check
p_prov_check = providers_sub.add_parser("check", help="Check which providers are available")
p_prov_check.set_defaults(func=cmd_providers)
# providers install
p_prov_install = providers_sub.add_parser("install", help="Interactive guide to install AI providers")
p_prov_install.set_defaults(func=cmd_providers)
# providers add
p_prov_add = providers_sub.add_parser("add", help="Add or update a provider")
p_prov_add.add_argument("name", help="Provider name")
p_prov_add.add_argument("command", help="Command to run (e.g., 'claude -p')")
p_prov_add.add_argument("-d", "--description", help="Provider description")
p_prov_add.set_defaults(func=cmd_providers)
# providers remove
p_prov_remove = providers_sub.add_parser("remove", help="Remove a provider")
p_prov_remove.add_argument("name", help="Provider name")
p_prov_remove.set_defaults(func=cmd_providers)
# providers test
p_prov_test = providers_sub.add_parser("test", help="Test a provider")
p_prov_test.add_argument("name", help="Provider name")
p_prov_test.set_defaults(func=cmd_providers)
# Default for providers with no subcommand
p_providers.set_defaults(func=lambda args: cmd_providers(args) if args.providers_cmd else (setattr(args, 'providers_cmd', 'list') or cmd_providers(args)))
# -------------------------------------------------------------------------
# Registry Commands
# -------------------------------------------------------------------------
p_registry = subparsers.add_parser("registry", help="Registry commands (search, install, publish)")
registry_sub = p_registry.add_subparsers(dest="registry_cmd", help="Registry commands")
# registry search
p_reg_search = registry_sub.add_parser("search", help="Search for tools")
p_reg_search.add_argument("query", help="Search query")
p_reg_search.add_argument("-c", "--category", help="Filter by category")
p_reg_search.add_argument("-l", "--limit", type=int, help="Max results (default: 20)")
p_reg_search.set_defaults(func=cmd_registry)
# registry install
p_reg_install = registry_sub.add_parser("install", help="Install a tool from registry")
p_reg_install.add_argument("tool", help="Tool to install (owner/name or name)")
p_reg_install.add_argument("-v", "--version", help="Version constraint")
p_reg_install.set_defaults(func=cmd_registry)
# registry uninstall
p_reg_uninstall = registry_sub.add_parser("uninstall", help="Uninstall a tool")
p_reg_uninstall.add_argument("tool", help="Tool to uninstall (owner/name)")
p_reg_uninstall.set_defaults(func=cmd_registry)
# registry info
p_reg_info = registry_sub.add_parser("info", help="Show tool information")
p_reg_info.add_argument("tool", help="Tool name (owner/name)")
p_reg_info.set_defaults(func=cmd_registry)
# registry update
p_reg_update = registry_sub.add_parser("update", help="Update local index cache")
p_reg_update.set_defaults(func=cmd_registry)
# registry publish
p_reg_publish = registry_sub.add_parser("publish", help="Publish a tool to registry")
p_reg_publish.add_argument("path", nargs="?", help="Path to tool directory (default: current dir)")
p_reg_publish.add_argument("--dry-run", action="store_true", help="Validate without publishing")
p_reg_publish.set_defaults(func=cmd_registry)
# registry my-tools
p_reg_mytools = registry_sub.add_parser("my-tools", help="List your published tools")
p_reg_mytools.set_defaults(func=cmd_registry)
# registry browse
p_reg_browse = registry_sub.add_parser("browse", help="Browse tools (TUI)")
p_reg_browse.set_defaults(func=cmd_registry)
# Default for registry with no subcommand
p_registry.set_defaults(func=lambda args: cmd_registry(args) if args.registry_cmd else (setattr(args, 'registry_cmd', None) or cmd_registry(args)))
# -------------------------------------------------------------------------
# Project Commands
# -------------------------------------------------------------------------
# 'deps' command
p_deps = subparsers.add_parser("deps", help="Show project dependencies")
p_deps.set_defaults(func=cmd_deps)
# 'install' command (for dependencies)
p_install = subparsers.add_parser("install", help="Install dependencies from smarttools.yaml")
p_install.set_defaults(func=cmd_install_deps)
# 'add' command
p_add = subparsers.add_parser("add", help="Add a tool to project dependencies")
p_add.add_argument("tool", help="Tool to add (owner/name)")
p_add.add_argument("-v", "--version", help="Version constraint (default: *)")
p_add.add_argument("--no-install", action="store_true", help="Don't install after adding")
p_add.set_defaults(func=cmd_add)
# 'init' command
p_init = subparsers.add_parser("init", help="Initialize smarttools.yaml")
p_init.add_argument("-n", "--name", help="Project name")
p_init.add_argument("-v", "--version", help="Project version")
p_init.add_argument("-f", "--force", action="store_true", help="Overwrite existing")
p_init.set_defaults(func=cmd_init)
# -------------------------------------------------------------------------
# Config Commands
# -------------------------------------------------------------------------
p_config = subparsers.add_parser("config", help="Manage configuration")
config_sub = p_config.add_subparsers(dest="config_cmd", help="Config commands")
# config show
p_cfg_show = config_sub.add_parser("show", help="Show current configuration")
p_cfg_show.set_defaults(func=cmd_config)
# config set-token
p_cfg_token = config_sub.add_parser("set-token", help="Set registry authentication token")
p_cfg_token.add_argument("token", help="Registry token")
p_cfg_token.set_defaults(func=cmd_config)
# config set
p_cfg_set = config_sub.add_parser("set", help="Set a configuration value")
p_cfg_set.add_argument("key", help="Config key")
p_cfg_set.add_argument("value", help="Config value")
p_cfg_set.set_defaults(func=cmd_config)
# Default for config with no subcommand
p_config.set_defaults(func=lambda args: cmd_config(args) if args.config_cmd else (setattr(args, 'config_cmd', 'show') or cmd_config(args)))
args = parser.parse_args()
# If no command, launch UI
if args.command is None:
return cmd_ui(args)
return args.func(args)
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,7 @@
"""Allow running the CLI as a module."""
import sys
from . import main
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,62 @@
"""Configuration commands."""
from ..config import load_config, save_config, set_registry_token
def cmd_config(args):
"""Manage SmartTools configuration."""
if args.config_cmd == "show":
return _cmd_config_show(args)
elif args.config_cmd == "set-token":
return _cmd_config_set_token(args)
elif args.config_cmd == "set":
return _cmd_config_set(args)
else:
print("Config commands:")
print(" show Show current configuration")
print(" set-token <token> Set registry authentication token")
print(" set <key> <value> Set a configuration value")
return 0
def _cmd_config_show(args):
"""Show current configuration."""
config = load_config()
print("SmartTools Configuration:")
print(f" Registry URL: {config.registry.url}")
print(f" Token: {'***' if config.registry.token else '(not set)'}")
print(f" Client ID: {config.client_id}")
print(f" Auto-fetch: {config.auto_fetch_from_registry}")
if config.default_provider:
print(f" Default provider: {config.default_provider}")
return 0
def _cmd_config_set_token(args):
"""Set registry authentication token."""
token = args.token
set_registry_token(token)
print("Registry token saved.")
return 0
def _cmd_config_set(args):
"""Set a configuration value."""
config = load_config()
key = args.key
value = args.value
if key == "auto_fetch":
config.auto_fetch_from_registry = value.lower() in ("true", "1", "yes")
elif key == "default_provider":
config.default_provider = value if value else None
elif key == "registry_url":
config.registry.url = value
else:
print(f"Unknown config key: {key}")
print("Available keys: auto_fetch, default_provider, registry_url")
return 1
save_config(config)
print(f"Set {key} = {value}")
return 0

View File

@ -0,0 +1,202 @@
"""Project management commands."""
import sys
from pathlib import Path
from ..manifest import (
load_manifest, save_manifest, create_manifest, find_manifest,
MANIFEST_FILENAME
)
from ..resolver import (
find_tool, install_from_registry, ToolSpec
)
def cmd_deps(args):
"""Show project dependencies from smarttools.yaml."""
manifest = load_manifest()
if manifest is None:
print("No smarttools.yaml found in current project.")
print("Create one with: smarttools init")
return 1
print(f"Project: {manifest.name} v{manifest.version}")
print()
if not manifest.dependencies:
print("No dependencies defined.")
print("Add one with: smarttools add <owner/name>")
return 0
print(f"Dependencies ({len(manifest.dependencies)}):")
print()
for dep in manifest.dependencies:
# Check if installed
installed = find_tool(dep.name)
status = "[installed]" if installed else "[not installed]"
print(f" {dep.name}")
print(f" Version: {dep.version}")
print(f" Status: {status}")
print()
if manifest.overrides:
print("Overrides:")
for name, override in manifest.overrides.items():
if override.provider:
print(f" {name}: provider={override.provider}")
return 0
def cmd_install_deps(args):
"""Install dependencies from smarttools.yaml."""
from ..registry_client import RegistryError
manifest = load_manifest()
if manifest is None:
print("No smarttools.yaml found in current project.")
print("Create one with: smarttools init")
return 1
if not manifest.dependencies:
print("No dependencies to install.")
return 0
print(f"Installing dependencies for {manifest.name}...")
print()
failed = []
installed = []
for i, dep in enumerate(manifest.dependencies, 1):
print(f"[{i}/{len(manifest.dependencies)}] {dep.name}@{dep.version}")
# Check if already installed
existing = find_tool(dep.name)
if existing:
print(f" Already installed: {existing.full_name}")
installed.append(dep.name)
continue
try:
print(f" Downloading...")
resolved = install_from_registry(dep.name, dep.version)
print(f" Installed: {resolved.full_name}@{resolved.version}")
installed.append(dep.name)
except RegistryError as e:
if e.code == "TOOL_NOT_FOUND":
print(f" Not found in registry")
elif e.code == "VERSION_NOT_FOUND" or e.code == "CONSTRAINT_UNSATISFIABLE":
print(f" Version {dep.version} not available")
elif e.code == "CONNECTION_ERROR":
print(f" Connection failed (check network)")
else:
print(f" Failed: {e.message}")
failed.append(dep.name)
except Exception as e:
print(f" Failed: {e}")
failed.append(dep.name)
print()
print(f"Installed {len(installed)} tools")
if failed:
print(f"Failed: {', '.join(failed)}")
return 1
return 0
def cmd_add(args):
"""Add a tool to project dependencies."""
from ..registry_client import RegistryError
tool_spec = args.tool
version = args.version or "*"
# Find or create manifest
manifest_path = find_manifest()
if manifest_path:
manifest = load_manifest(manifest_path)
else:
# Create in current directory
manifest = create_manifest(name=Path.cwd().name)
manifest_path = Path.cwd() / MANIFEST_FILENAME
# Parse tool spec
parsed = ToolSpec.parse(tool_spec)
full_name = parsed.full_name
# Add dependency
manifest.add_dependency(full_name, version)
# Save
save_manifest(manifest, manifest_path)
print(f"Added {full_name}@{version} to {manifest_path.name}")
# Install if requested
if not args.no_install:
print(f"Installing {full_name}...")
try:
resolved = install_from_registry(tool_spec, version if version != "*" else None)
print(f"Installed: {resolved.full_name}@{resolved.version}")
except RegistryError as e:
if e.code == "TOOL_NOT_FOUND":
print(f"Tool not found in registry.", file=sys.stderr)
print(f"It's been added to your dependencies - you can install it manually later.", file=sys.stderr)
elif e.code == "CONNECTION_ERROR":
print(f"Could not connect to registry.", file=sys.stderr)
print("Run 'smarttools install' to try again later.", file=sys.stderr)
else:
print(f"Install failed: {e.message}", file=sys.stderr)
print("Run 'smarttools install' to try again.", file=sys.stderr)
except Exception as e:
print(f"Install failed: {e}", file=sys.stderr)
print("Run 'smarttools install' to try again.", file=sys.stderr)
return 0
def cmd_init(args):
"""Initialize a new smarttools.yaml."""
manifest_path = Path.cwd() / MANIFEST_FILENAME
if manifest_path.exists() and not args.force:
print(f"{MANIFEST_FILENAME} already exists. Use --force to overwrite.")
return 1
# Get project name
default_name = Path.cwd().name
if args.name:
name = args.name
else:
try:
name = input(f"Project name [{default_name}]: ").strip() or default_name
except (EOFError, KeyboardInterrupt):
print()
name = default_name
# Get version
if args.version:
version = args.version
else:
try:
version = input("Version [1.0.0]: ").strip() or "1.0.0"
except (EOFError, KeyboardInterrupt):
print()
version = "1.0.0"
# Create manifest
manifest = create_manifest(name=name, version=version)
save_manifest(manifest, manifest_path)
print(f"Created {MANIFEST_FILENAME}")
print()
print("Add dependencies with: smarttools add <owner/name>")
print("Install them with: smarttools install")
return 0

View File

@ -0,0 +1,307 @@
"""Provider management commands."""
import shutil
import subprocess
from pathlib import Path
from ..providers import load_providers, add_provider, delete_provider, Provider, call_provider
PROVIDER_INSTALL_INFO = {
"claude": {
"group": "Anthropic Claude",
"install_cmd": "npm install -g @anthropic-ai/claude-code",
"requires": "Node.js 18+ and npm",
"setup": "Run 'claude' - opens browser for sign-in (auto-saves auth tokens)",
"cost": "Pay-per-use (billed to your Anthropic account)",
"variants": ["claude", "claude-haiku", "claude-opus", "claude-sonnet"],
},
"codex": {
"group": "OpenAI Codex",
"install_cmd": "npm install -g @openai/codex",
"requires": "Node.js 18+ and npm",
"setup": "Run 'codex' - opens browser for sign-in (auto-saves auth tokens)",
"cost": "Pay-per-use (billed to your OpenAI account)",
"variants": ["codex"],
},
"gemini": {
"group": "Google Gemini",
"install_cmd": "npm install -g @google/gemini-cli",
"requires": "Node.js 18+ and npm",
"setup": "Run 'gemini' - opens browser for Google sign-in",
"cost": "Free tier available, pay-per-use for more",
"variants": ["gemini", "gemini-flash"],
},
"opencode": {
"group": "OpenCode (75+ providers)",
"install_cmd": "curl -fsSL https://opencode.ai/install | bash",
"requires": "curl, bash",
"setup": "Run 'opencode' - opens browser to connect more providers",
"cost": "4 FREE models included (Big Pickle, GLM-4.7, Grok Code Fast 1, MiniMax M2.1), 75+ more available",
"variants": ["opencode-pickle", "opencode-deepseek", "opencode-nano", "opencode-reasoner", "opencode-grok"],
},
"ollama": {
"group": "Ollama (Local LLMs)",
"install_cmd": "curl -fsSL https://ollama.ai/install.sh | bash",
"requires": "curl, bash, 8GB+ RAM (GPU recommended)",
"setup": "Run 'ollama pull llama3' to download a model, then add provider",
"cost": "FREE (runs entirely on your machine)",
"variants": [],
"custom": True,
"post_install_note": "After installing, add the provider:\n smarttools providers add ollama 'ollama run llama3' -d 'Local Llama 3'",
},
}
def cmd_providers(args):
"""Manage AI providers."""
if args.providers_cmd == "install":
return _cmd_providers_install(args)
elif args.providers_cmd == "list":
return _cmd_providers_list(args)
elif args.providers_cmd == "add":
return _cmd_providers_add(args)
elif args.providers_cmd == "remove":
return _cmd_providers_remove(args)
elif args.providers_cmd == "test":
return _cmd_providers_test(args)
elif args.providers_cmd == "check":
return _cmd_providers_check(args)
return 0
def _cmd_providers_install(args):
"""Interactive guide to install AI providers."""
print("=" * 60)
print("SmartTools Provider Installation Guide")
print("=" * 60)
print()
# Check what's already installed
providers = load_providers()
installed_groups = set()
for p in providers:
if p.name.lower() == "mock":
continue
cmd_parts = p.command.split()[0]
cmd_expanded = cmd_parts.replace("$HOME", str(Path.home())).replace("~", str(Path.home()))
if shutil.which(cmd_expanded) or Path(cmd_expanded).exists():
# Find which group this belongs to
for group, info in PROVIDER_INSTALL_INFO.items():
if p.name in info.get("variants", []):
installed_groups.add(group)
# Show available provider groups
print("Available AI Provider Groups:\n")
options = []
for i, (key, info) in enumerate(PROVIDER_INSTALL_INFO.items(), 1):
status = "[INSTALLED]" if key in installed_groups else ""
print(f" {i}. {info['group']} {status}")
print(f" Cost: {info['cost']}")
print(f" Models: {', '.join(info['variants']) if info['variants'] else 'Custom'}")
print()
options.append(key)
print(" 0. Cancel")
print()
try:
choice = input("Select a provider to install (1-{}, or 0 to cancel): ".format(len(options)))
choice = int(choice)
except (ValueError, EOFError):
print("Cancelled.")
return 0
if choice == 0 or choice > len(options):
print("Cancelled.")
return 0
selected = options[choice - 1]
info = PROVIDER_INSTALL_INFO[selected]
print()
print("=" * 60)
print(f"Installing: {info['group']}")
print("=" * 60)
print()
print(f"Requirements: {info['requires']}")
print(f"Install command: {info['install_cmd']}")
print(f"Post-install: {info['setup']}")
print()
try:
confirm = input("Run installation command? (y/N): ").strip().lower()
except EOFError:
confirm = "n"
if confirm == "y":
print()
print(f"Running: {info['install_cmd']}")
print("-" * 40)
result = subprocess.run(info['install_cmd'], shell=True)
print("-" * 40)
if result.returncode == 0:
# Refresh PATH to pick up newly installed tools
import os
new_paths = []
# Common install locations that might have been added
potential_paths = [
Path.home() / ".opencode" / "bin", # OpenCode
Path.home() / ".local" / "bin", # pip/pipx installs
Path("/usr/local/bin"), # Ollama, system installs
]
# Also try to get npm global bin path
try:
npm_result = subprocess.run(
["npm", "bin", "-g"],
capture_output=True, text=True, timeout=5
)
if npm_result.returncode == 0:
npm_bin = npm_result.stdout.strip()
if npm_bin:
potential_paths.append(Path(npm_bin))
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
current_path = os.environ.get("PATH", "")
for p in potential_paths:
if p.exists() and str(p) not in current_path:
new_paths.append(str(p))
if new_paths:
os.environ["PATH"] = ":".join(new_paths) + ":" + current_path
print()
print("Installation completed!")
print()
print("IMPORTANT: Refresh your shell PATH before continuing:")
print(" source ~/.bashrc")
print()
print(f"Next steps:")
print(f" 1. source ~/.bashrc (required!)")
print(f" 2. {info['setup']}")
if info.get('post_install_note'):
print(f" 3. {info['post_install_note']}")
print(f" 4. Test with: smarttools providers test {selected}")
else:
print(f" 3. Test with: smarttools providers test {info['variants'][0] if info['variants'] else selected}")
else:
print()
print(f"Installation failed (exit code {result.returncode})")
print("Try running the command manually:")
print(f" {info['install_cmd']}")
else:
print()
print("To install manually, run:")
print(f" {info['install_cmd']}")
print()
print(f"Then: {info['setup']}")
return 0
def _cmd_providers_list(args):
"""List all providers and their status."""
providers = load_providers()
print(f"Configured providers ({len(providers)}):\n")
for p in providers:
# Mock provider is always available
if p.name.lower() == "mock":
print(f" [+] {p.name}")
print(f" Command: (built-in)")
print(f" Status: OK (always available)")
if p.description:
print(f" Info: {p.description}")
print()
continue
# Check if command exists
cmd_parts = p.command.split()[0]
cmd_expanded = cmd_parts.replace("$HOME", str(Path.home())).replace("~", str(Path.home()))
exists = shutil.which(cmd_expanded) is not None or Path(cmd_expanded).exists()
status = "OK" if exists else "NOT FOUND"
status_icon = "+" if exists else "-"
print(f" [{status_icon}] {p.name}")
print(f" Command: {p.command}")
print(f" Status: {status}")
if p.description:
print(f" Info: {p.description}")
print()
return 0
def _cmd_providers_add(args):
"""Add or update a provider."""
name = args.name
command = args.command
description = args.description or ""
provider = Provider(name, command, description)
add_provider(provider)
print(f"Provider '{name}' added/updated.")
return 0
def _cmd_providers_remove(args):
"""Remove a provider."""
name = args.name
if delete_provider(name):
print(f"Provider '{name}' removed.")
return 0
else:
print(f"Provider '{name}' not found.")
return 1
def _cmd_providers_test(args):
"""Test a provider."""
name = args.name
print(f"Testing provider '{name}'...")
result = call_provider(name, "Say 'hello' and nothing else.", timeout=30)
if result.success:
print(f"SUCCESS: {result.text[:200]}...")
else:
print(f"FAILED: {result.error}")
return 0 if result.success else 1
def _cmd_providers_check(args):
"""Check which providers are available."""
providers = load_providers()
print("Checking all providers...\n")
available = []
missing = []
for p in providers:
# Mock provider is always available (handled specially)
if p.name.lower() == "mock":
available.append(p.name)
print(f" [+] {p.name}: OK (built-in)")
continue
cmd_parts = p.command.split()[0]
cmd_expanded = cmd_parts.replace("$HOME", str(Path.home())).replace("~", str(Path.home()))
exists = shutil.which(cmd_expanded) is not None or Path(cmd_expanded).exists()
if exists:
available.append(p.name)
print(f" [+] {p.name}: OK")
else:
missing.append(p.name)
print(f" [-] {p.name}: NOT FOUND ({cmd_parts})")
print(f"\nSummary: {len(available)} available, {len(missing)} missing")
if len(available) == 1 and available[0] == "mock":
print(f"\nNo real AI providers found. Install one of these:")
print(f" - claude: npm install -g @anthropic-ai/claude-cli")
print(f" - codex: pip install openai-codex")
print(f" - gemini: pip install google-generative-ai")
print(f"\nMeanwhile, use mock provider for testing:")
print(f" echo 'test' | summarize --provider mock")
elif missing:
print(f"\nAvailable providers: {', '.join(available)}")
return 0

View File

@ -0,0 +1,425 @@
"""Registry commands."""
import sys
from pathlib import Path
import yaml
from ..config import load_config, set_registry_token
from ..resolver import install_from_registry, uninstall_tool, ToolSpec
def cmd_registry(args):
"""Handle registry subcommands."""
from ..registry_client import RegistryClient, RegistryError, RateLimitError, get_client
if args.registry_cmd == "search":
return _cmd_registry_search(args)
elif args.registry_cmd == "install":
return _cmd_registry_install(args)
elif args.registry_cmd == "uninstall":
return _cmd_registry_uninstall(args)
elif args.registry_cmd == "info":
return _cmd_registry_info(args)
elif args.registry_cmd == "update":
return _cmd_registry_update(args)
elif args.registry_cmd == "publish":
return _cmd_registry_publish(args)
elif args.registry_cmd == "my-tools":
return _cmd_registry_my_tools(args)
elif args.registry_cmd == "browse":
return _cmd_registry_browse(args)
else:
# Default: show registry help
print("Registry commands:")
print(" search <query> Search for tools")
print(" install <tool> Install a tool")
print(" uninstall <tool> Uninstall a tool")
print(" info <tool> Show tool information")
print(" update Update local index cache")
print(" publish [path] Publish a tool")
print(" my-tools List your published tools")
print(" browse Browse tools (TUI)")
return 0
def _cmd_registry_search(args):
"""Search for tools in the registry."""
from ..registry_client import RegistryError, get_client
try:
client = get_client()
results = client.search_tools(
query=args.query,
category=args.category,
per_page=args.limit or 20
)
if not results.data:
print(f"No tools found matching '{args.query}'")
return 0
print(f"Found {results.total} tools:\n")
for tool in results.data:
owner = tool.get("owner", "")
name = tool.get("name", "")
version = tool.get("version", "")
desc = tool.get("description", "")
downloads = tool.get("downloads", 0)
print(f" {owner}/{name} v{version}")
print(f" {desc[:60]}{'...' if len(desc) > 60 else ''}")
print(f" Downloads: {downloads}")
print()
if results.total_pages > 1:
print(f"Showing page {results.page}/{results.total_pages}")
except RegistryError as e:
if e.code == "CONNECTION_ERROR":
print("Could not connect to the registry.", file=sys.stderr)
print("Check your internet connection or try again later.", file=sys.stderr)
elif e.code == "RATE_LIMITED":
print(f"Rate limited. Please wait and try again.", file=sys.stderr)
else:
print(f"Error: {e.message}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error searching registry: {e}", file=sys.stderr)
print("If the problem persists, check: smarttools config show", file=sys.stderr)
return 1
return 0
def _cmd_registry_install(args):
"""Install a tool from the registry."""
from ..registry_client import RegistryError
from ..tool import BIN_DIR
tool_spec = args.tool
version = args.version
print(f"Installing {tool_spec}...")
try:
resolved = install_from_registry(tool_spec, version)
print(f"Installed: {resolved.full_name}@{resolved.version}")
print(f"Location: {resolved.path}")
# Show wrapper info
wrapper_name = resolved.tool.name
if resolved.owner:
# Check for collision
short_wrapper = BIN_DIR / resolved.tool.name
if short_wrapper.exists():
wrapper_name = f"{resolved.owner}-{resolved.tool.name}"
print(f"\nRun with: {wrapper_name}")
except RegistryError as e:
if e.code == "TOOL_NOT_FOUND":
print(f"Tool '{tool_spec}' not found in the registry.", file=sys.stderr)
print(f"Try: smarttools registry search {tool_spec.split('/')[-1]}", file=sys.stderr)
elif e.code == "VERSION_NOT_FOUND" or e.code == "CONSTRAINT_UNSATISFIABLE":
print(f"Error: {e.message}", file=sys.stderr)
if e.details and "available_versions" in e.details:
versions = e.details["available_versions"]
print(f"Available versions: {', '.join(versions[:5])}", file=sys.stderr)
if e.details.get("latest_stable"):
print(f"Latest stable: {e.details['latest_stable']}", file=sys.stderr)
elif e.code == "CONNECTION_ERROR":
print("Could not connect to the registry.", file=sys.stderr)
print("Check your internet connection or try again later.", file=sys.stderr)
else:
print(f"Error: {e.message}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error installing tool: {e}", file=sys.stderr)
return 1
return 0
def _cmd_registry_uninstall(args):
"""Uninstall a tool."""
tool_spec = args.tool
print(f"Uninstalling {tool_spec}...")
if uninstall_tool(tool_spec):
print(f"Uninstalled: {tool_spec}")
else:
print(f"Tool '{tool_spec}' not found", file=sys.stderr)
return 1
return 0
def _cmd_registry_info(args):
"""Show tool information."""
from ..registry_client import RegistryError, get_client
tool_spec = args.tool
try:
# Parse the tool spec
parsed = ToolSpec.parse(tool_spec)
owner = parsed.owner or "official"
client = get_client()
tool_info = client.get_tool(owner, parsed.name)
print(f"{tool_info.owner}/{tool_info.name} v{tool_info.version}")
print("=" * 50)
print(f"Description: {tool_info.description}")
print(f"Category: {tool_info.category}")
print(f"Tags: {', '.join(tool_info.tags)}")
print(f"Downloads: {tool_info.downloads}")
print(f"Published: {tool_info.published_at}")
if tool_info.deprecated:
print()
print(f"DEPRECATED: {tool_info.deprecated_message}")
if tool_info.replacement:
print(f"Use instead: {tool_info.replacement}")
# Show versions
versions = client.get_tool_versions(owner, parsed.name)
if versions:
print(f"\nVersions: {', '.join(versions[:5])}")
if len(versions) > 5:
print(f" ...and {len(versions) - 5} more")
print(f"\nInstall: smarttools registry install {tool_info.owner}/{tool_info.name}")
except RegistryError as e:
if e.code == "TOOL_NOT_FOUND":
print(f"Tool '{tool_spec}' not found in the registry.", file=sys.stderr)
print(f"Try: smarttools registry search {parsed.name}", file=sys.stderr)
elif e.code == "CONNECTION_ERROR":
print("Could not connect to the registry.", file=sys.stderr)
print("Check your internet connection or try again later.", file=sys.stderr)
else:
print(f"Error: {e.message}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error fetching tool info: {e}", file=sys.stderr)
return 1
return 0
def _cmd_registry_update(args):
"""Update local index cache."""
from ..registry_client import RegistryError, get_client
print("Updating registry index...")
try:
client = get_client()
index = client.get_index(force_refresh=True)
tool_count = index.get("tool_count", len(index.get("tools", [])))
generated = index.get("generated_at", "unknown")
print(f"Index updated: {tool_count} tools")
print(f"Generated: {generated}")
except RegistryError as e:
if e.code == "CONNECTION_ERROR":
print("Could not connect to the registry.", file=sys.stderr)
print("Check your internet connection or try again later.", file=sys.stderr)
elif e.code == "RATE_LIMITED":
print("Rate limited. Please wait a moment and try again.", file=sys.stderr)
else:
print(f"Error: {e.message}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error updating index: {e}", file=sys.stderr)
return 1
return 0
def _cmd_registry_publish(args):
"""Publish a tool to the registry."""
from ..registry_client import RegistryError, get_client
# Read tool from current directory or specified path
tool_path = Path(args.path) if args.path else Path.cwd()
if tool_path.is_dir():
config_path = tool_path / "config.yaml"
else:
config_path = tool_path
tool_path = config_path.parent
if not config_path.exists():
print(f"Error: config.yaml not found in {tool_path}", file=sys.stderr)
return 1
# Read config
config_yaml = config_path.read_text()
# Read README if exists
readme_path = tool_path / "README.md"
readme = readme_path.read_text() if readme_path.exists() else ""
# Validate
try:
data = yaml.safe_load(config_yaml)
name = data.get("name", "")
version = data.get("version", "")
if not name or not version:
print("Error: config.yaml must have 'name' and 'version' fields", file=sys.stderr)
return 1
except yaml.YAMLError as e:
print(f"Error: Invalid YAML in config.yaml: {e}", file=sys.stderr)
return 1
if args.dry_run:
print("Dry run - validating only")
print()
print(f"Would publish:")
print(f" Name: {name}")
print(f" Version: {version}")
print(f" Config: {len(config_yaml)} bytes")
print(f" README: {len(readme)} bytes")
return 0
# Check for token
config = load_config()
if not config.registry.token:
print("No registry token configured.")
print()
print("1. Register at: https://gitea.brrd.tech/registry/register")
print("2. Generate a token from your dashboard")
print("3. Enter your token below")
print()
try:
token = input("Registry token: ").strip()
if not token:
print("Cancelled.")
return 1
set_registry_token(token)
print("Token saved.")
except (EOFError, KeyboardInterrupt):
print("\nCancelled.")
return 1
print(f"Publishing {name}@{version}...")
try:
client = get_client()
result = client.publish_tool(config_yaml, readme)
pr_url = result.get("pr_url", "")
status = result.get("status", "")
if status == "published" or result.get("version"):
print(f"Published: {result.get('owner', '')}/{result.get('name', '')}@{result.get('version', version)}")
elif pr_url:
print(f"PR created: {pr_url}")
print("Your tool is pending review.")
else:
print("Published successfully!")
# Show suggestions if provided (from Phase 6 smart features)
suggestions = result.get("suggestions", {})
if suggestions:
print()
# Category suggestion
cat_suggestion = suggestions.get("category")
if cat_suggestion and cat_suggestion.get("suggested"):
confidence = cat_suggestion.get("confidence", 0)
print(f"Suggested category: {cat_suggestion['suggested']} ({confidence:.0%} confidence)")
# Similar tools warning
similar = suggestions.get("similar_tools", [])
if similar:
print("Similar existing tools:")
for tool in similar[:3]:
similarity = tool.get("similarity", 0)
print(f" - {tool.get('name', 'unknown')} ({similarity:.0%} similar)")
except RegistryError as e:
if e.code == "UNAUTHORIZED":
print("Authentication failed.", file=sys.stderr)
print("Your token may have expired. Generate a new one from the registry.", file=sys.stderr)
elif e.code == "INVALID_CONFIG":
print(f"Invalid tool config: {e.message}", file=sys.stderr)
print("Check your config.yaml for errors.", file=sys.stderr)
elif e.code == "VERSION_EXISTS":
print(f"Version already exists: {e.message}", file=sys.stderr)
print("Bump the version in config.yaml and try again.", file=sys.stderr)
elif e.code == "CONNECTION_ERROR":
print("Could not connect to the registry.", file=sys.stderr)
print("Check your internet connection or try again later.", file=sys.stderr)
elif e.code == "RATE_LIMITED":
print("Rate limited. Please wait a moment and try again.", file=sys.stderr)
else:
print(f"Error: {e.message}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error publishing: {e}", file=sys.stderr)
return 1
return 0
def _cmd_registry_my_tools(args):
"""List your published tools."""
from ..registry_client import RegistryError, get_client
try:
client = get_client()
tools = client.get_my_tools()
if not tools:
print("You haven't published any tools yet.")
print("Publish your first tool with: smarttools registry publish")
return 0
print(f"Your published tools ({len(tools)}):\n")
for tool in tools:
status = "[DEPRECATED]" if tool.deprecated else ""
print(f" {tool.owner}/{tool.name} v{tool.version} {status}")
print(f" Downloads: {tool.downloads}")
print()
except RegistryError as e:
if e.code == "UNAUTHORIZED":
print("Not logged in. Set your registry token first:", file=sys.stderr)
print(" smarttools config set-token <token>", file=sys.stderr)
print()
print("Don't have a token? Register at the registry website.", file=sys.stderr)
elif e.code == "CONNECTION_ERROR":
print("Could not connect to the registry.", file=sys.stderr)
print("Check your internet connection or try again later.", file=sys.stderr)
elif e.code == "RATE_LIMITED":
print("Rate limited. Please wait a moment and try again.", file=sys.stderr)
else:
print(f"Error: {e.message}", file=sys.stderr)
return 1
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return 1
return 0
def _cmd_registry_browse(args):
"""Browse tools (TUI)."""
try:
from ..ui_registry import run_registry_browser
return run_registry_browser()
except ImportError:
print("TUI browser requires urwid. Install with:", file=sys.stderr)
print(" pip install 'smarttools[tui]'", file=sys.stderr)
print()
print("Or search from command line:", file=sys.stderr)
print(" smarttools registry search <query>", file=sys.stderr)
return 1

View File

@ -0,0 +1,333 @@
"""Tool management commands."""
import sys
from pathlib import Path
from ..tool import (
list_tools, load_tool, save_tool, delete_tool, get_tools_dir,
Tool, ToolArgument, PromptStep, CodeStep
)
from ..ui import run_ui
def cmd_list(args):
"""List all tools."""
tools = list_tools()
if not tools:
print("No tools found.")
print("Create your first tool with: smarttools ui")
return 0
print(f"Available tools ({len(tools)}):\n")
for name in tools:
tool = load_tool(name)
if tool:
print(f" {name}")
print(f" {tool.description or 'No description'}")
# Show arguments
if tool.arguments:
args_str = ", ".join(arg.flag for arg in tool.arguments)
print(f" Arguments: {args_str}")
# Show steps
if tool.steps:
step_info = []
for step in tool.steps:
if isinstance(step, PromptStep):
step_info.append(f"PROMPT[{step.provider}]")
elif isinstance(step, CodeStep):
step_info.append("CODE")
print(f" Steps: {' -> '.join(step_info)}")
print()
return 0
def cmd_create(args):
"""Create a new tool (basic CLI creation - use 'ui' for full builder)."""
name = args.name
# Check if already exists
existing = load_tool(name)
if existing and not args.force:
print(f"Error: Tool '{name}' already exists. Use --force to overwrite.")
return 1
# Create a tool with a single prompt step
steps = []
if args.prompt:
steps.append(PromptStep(
prompt=args.prompt,
provider=args.provider or "mock",
output_var="response"
))
tool = Tool(
name=name,
description=args.description or "",
arguments=[],
steps=steps,
output="{response}" if steps else "{input}"
)
path = save_tool(tool)
print(f"Created tool '{name}'")
print(f"Config: {path}")
print(f"\nUse 'smarttools ui' to add arguments, steps, and customize.")
print(f"Or run: {name} < input.txt")
return 0
def cmd_edit(args):
"""Edit a tool (opens in $EDITOR or nano)."""
import os
import subprocess
tool = load_tool(args.name)
if not tool:
print(f"Error: Tool '{args.name}' not found.")
return 1
config_path = get_tools_dir() / args.name / "config.yaml"
editor = os.environ.get("EDITOR", "nano")
try:
subprocess.run([editor, str(config_path)], check=True)
print(f"Tool '{args.name}' updated.")
return 0
except subprocess.CalledProcessError:
print(f"Error: Editor failed.")
return 1
except FileNotFoundError:
print(f"Error: Editor '{editor}' not found. Set $EDITOR environment variable.")
return 1
def cmd_delete(args):
"""Delete a tool."""
if not load_tool(args.name):
print(f"Error: Tool '{args.name}' not found.")
return 1
if not args.force:
confirm = input(f"Delete tool '{args.name}'? [y/N] ")
if confirm.lower() != 'y':
print("Cancelled.")
return 0
if delete_tool(args.name):
print(f"Deleted tool '{args.name}'.")
return 0
else:
print(f"Error: Failed to delete '{args.name}'.")
return 1
def cmd_test(args):
"""Test a tool with mock provider."""
tool = load_tool(args.name)
if not tool:
print(f"Error: Tool '{args.name}' not found.")
return 1
from ..runner import run_tool
# Read test input
if args.input:
input_text = Path(args.input).read_text()
else:
print("Enter test input (Ctrl+D to end):")
input_text = sys.stdin.read()
print("\n--- Running with mock provider ---\n")
output, code = run_tool(
tool=tool,
input_text=input_text,
custom_args={},
provider_override="mock",
dry_run=args.dry_run,
show_prompt=True,
verbose=True
)
if output:
print("\n--- Output ---\n")
print(output)
return code
def cmd_run(args):
"""Run a tool."""
from ..runner import run_tool
tool = load_tool(args.name)
if not tool:
print(f"Error: Tool '{args.name}' not found.", file=sys.stderr)
return 1
# Read input
if args.input:
# Read from file
input_path = Path(args.input)
if not input_path.exists():
print(f"Error: Input file not found: {args.input}", file=sys.stderr)
return 1
input_text = input_path.read_text()
elif args.stdin:
# Explicit interactive input requested
print("Reading from stdin (Ctrl+D to end):", file=sys.stderr)
input_text = sys.stdin.read()
elif not sys.stdin.isatty():
# Stdin is piped - read it
input_text = sys.stdin.read()
else:
# No input provided - use empty string
input_text = ""
# Collect custom args from remaining arguments
custom_args = {}
if args.tool_args:
# Parse tool-specific arguments
i = 0
while i < len(args.tool_args):
arg = args.tool_args[i]
if arg.startswith('--'):
key = arg[2:].replace('-', '_')
if i + 1 < len(args.tool_args) and not args.tool_args[i + 1].startswith('--'):
custom_args[key] = args.tool_args[i + 1]
i += 2
else:
custom_args[key] = True
i += 1
elif arg.startswith('-'):
key = arg[1:].replace('-', '_')
if i + 1 < len(args.tool_args) and not args.tool_args[i + 1].startswith('-'):
custom_args[key] = args.tool_args[i + 1]
i += 2
else:
custom_args[key] = True
i += 1
else:
i += 1
# Run tool
output, code = run_tool(
tool=tool,
input_text=input_text,
custom_args=custom_args,
provider_override=args.provider,
dry_run=args.dry_run,
show_prompt=args.show_prompt,
verbose=args.verbose
)
# Write output
if code == 0 and output:
if args.output:
Path(args.output).write_text(output)
else:
print(output)
return code
def cmd_ui(args):
"""Launch the interactive UI."""
run_ui()
return 0
def cmd_refresh(args):
"""Refresh all wrapper scripts with the current Python path."""
from ..tool import create_wrapper_script
tools = list_tools()
if not tools:
print("No tools found.")
return 0
print(f"Refreshing wrapper scripts for {len(tools)} tools...")
for name in tools:
path = create_wrapper_script(name)
print(f" {name} -> {path}")
print("\nDone. Wrapper scripts updated to use current Python interpreter.")
return 0
def cmd_docs(args):
"""View or edit tool documentation."""
import os
import subprocess
tool = load_tool(args.name)
if not tool:
print(f"Error: Tool '{args.name}' not found.")
return 1
readme_path = get_tools_dir() / args.name / "README.md"
if args.edit:
# Edit/create README
editor = os.environ.get("EDITOR", "nano")
# Create a template if README doesn't exist
if not readme_path.exists():
template = f"""# {args.name}
{tool.description or 'No description provided.'}
## Usage
```bash
echo "input" | {args.name}
```
## Arguments
| Flag | Default | Description |
|------|---------|-------------|
"""
for arg in tool.arguments:
template += f"| `{arg.flag}` | {arg.default or ''} | {arg.description or ''} |\n"
template += """
## Examples
```bash
# Example 1
```
## Requirements
- List any dependencies here
"""
readme_path.write_text(template)
print(f"Created template: {readme_path}")
try:
subprocess.run([editor, str(readme_path)], check=True)
print(f"Documentation updated: {readme_path}")
return 0
except subprocess.CalledProcessError:
print("Error: Editor failed.")
return 1
except FileNotFoundError:
print(f"Error: Editor '{editor}' not found. Set $EDITOR environment variable.")
return 1
else:
# View README
if not readme_path.exists():
print(f"No documentation found for '{args.name}'.")
print(f"Create it with: smarttools docs {args.name} --edit")
return 1
print(readme_path.read_text())
return 0

View File

@ -1,6 +1,7 @@
"""Provider abstraction for AI CLI tools."""
import os
import shlex
import subprocess
import shutil
from dataclasses import dataclass, field
@ -166,8 +167,14 @@ def call_provider(provider_name: str, prompt: str, timeout: int = 300) -> Provid
# Parse command (expand environment variables)
cmd = os.path.expandvars(provider.command)
# Check if base command exists
base_cmd = cmd.split()[0]
# Check if base command exists (use shlex for proper quote handling)
try:
cmd_parts = shlex.split(cmd)
base_cmd = cmd_parts[0] if cmd_parts else cmd.split()[0]
except ValueError:
# shlex failed (unbalanced quotes, etc.) - fall back to simple split
base_cmd = cmd.split()[0]
# Expand ~ for the which check
base_cmd_expanded = os.path.expanduser(base_cmd)
if not shutil.which(base_cmd_expanded) and not os.path.isfile(base_cmd_expanded):

View File

@ -1076,6 +1076,28 @@ def create_app() -> Flask:
except Exception:
pass
# Run automated scrutiny
scrutiny_report = None
try:
from .scrutiny import scrutinize_tool
scrutiny_report = scrutinize_tool(config_text, description or "", readme)
except Exception:
pass
# Check scrutiny decision
if scrutiny_report:
suggestions["scrutiny"] = scrutiny_report
if scrutiny_report.get("decision") == "reject":
# Find the failing check for error message
fail_findings = [f for f in scrutiny_report.get("findings", []) if f.get("result") == "fail"]
fail_msg = fail_findings[0]["message"] if fail_findings else "Tool failed automated review"
return error_response(
"SCRUTINY_FAILED",
f"Tool rejected: {fail_msg}",
400,
details={"scrutiny": scrutiny_report},
)
if dry_run:
return jsonify({
"data": {
@ -1088,12 +1110,24 @@ def create_app() -> Flask:
})
tags_json = json.dumps(tags)
# Determine status based on scrutiny
if scrutiny_report and scrutiny_report.get("decision") == "approve":
scrutiny_status = "approved"
elif scrutiny_report and scrutiny_report.get("decision") == "review":
scrutiny_status = "pending_review"
else:
scrutiny_status = "pending"
scrutiny_json = json.dumps(scrutiny_report) if scrutiny_report else None
g.db.execute(
"""
INSERT INTO tools (
owner, name, version, description, category, tags, config_yaml, readme,
publisher_id, deprecated, deprecated_message, replacement, downloads, published_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
publisher_id, deprecated, deprecated_message, replacement, downloads,
scrutiny_status, scrutiny_report, published_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
owner,
@ -1109,6 +1143,8 @@ def create_app() -> Flask:
data.get("deprecated_message"),
data.get("replacement"),
0,
scrutiny_status,
scrutiny_json,
datetime.utcnow().isoformat(),
],
)
@ -1120,7 +1156,7 @@ def create_app() -> Flask:
"name": name,
"version": version,
"pr_url": "",
"status": "pending_review",
"status": scrutiny_status,
"suggestions": suggestions,
}
})
@ -1154,6 +1190,54 @@ def create_app() -> Flask:
})
return jsonify({"data": data})
@app.route("/api/v1/tools/<owner>/<name>/deprecate", methods=["POST"])
@require_token
def deprecate_tool(owner: str, name: str) -> Response:
"""Mark a tool as deprecated."""
if g.current_publisher["slug"] != owner:
return error_response("FORBIDDEN", "You can only deprecate your own tools", 403)
data = request.get_json() or {}
message = (data.get("deprecated_message") or data.get("message") or "").strip()
replacement = (data.get("replacement") or "").strip() or None
if message and len(message) > 500:
return error_response("VALIDATION_ERROR", "Message too long (max 500)", 400)
# Update all versions of the tool
result = g.db.execute(
"""
UPDATE tools SET deprecated = 1, deprecated_message = ?, replacement = ?
WHERE owner = ? AND name = ?
""",
[message or None, replacement, owner, name],
)
if result.rowcount == 0:
return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404)
g.db.commit()
return jsonify({"data": {"status": "deprecated", "owner": owner, "name": name}})
@app.route("/api/v1/tools/<owner>/<name>/undeprecate", methods=["POST"])
@require_token
def undeprecate_tool(owner: str, name: str) -> Response:
"""Remove deprecation status from a tool."""
if g.current_publisher["slug"] != owner:
return error_response("FORBIDDEN", "You can only undeprecate your own tools", 403)
result = g.db.execute(
"""
UPDATE tools SET deprecated = 0, deprecated_message = NULL, replacement = NULL
WHERE owner = ? AND name = ?
""",
[owner, name],
)
if result.rowcount == 0:
return error_response("TOOL_NOT_FOUND", f"Tool {owner}/{name} not found", 404)
g.db.commit()
return jsonify({"data": {"status": "active", "owner": owner, "name": name}})
@app.route("/api/v1/me/settings", methods=["PUT"])
@require_token
def update_settings() -> Response:

View File

@ -49,6 +49,8 @@ CREATE TABLE IF NOT EXISTS tools (
deprecated_message TEXT,
replacement TEXT,
downloads INTEGER DEFAULT 0,
scrutiny_status TEXT DEFAULT 'pending',
scrutiny_report TEXT,
published_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(owner, name, version)
);

View File

@ -0,0 +1,588 @@
"""Tool scrutiny system for automated review of published tools.
This module analyzes tools on publish to verify:
1. Honesty - Does the tool do what its description claims?
2. Transparency - Is behavior visible or hidden/obfuscated?
3. Scope - Does the code stay within expected bounds?
4. Efficiency - Are AI calls necessary or wasteful?
"""
from __future__ import annotations
import base64
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple
import yaml
class CheckResult(Enum):
PASS = "pass"
WARNING = "warning"
FAIL = "fail"
@dataclass
class Finding:
"""A single finding from scrutiny analysis."""
check: str
result: CheckResult
message: str
suggestion: Optional[str] = None
location: Optional[str] = None # e.g., "step 2", "code block"
@dataclass
class ScrutinyReport:
"""Complete scrutiny report for a tool."""
tool_name: str
findings: List[Finding] = field(default_factory=list)
optimizations: List[Any] = field(default_factory=list) # OptimizationSuggestion list
@property
def passed(self) -> bool:
"""Tool passes if no FAIL results."""
return not any(f.result == CheckResult.FAIL for f in self.findings)
@property
def has_warnings(self) -> bool:
return any(f.result == CheckResult.WARNING for f in self.findings)
@property
def decision(self) -> str:
"""Auto-approve, flag for review, or reject."""
if any(f.result == CheckResult.FAIL for f in self.findings):
return "reject"
if any(f.result == CheckResult.WARNING for f in self.findings):
return "review"
return "approve"
def to_dict(self) -> Dict[str, Any]:
result = {
"tool_name": self.tool_name,
"decision": self.decision,
"passed": self.passed,
"has_warnings": self.has_warnings,
"findings": [
{
"check": f.check,
"result": f.result.value,
"message": f.message,
"suggestion": f.suggestion,
"location": f.location,
}
for f in self.findings
],
}
# Add optimization suggestions if any
if self.optimizations:
result["optimizations"] = [
{
"operation": opt.operation,
"current_prompt": opt.current_prompt,
"optimized_code": opt.optimized_code,
"tradeoffs": opt.tradeoffs,
"location": opt.location,
"action": "optional",
}
for opt in self.optimizations
]
return result
# Keywords that indicate specific behaviors
BEHAVIOR_KEYWORDS = {
"summarize": ["summary", "summarize", "condense", "brief", "shorten", "tldr"],
"translate": ["translate", "translation", "language", "convert to"],
"explain": ["explain", "clarify", "describe", "what is", "how does"],
"fix": ["fix", "correct", "repair", "grammar", "spelling"],
"generate": ["generate", "create", "write", "produce", "make"],
"extract": ["extract", "pull", "find", "identify", "parse"],
"analyze": ["analyze", "review", "examine", "check", "audit"],
"convert": ["convert", "transform", "format", "change to"],
}
# Patterns that suggest obfuscation or suspicious behavior
SUSPICIOUS_PATTERNS = [
(r'base64\.(b64decode|decodebytes)', "Base64 decoding detected"),
(r'exec\s*\(', "Dynamic code execution with exec()"),
(r'eval\s*\(', "Dynamic code execution with eval()"),
(r'__import__\s*\(', "Dynamic import detected"),
(r'subprocess\.(run|call|Popen)', "Subprocess execution detected"),
(r'os\.(system|popen|exec)', "OS command execution detected"),
(r'requests\.(get|post|put|delete)', "Network request detected"),
(r'urllib', "URL library usage detected"),
(r'socket\.', "Raw socket usage detected"),
(r'\\x[0-9a-fA-F]{2}', "Hex-encoded strings detected"),
]
# File/system access patterns
SCOPE_PATTERNS = [
(r'open\s*\([^)]*["\']/', "Absolute path file access"),
(r'open\s*\([^)]*~', "Home directory file access"),
(r'\.ssh', "SSH directory access"),
(r'\.aws', "AWS credentials access"),
(r'\.env', "Environment file access"),
(r'/etc/', "System config access"),
(r'os\.environ', "Environment variable access"),
(r'keyring|password|credential', "Credential-related access"),
]
# Operations that could be done with code (with tradeoffs)
# Format: (prompt_pattern, operation_name, code_template, code_benefit, ai_benefit)
CODE_OPTIMIZATIONS = [
(
r'count (the |how many )?words',
"Word counting",
'word_count = len(input.split())',
"Faster, no API cost, deterministic",
"Handles edge cases (hyphenated words, contractions, numbers as words)",
),
(
r'count (the |how many )?(lines|line)',
"Line counting",
'line_count = len(input.strip().split("\\n"))',
"Instant, no API cost",
"Could interpret 'meaningful lines' vs blank lines",
),
(
r'count (the |how many )?(characters?|chars?)',
"Character counting",
'char_count = len(input)',
"Instant, no API cost",
"Could exclude whitespace or count 'visible' characters",
),
(
r'convert.*(to|into) json',
"JSON conversion",
'import json\nresult = json.dumps(data, indent=2)',
"Reliable, no API cost, handles valid input perfectly",
"Can interpret messy/partial data and fix formatting issues",
),
(
r'parse.*(json|the json)',
"JSON parsing",
'import json\nresult = json.loads(input)',
"Fast, no API cost, strict validation",
"Can fix malformed JSON, handle comments, trailing commas",
),
(
r'convert.*(to|into) csv',
"CSV conversion",
'import csv\nimport io\n# ... csv.writer conversion',
"Reliable for structured data",
"Can infer structure from unstructured text",
),
(
r'(extract|find|get) (all )?(emails?|email addresses)',
"Email extraction",
'import re\nemails = re.findall(r"[\\w.+-]+@[\\w.-]+\\.[a-zA-Z]{2,}", input)',
"Fast, consistent pattern matching",
"Handles obfuscated emails (user [at] domain), context-aware",
),
(
r'(extract|find|get) (all )?(urls?|links?)',
"URL extraction",
'import re\nurls = re.findall(r"https?://[^\\s<>\"]+", input)',
"Fast, consistent pattern matching",
"Handles partial URLs, context-aware link detection",
),
(
r'(uppercase|to upper|make upper)',
"Uppercase conversion",
'result = input.upper()',
"Instant, no API cost",
"Language-aware (Turkish İ/i, Greek σ/ς edge cases)",
),
(
r'(lowercase|to lower|make lower)',
"Lowercase conversion",
'result = input.lower()',
"Instant, no API cost",
"Language-aware case handling",
),
(
r'(capitalize|title case)',
"Title case conversion",
'result = input.title()',
"Instant, no API cost",
"Knows style rules (articles, prepositions)",
),
(
r'(reverse|reversed?) (the |this )?(text|string|order)',
"Text reversal",
'result = input[::-1]',
"Instant, no API cost",
"Could reverse by word, sentence, or paragraph intelligently",
),
(
r'(sort|sorted?) (the |these |this )?(lines?|items?|list)',
"Sorting",
'result = "\\n".join(sorted(input.strip().split("\\n")))',
"Fast, deterministic alphabetical sort",
"Can sort by meaning ('sort by importance', 'by date mentioned')",
),
(
r'remove (duplicate|duplicated?) (lines?|items?)',
"Deduplication",
'seen = set()\nresult = "\\n".join(x for x in input.split("\\n") if not (x in seen or seen.add(x)))',
"Fast, exact matching",
"Can detect semantic duplicates ('USA' vs 'United States')",
),
(
r'(remove|strip|trim) (whitespace|spaces|blank)',
"Whitespace removal",
'result = " ".join(input.split())',
"Instant, predictable",
"Could preserve meaningful spacing, format-aware",
),
(
r'split (by|on|into)',
"Text splitting",
'# result = input.split(delimiter)',
"Fast, exact delimiter matching",
"Can split by meaning ('split into paragraphs about each topic')",
),
]
def analyze_tool(config_yaml: str, description: str) -> ScrutinyReport:
"""Run full scrutiny analysis on a tool.
Args:
config_yaml: The tool's YAML configuration
description: The tool's description
Returns:
ScrutinyReport with all findings
"""
try:
config = yaml.safe_load(config_yaml) or {}
except yaml.YAMLError:
return ScrutinyReport(
tool_name="unknown",
findings=[Finding(
check="parse",
result=CheckResult.FAIL,
message="Invalid YAML configuration",
)]
)
tool_name = config.get("name", "unknown")
report = ScrutinyReport(tool_name=tool_name)
# Run all checks
report.findings.extend(check_honesty(config, description))
report.findings.extend(check_transparency(config))
report.findings.extend(check_scope(config))
# Efficiency check returns findings and optimizations
efficiency_findings, optimizations = check_efficiency(config)
report.findings.extend(efficiency_findings)
report.optimizations = optimizations
# If no findings, add a pass
if not report.findings:
report.findings.append(Finding(
check="overall",
result=CheckResult.PASS,
message="All checks passed",
))
return report
def check_honesty(config: Dict[str, Any], description: str) -> List[Finding]:
"""Check if tool behavior matches its description.
Analyzes prompts and compares against description claims.
"""
findings = []
steps = config.get("steps", [])
description_lower = description.lower()
# Extract all prompt text
all_prompts = ""
for step in steps:
if step.get("type") == "prompt":
all_prompts += " " + (step.get("prompt") or "").lower()
# Detect what the description claims
claimed_behaviors = set()
for behavior, keywords in BEHAVIOR_KEYWORDS.items():
if any(kw in description_lower for kw in keywords):
claimed_behaviors.add(behavior)
# Detect what prompts actually do
actual_behaviors = set()
for behavior, keywords in BEHAVIOR_KEYWORDS.items():
if any(kw in all_prompts for kw in keywords):
actual_behaviors.add(behavior)
# Check for mismatches
claimed_not_done = claimed_behaviors - actual_behaviors
done_not_claimed = actual_behaviors - claimed_behaviors
if claimed_not_done:
findings.append(Finding(
check="honesty",
result=CheckResult.WARNING,
message=f"Description claims '{', '.join(claimed_not_done)}' but prompts don't reflect this",
suggestion="Update description to accurately reflect tool behavior",
))
if done_not_claimed and claimed_behaviors:
# Only warn if tool claims to do something specific but also does other things
findings.append(Finding(
check="honesty",
result=CheckResult.WARNING,
message=f"Prompts include '{', '.join(done_not_claimed)}' not mentioned in description",
suggestion="Update description to include all tool capabilities",
))
# Check for empty or minimal prompts
for i, step in enumerate(steps):
if step.get("type") == "prompt":
prompt = step.get("prompt", "").strip()
if len(prompt) < 10:
findings.append(Finding(
check="honesty",
result=CheckResult.WARNING,
message="Prompt is unusually short",
location=f"step {i + 1}",
))
if not findings:
findings.append(Finding(
check="honesty",
result=CheckResult.PASS,
message="Description matches observed behavior",
))
return findings
def check_transparency(config: Dict[str, Any]) -> List[Finding]:
"""Check for obfuscated or hidden behavior.
Looks for encoded strings, dynamic execution, etc.
"""
findings = []
steps = config.get("steps", [])
for i, step in enumerate(steps):
if step.get("type") == "code":
code = step.get("code", "")
location = f"step {i + 1}"
# Check for suspicious patterns
for pattern, description in SUSPICIOUS_PATTERNS:
if re.search(pattern, code, re.IGNORECASE):
# Determine severity
if "exec" in pattern or "eval" in pattern:
result = CheckResult.FAIL
msg = f"{description} - potential code injection risk"
elif "socket" in pattern or "subprocess" in pattern:
result = CheckResult.WARNING
msg = f"{description} - flagged for manual review"
else:
result = CheckResult.WARNING
msg = description
findings.append(Finding(
check="transparency",
result=result,
message=msg,
location=location,
))
# Check for obfuscated variable names
obfuscated = re.findall(r'\b([a-z])\1{2,}\b|\b[a-z]{1,2}\d+\b', code)
if len(obfuscated) > 3:
findings.append(Finding(
check="transparency",
result=CheckResult.WARNING,
message="Multiple obfuscated variable names detected",
location=location,
suggestion="Use descriptive variable names",
))
# Check for very long single lines (possible obfuscation)
for line_num, line in enumerate(code.split('\n'), 1):
if len(line) > 500:
findings.append(Finding(
check="transparency",
result=CheckResult.WARNING,
message=f"Unusually long code line ({len(line)} chars)",
location=f"{location}, line {line_num}",
))
if not findings:
findings.append(Finding(
check="transparency",
result=CheckResult.PASS,
message="No obfuscation detected",
))
return findings
def check_scope(config: Dict[str, Any]) -> List[Finding]:
"""Check if code accesses unexpected resources.
Flags file system, network, or credential access.
"""
findings = []
steps = config.get("steps", [])
tool_name = config.get("name", "").lower()
description = config.get("description", "").lower()
for i, step in enumerate(steps):
if step.get("type") == "code":
code = step.get("code", "")
location = f"step {i + 1}"
for pattern, description_text in SCOPE_PATTERNS:
if re.search(pattern, code, re.IGNORECASE):
# Check if this access seems expected based on tool name/description
is_expected = False
if "ssh" in pattern and ("ssh" in tool_name or "ssh" in description):
is_expected = True
if "env" in pattern and ("env" in tool_name or "environment" in description):
is_expected = True
if "file" in description_text.lower() and ("file" in tool_name or "file" in description):
is_expected = True
if not is_expected:
# Credential access is always flagged
if "credential" in pattern or ".ssh" in pattern or ".aws" in pattern:
result = CheckResult.FAIL
suggestion = "Remove credential access or justify in description"
else:
result = CheckResult.WARNING
suggestion = "Document this access in tool description"
findings.append(Finding(
check="scope",
result=result,
message=description_text,
location=location,
suggestion=suggestion,
))
if not findings:
findings.append(Finding(
check="scope",
result=CheckResult.PASS,
message="No unexpected resource access",
))
return findings
@dataclass
class OptimizationSuggestion:
"""A code optimization suggestion for an AI prompt."""
operation: str
current_prompt: str
optimized_code: str
tradeoffs: Dict[str, str]
location: str
def check_efficiency(config: Dict[str, Any]) -> List[Finding]:
"""Check for AI calls that could optionally be pure code.
Generates optimization suggestions with tradeoffs - these are
informational, not penalties. Authors can choose to keep AI
for valid reasons.
"""
findings = []
steps = config.get("steps", [])
optimizations = []
for i, step in enumerate(steps):
if step.get("type") == "prompt":
prompt = step.get("prompt", "")
prompt_lower = prompt.lower()
location = f"step {i + 1}"
# Check against known optimizable patterns
for pattern, operation, code, code_benefit, ai_benefit in CODE_OPTIMIZATIONS:
if re.search(pattern, prompt_lower):
optimizations.append(OptimizationSuggestion(
operation=operation,
current_prompt=prompt.strip()[:200], # Truncate for display
optimized_code=code,
tradeoffs={
"code": code_benefit,
"ai": ai_benefit,
},
location=location,
))
break # One suggestion per step
# Add optimization suggestions as findings (not warnings - optional)
for opt in optimizations:
findings.append(Finding(
check="efficiency",
result=CheckResult.PASS, # Not a warning - it's a suggestion
message=f"Optional optimization: {opt.operation} could use code",
location=opt.location,
suggestion=f"Code: {opt.optimized_code}",
))
# Check for all-AI tools (informational, not a warning)
code_steps = [s for s in steps if s.get("type") == "code"]
prompt_steps = [s for s in steps if s.get("type") == "prompt"]
if prompt_steps and not code_steps and len(prompt_steps) > 3:
findings.append(Finding(
check="efficiency",
result=CheckResult.PASS, # Informational
message=f"Tool has {len(prompt_steps)} AI steps - consider if any could be code",
suggestion="Mixing AI and code steps can reduce API costs",
))
if not findings:
findings.append(Finding(
check="efficiency",
result=CheckResult.PASS,
message="AI usage appears appropriate",
))
return findings, optimizations
def get_optimization_details(optimizations: List[OptimizationSuggestion]) -> List[Dict[str, Any]]:
"""Convert optimization suggestions to serializable format."""
return [
{
"operation": opt.operation,
"current_prompt": opt.current_prompt,
"optimized_code": opt.optimized_code,
"tradeoffs": opt.tradeoffs,
"location": opt.location,
"action": "optional", # Author chooses
}
for opt in optimizations
]
def scrutinize_tool(config_yaml: str, description: str, readme: Optional[str] = None) -> Dict[str, Any]:
"""Main entry point for tool scrutiny.
Args:
config_yaml: The tool's YAML configuration
description: The tool's description
readme: Optional README content (for additional context)
Returns:
Dictionary with scrutiny results
"""
report = analyze_tool(config_yaml, description)
return report.to_dict()

View File

@ -7,6 +7,7 @@ Implements the tool resolution order:
4. Error if not found
"""
import logging
import re
import sys
from dataclasses import dataclass
@ -19,6 +20,8 @@ from .tool import Tool, TOOLS_DIR, get_bin_dir, BIN_DIR
from .config import is_auto_fetch_enabled, load_config
from .manifest import load_manifest
logger = logging.getLogger(__name__)
# Local project tools directories (support both legacy and documented paths)
LOCAL_TOOLS_DIRS = [Path(".smarttools"), Path("smarttools")]
@ -362,8 +365,10 @@ class ToolResolver:
except ImportError:
# Registry client not available
logger.debug("Registry client not available")
return None
except Exception as e:
logger.warning(f"Registry fetch failed: {e}")
if self.verbose:
print(f"Registry fetch failed: {e}", file=sys.stderr)
return None
@ -465,6 +470,7 @@ exec {python_path} -m smarttools.runner {owner}/{name} "$@"
return Tool.from_dict(data)
except Exception as e:
logger.warning(f"Error loading tool from {config_path}: {e}")
if self.verbose:
print(f"Error loading tool from {config_path}: {e}", file=sys.stderr)
return None

View File

@ -15,16 +15,35 @@ def substitute_variables(template: str, variables: dict) -> str:
"""
Substitute {variable} placeholders in a template.
Supports escaping: use {{ for literal { and }} for literal }
Args:
template: String with {var} placeholders
variables: Dict of variable name -> value
Returns:
String with placeholders replaced
Examples:
>>> substitute_variables("Hello {name}", {"name": "World"})
'Hello World'
>>> substitute_variables("Use {{braces}}", {"braces": "nope"})
'Use {braces}'
"""
result = template
# Use unique placeholders for escaped braces
ESCAPE_OPEN = "\x00\x01OPEN\x01\x00"
ESCAPE_CLOSE = "\x00\x01CLOSE\x01\x00"
# First, replace escaped braces with placeholders
result = template.replace("{{", ESCAPE_OPEN).replace("}}", ESCAPE_CLOSE)
# Now do variable substitution
for name, value in variables.items():
result = result.replace(f"{{{name}}}", str(value) if value else "")
# Finally, restore escaped braces as single braces
result = result.replace(ESCAPE_OPEN, "{").replace(ESCAPE_CLOSE, "}")
return result

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
"""Allow running the UI as a module."""
from . import run_ui
if __name__ == "__main__":
run_ui()

View File

@ -0,0 +1,30 @@
"""Color palette for the BIOS-style TUI."""
# Color palette - BIOS style with 3D button effects
PALETTE = [
('body', 'white', 'dark blue'),
('header', 'white', 'dark red', 'bold'),
('footer', 'black', 'light gray'),
# Button colors - raised 3D effect
('button', 'black', 'light gray'),
('button_focus', 'white', 'dark red', 'bold'),
('button_highlight', 'white', 'light gray'), # Top/left edge (light)
('button_shadow', 'dark gray', 'light gray'), # Bottom/right edge (dark)
('button_pressed', 'black', 'dark gray'), # Pressed state
# Edit fields
('edit', 'black', 'light gray'),
('edit_focus', 'black', 'yellow'),
# List items
('listbox', 'black', 'light gray'),
('listbox_focus', 'white', 'dark red'),
# Dialog
('dialog', 'black', 'light gray'),
('dialog_border', 'white', 'dark blue'),
# Text styles
('label', 'yellow', 'dark blue', 'bold'),
('error', 'white', 'dark red', 'bold'),
('success', 'light green', 'dark blue', 'bold'),
# 3D shadow elements
('shadow', 'black', 'black'),
('shadow_edge', 'dark gray', 'dark blue'),
]

View File

@ -0,0 +1,715 @@
"""Custom widgets for the BIOS-style TUI."""
import urwid
class SelectableText(urwid.WidgetWrap):
"""A selectable text widget for list items."""
def __init__(self, text, value=None, on_select=None):
self.value = value
self.on_select = on_select
self.text_widget = urwid.Text(text)
display = urwid.AttrMap(self.text_widget, 'listbox', 'listbox_focus')
super().__init__(display)
def selectable(self):
return True
def keypress(self, size, key):
if key == 'enter' and self.on_select:
self.on_select(self.value)
return None
return key
def mouse_event(self, size, event, button, col, row, focus):
if event == 'mouse press' and button == 1 and self.on_select:
self.on_select(self.value)
return True
return False
class Button3D(urwid.WidgetWrap):
"""A 3D-style button using box-drawing characters for depth.
Creates a raised button effect like DOS/BIOS interfaces:
Label
When focused, colors change to show selection.
"""
signals = ['click']
def __init__(self, label, on_press=None, user_data=None):
self.label = label
self.on_press = on_press
self.user_data = user_data
self._pressed = False
# Build the 3D button structure
self._build_widget()
super().__init__(self._widget)
def _build_widget(self):
"""Build the 3D button widget structure."""
label = self.label
width = len(label) + 4 # Padding inside button
# Button face with border
# Top edge: ┌────┐
top = '' + '' * (width - 2) + ''
# Middle: │ Label │ with shadow
middle_text = '' + label + ''
# Bottom edge: └────┘ with shadow
bottom = '' + '' * (width - 2) + ''
# Shadow characters (right and bottom)
shadow_right = ''
shadow_bottom = ''
# Create the rows
top_row = urwid.Text(top + ' ') # Space for shadow alignment
middle_row = urwid.Columns([
('pack', urwid.Text(middle_text)),
('pack', urwid.Text(('shadow_edge', shadow_right))),
])
bottom_row = urwid.Columns([
('pack', urwid.Text(bottom)),
('pack', urwid.Text(('shadow_edge', shadow_right))),
])
shadow_row = urwid.Text(('shadow_edge', ' ' + shadow_bottom * (width - 1)))
# Stack them
pile = urwid.Pile([
top_row,
middle_row,
bottom_row,
shadow_row,
])
self._widget = urwid.AttrMap(pile, 'button', 'button_focus')
def selectable(self):
return True
def keypress(self, size, key):
if key == 'enter':
self._activate()
return None
return key
def mouse_event(self, size, event, button, col, row, focus):
if button == 1:
if event == 'mouse press':
self._pressed = True
return True
elif event == 'mouse release' and self._pressed:
self._pressed = False
self._activate()
return True
return False
def _activate(self):
"""Trigger the button callback."""
if self.on_press:
self.on_press(self.user_data)
self._emit('click')
class Button3DCompact(urwid.WidgetWrap):
"""A compact 3D button that fits on a single line with shadow effect.
Creates a subtle 3D effect: [ Label ]
Better for inline use where vertical space is limited.
"""
signals = ['click']
def __init__(self, label, on_press=None, user_data=None):
self.label = label
self.on_press = on_press
self.user_data = user_data
# Build compact button: ▐ Label ▌ with shadow
# Using block characters for edges
button_text = urwid.Text([
('button_highlight', ''),
('button', f' {label} '),
('button_shadow', ''),
('shadow_edge', ''),
])
self._widget = urwid.AttrMap(button_text, None, {
'button': 'button_focus',
'button_highlight': 'button_focus',
'button_shadow': 'button_focus',
})
super().__init__(self._widget)
def selectable(self):
return True
def keypress(self, size, key):
if key == 'enter':
self._activate()
return None
return key
def mouse_event(self, size, event, button, col, row, focus):
if button == 1 and event == 'mouse release':
self._activate()
return True
return False
def _activate(self):
if self.on_press:
self.on_press(self.user_data)
self._emit('click')
class ClickableButton(urwid.WidgetWrap):
"""A button that responds to mouse clicks (legacy wrapper)."""
def __init__(self, label, on_press=None, user_data=None):
self.on_press = on_press
self.user_data = user_data
button = urwid.Button(label)
if on_press:
urwid.connect_signal(button, 'click', self._handle_click)
display = urwid.AttrMap(button, 'button', 'button_focus')
super().__init__(display)
def _handle_click(self, button):
if self.on_press:
self.on_press(self.user_data)
class SelectableToolItem(urwid.WidgetWrap):
"""A selectable tool item that maintains selection state."""
def __init__(self, name, on_select=None):
self.name = name
self.on_select = on_select
self._selected = False
self.text_widget = urwid.Text(f" {name} ")
self.attr_map = urwid.AttrMap(self.text_widget, 'listbox', 'listbox_focus')
super().__init__(self.attr_map)
def selectable(self):
return True
def set_selected(self, selected):
"""Set whether this item is the selected tool."""
self._selected = selected
if self._selected:
self.attr_map.set_attr_map({None: 'listbox_focus'})
else:
self.attr_map.set_attr_map({None: 'listbox'})
def keypress(self, size, key):
if key == 'enter' and self.on_select:
self.on_select(self.name)
return None
return key
def mouse_event(self, size, event, button, col, row, focus):
if event == 'mouse press' and button == 1:
# Single click just selects/focuses - don't call on_select
# on_select is only called on Enter key (to edit)
return True
return False
class ToolListBox(urwid.ListBox):
"""A ListBox that keeps arrow keys internal and passes Tab out."""
def __init__(self, body, on_focus_change=None):
super().__init__(body)
self.on_focus_change = on_focus_change
self._last_focus = None
def keypress(self, size, key):
if key in ('up', 'down'):
# Handle arrow keys internally - navigate within list
result = super().keypress(size, key)
# Check if focus changed
self._check_focus_change()
return result
elif key == 'tab':
# Pass tab out to parent for focus cycling
return key
elif key == 'shift tab':
return key
else:
return super().keypress(size, key)
def _check_focus_change(self):
"""Check if focus changed and notify callback."""
try:
current = self.focus
if current is not self._last_focus:
self._last_focus = current
if self.on_focus_change and isinstance(current, SelectableToolItem):
self.on_focus_change(current.name)
except (IndexError, TypeError):
pass
def render(self, size, focus=False):
# Check focus on render too (for initial display)
if focus:
self._check_focus_change()
return super().render(size, focus)
class TabCyclePile(urwid.Pile):
"""A Pile that uses Tab/Shift-Tab to cycle between specific positions.
Args:
widget_list: List of widgets (same as urwid.Pile)
tab_positions: List of indices in the pile that Tab should cycle between.
Default is [0] (only first position).
"""
def __init__(self, widget_list, tab_positions=None):
super().__init__(widget_list)
# Positions in the pile that Tab should cycle between
self.tab_positions = tab_positions or [0]
self._current_tab_idx = 0
def keypress(self, size, key):
if key == 'tab':
# Move to next tab position
self._current_tab_idx = (self._current_tab_idx + 1) % len(self.tab_positions)
self.focus_position = self.tab_positions[self._current_tab_idx]
return None
elif key == 'shift tab':
# Move to previous tab position
self._current_tab_idx = (self._current_tab_idx - 1) % len(self.tab_positions)
self.focus_position = self.tab_positions[self._current_tab_idx]
return None
else:
return super().keypress(size, key)
class TabPassEdit(urwid.Edit):
"""A multiline Edit that passes Tab through for focus cycling instead of inserting tabs."""
def keypress(self, size, key):
if key in ('tab', 'shift tab'):
# Pass Tab through to parent for focus cycling
return key
return super().keypress(size, key)
class UndoableEdit(urwid.Edit):
"""A multiline Edit with undo/redo support.
Features:
- Undo with Alt+U (up to 50 states)
- Redo with Alt+R
- Tab passes through for focus cycling
"""
MAX_UNDO = 50 # Maximum undo history size
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._undo_stack = [] # List of (text, cursor_pos) tuples
self._redo_stack = []
def keypress(self, size, key):
if key in ('tab', 'shift tab'):
return key
# Handle undo (Alt+U or meta u)
if key in ('meta u', 'alt u'):
self._undo()
return None
# Handle redo (Alt+R or meta r)
if key in ('meta r', 'alt r'):
self._redo()
return None
# Save current state BEFORE the edit for undo
old_text = self.edit_text
old_pos = self.edit_pos
# Let the parent handle the keypress
result = super().keypress(size, key)
# If text changed, save the old state to undo stack
if self.edit_text != old_text:
self._save_undo_state(old_text, old_pos)
self._redo_stack.clear() # Clear redo on new edit
return result
def _save_undo_state(self, text, pos):
"""Save state to undo stack."""
# Don't save duplicate states
if self._undo_stack and self._undo_stack[-1][0] == text:
return
if len(self._undo_stack) >= self.MAX_UNDO:
self._undo_stack.pop(0)
self._undo_stack.append((text, pos))
def _undo(self):
"""Restore previous state from undo stack."""
if not self._undo_stack:
return
# Save current state to redo stack
self._redo_stack.append((self.edit_text, self.edit_pos))
# Restore previous state
text, pos = self._undo_stack.pop()
self.set_edit_text(text)
self.set_edit_pos(min(pos, len(text)))
def _redo(self):
"""Restore state from redo stack."""
if not self._redo_stack:
return
# Save current state to undo stack
self._undo_stack.append((self.edit_text, self.edit_pos))
# Restore redo state
text, pos = self._redo_stack.pop()
self.set_edit_text(text)
self.set_edit_pos(min(pos, len(text)))
class DOSScrollBar(urwid.WidgetWrap):
"""A DOS-style scrollbar with arrow buttons at top and bottom.
Renders a scrollbar on the right side of the wrapped widget with:
- arrow at top (click to scroll up)
- track with thumb showing scroll position
- arrow at bottom (click to scroll down)
Click zones (expanded to last 2 columns for easier clicking):
- Top 25%: scroll up 3 lines
- Bottom 25%: scroll down 3 lines
- Middle: page up/down based on which half clicked
"""
def __init__(self, widget):
self._wrapped = widget
# Create a columns layout: content on left, scrollbar on right
super().__init__(widget)
def render(self, size, focus=False):
maxcol, maxrow = size
# Render the wrapped widget with one less column for scrollbar
content_size = (maxcol - 1, maxrow)
content_canvas = self._wrapped.render(content_size, focus)
# Build the scrollbar column
scrollbar_chars = []
# Up arrow at top
scrollbar_chars.append('')
# Calculate thumb position
if maxrow > 2:
track_height = maxrow - 2 # Minus the two arrow buttons
# Get scroll position info from wrapped widget
try:
if hasattr(self._wrapped, 'rows_max'):
rows_max = self._wrapped.rows_max(content_size)
scroll_pos = self._wrapped.get_scrollpos(content_size)
else:
rows_max = maxrow
scroll_pos = 0
if rows_max > maxrow:
# Calculate thumb position within track
thumb_pos = int((scroll_pos / (rows_max - maxrow)) * (track_height - 1))
thumb_pos = max(0, min(thumb_pos, track_height - 1))
else:
thumb_pos = 0
except (AttributeError, TypeError, ZeroDivisionError):
thumb_pos = 0
# Build track with thumb
for i in range(track_height):
if i == thumb_pos:
scrollbar_chars.append('') # Thumb
else:
scrollbar_chars.append('') # Track
# Down arrow at bottom
scrollbar_chars.append('')
# Create scrollbar canvas
scrollbar_text = '\n'.join(scrollbar_chars[:maxrow])
scrollbar_canvas = urwid.Text(scrollbar_text).render((1,))
# Combine canvases
combined = urwid.CanvasJoin([
(content_canvas, None, focus, content_size[0]),
(scrollbar_canvas, None, False, 1),
])
return combined
def keypress(self, size, key):
maxcol, maxrow = size
content_size = (maxcol - 1, maxrow)
return self._wrapped.keypress(content_size, key)
def mouse_event(self, size, event, button, col, row, focus):
maxcol, maxrow = size
content_size = (maxcol - 1, maxrow)
# Expand clickable area - last 2 columns count as scrollbar
if col >= maxcol - 2:
if button == 1 and event == 'mouse press':
# Top 25% of scrollbar = scroll up
if row < maxrow // 4:
for _ in range(3):
self._wrapped.keypress(content_size, 'up')
self._invalidate()
return True
# Bottom 25% of scrollbar = scroll down
elif row >= maxrow - (maxrow // 4):
for _ in range(3):
self._wrapped.keypress(content_size, 'down')
self._invalidate()
return True
# Middle = page up/down based on which half
elif row < maxrow // 2:
for _ in range(maxrow // 2):
self._wrapped.keypress(content_size, 'up')
self._invalidate()
return True
else:
for _ in range(maxrow // 2):
self._wrapped.keypress(content_size, 'down')
self._invalidate()
return True
# Handle mouse wheel on scrollbar
if button == 4: # Scroll up
for _ in range(3):
self._wrapped.keypress(content_size, 'up')
self._invalidate()
return True
elif button == 5: # Scroll down
for _ in range(3):
self._wrapped.keypress(content_size, 'down')
self._invalidate()
return True
return True # Consume other scrollbar clicks
# Pass to wrapped widget
return self._wrapped.mouse_event(content_size, event, button, col, row, focus)
def selectable(self):
return self._wrapped.selectable()
def sizing(self):
return frozenset([urwid.Sizing.BOX])
class ToolBuilderLayout(urwid.WidgetWrap):
"""Custom layout for tool builder that handles Tab cycling across all sections."""
def __init__(self, left_box, args_box, steps_box, args_section, steps_section, bottom_buttons, on_cancel=None):
self._current_section = 0
self.on_cancel = on_cancel
# Store references to LineBoxes for title highlighting
self.left_box = left_box
self.args_box = args_box
self.steps_box = steps_box
# Build visual layout: left column and right column side by side
right_pile = urwid.Pile([
('weight', 1, args_section),
('pack', urwid.Divider()),
('weight', 1, steps_section),
])
columns = urwid.Columns([
('weight', 1, left_box),
('weight', 1, right_pile),
], dividechars=1)
main_pile = urwid.Pile([
('weight', 1, columns),
('pack', urwid.Divider()),
('pack', bottom_buttons),
])
super().__init__(main_pile)
# Set initial highlight
self._update_section_titles()
def keypress(self, size, key):
if key == 'tab':
self._current_section = (self._current_section + 1) % 4
self._focus_section(self._current_section)
self._update_section_titles()
return None
elif key == 'shift tab':
self._current_section = (self._current_section - 1) % 4
self._focus_section(self._current_section)
self._update_section_titles()
return None
elif key == 'esc':
# Go back to main menu instead of exiting
if self.on_cancel:
self.on_cancel(None)
return None
else:
return super().keypress(size, key)
def mouse_event(self, size, event, button, col, row, focus):
# Let the parent handle the mouse event first
result = super().mouse_event(size, event, button, col, row, focus)
# After mouse click, detect which section has focus and update titles
if event == 'mouse press':
self._detect_current_section()
self._update_section_titles()
return result
def _detect_current_section(self):
"""Detect which section currently has focus based on widget hierarchy."""
main_pile = self._w
# Check if bottom buttons have focus (position 2)
if main_pile.focus_position == 2:
self._current_section = 3
return
# Focus is on columns (position 0)
columns = main_pile.contents[0][0]
if columns.focus_position == 0:
# Left box (Tool Info)
self._current_section = 0
else:
# Right pile
right_pile = columns.contents[1][0]
if right_pile.focus_position == 0:
# Args section
self._current_section = 1
else:
# Steps section
self._current_section = 2
def _update_section_titles(self):
"""Update section titles to highlight the current one with markers."""
# Section 0 = Tool Info, Section 1 = Arguments, Section 2 = Steps, Section 3 = buttons
if self._current_section == 0:
self.left_box.set_title('[ Tool Info ]')
self.args_box.set_title('Arguments')
self.steps_box.set_title('Execution Steps')
elif self._current_section == 1:
self.left_box.set_title('Tool Info')
self.args_box.set_title('[ Arguments ]')
self.steps_box.set_title('Execution Steps')
elif self._current_section == 2:
self.left_box.set_title('Tool Info')
self.args_box.set_title('Arguments')
self.steps_box.set_title('[ Execution Steps ]')
else:
# Buttons focused - no section highlighted
self.left_box.set_title('Tool Info')
self.args_box.set_title('Arguments')
self.steps_box.set_title('Execution Steps')
def _focus_section(self, section_idx):
"""Set focus to the specified section."""
# Get the main pile
main_pile = self._w
if section_idx == 0:
# Tool Info (left box) - focus columns, then left
main_pile.focus_position = 0 # columns
columns = main_pile.contents[0][0]
columns.focus_position = 0 # left box
elif section_idx == 1:
# Arguments section - focus columns, then right, then args
main_pile.focus_position = 0 # columns
columns = main_pile.contents[0][0]
columns.focus_position = 1 # right pile
right_pile = columns.contents[1][0]
right_pile.focus_position = 0 # args section
elif section_idx == 2:
# Steps section - focus columns, then right, then steps
main_pile.focus_position = 0 # columns
columns = main_pile.contents[0][0]
columns.focus_position = 1 # right pile
right_pile = columns.contents[1][0]
right_pile.focus_position = 2 # steps section (after divider)
elif section_idx == 3:
# Save/Cancel buttons
main_pile.focus_position = 2 # bottom buttons (after divider)
class Dialog(urwid.WidgetWrap):
"""A dialog box overlay with 3D-style buttons."""
def __init__(self, title, body, buttons, width=60, height=None):
# Title
title_widget = urwid.Text(('header', f' {title} '), align='center')
# Buttons row - use 3D compact buttons for dialog actions
button_widgets = []
for label, callback in buttons:
btn = Button3DCompact(label, callback)
button_widgets.append(btn)
buttons_row = urwid.Columns([('pack', b) for b in button_widgets], dividechars=2)
buttons_centered = urwid.Padding(buttons_row, align='center', width='pack')
# Check if body is a box widget
# ListBox is always a box widget. For Piles with weighted items,
# check if it ONLY supports BOX sizing (not FLOW).
is_box_widget = isinstance(body, (urwid.ListBox, urwid.Scrollable, urwid.ScrollBar))
if not is_box_widget:
try:
sizing = body.sizing()
# Box widget if it ONLY supports BOX sizing
is_box_widget = sizing == frozenset({urwid.Sizing.BOX})
except (AttributeError, TypeError):
pass
if is_box_widget:
# Box widget - use directly with weight
pile = urwid.Pile([
('pack', title_widget),
('pack', urwid.Divider('')),
('weight', 1, body),
('pack', urwid.Divider('')),
('pack', buttons_centered),
])
else:
# Flow widget - wrap in Filler
body_padded = urwid.Padding(body, left=1, right=1)
body_filled = urwid.Filler(body_padded, valign='top')
pile = urwid.Pile([
('pack', title_widget),
('pack', urwid.Divider('')),
body_filled,
('pack', urwid.Divider('')),
('pack', buttons_centered),
])
# Box it
box = urwid.LineBox(pile, title='', title_align='center')
box = urwid.AttrMap(box, 'dialog')
super().__init__(box)

View File

@ -22,6 +22,25 @@ def _api_get(path: str, params: Optional[Dict[str, Any]] = None, token: Optional
return response.status_code, response.get_json(silent=True) or {}
def _api_post(path: str, data: Optional[Dict[str, Any]] = None, token: Optional[str] = None) -> Tuple[int, Dict[str, Any]]:
client = current_app.test_client()
headers = {"Content-Type": "application/json"}
if token:
headers["Authorization"] = f"Bearer {token}"
import json
response = client.post(path, data=json.dumps(data or {}), headers=headers)
return response.status_code, response.get_json(silent=True) or {}
def _api_delete(path: str, token: Optional[str] = None) -> Tuple[int, Dict[str, Any]]:
client = current_app.test_client()
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
response = client.delete(path, headers=headers)
return response.status_code, response.get_json(silent=True) or {}
def _title_case(value: str) -> str:
return value.replace("-", " ").title()
@ -640,3 +659,62 @@ def sitemap():
xml_parts.append("</urlset>")
return Response("\n".join(xml_parts), mimetype="application/xml")
# ============================================
# Dashboard API Proxy Routes
# These proxy browser requests to the API using session auth
# ============================================
from flask import jsonify
@web_bp.route("/dashboard/api/tokens", methods=["POST"])
def dashboard_create_token():
"""Proxy token creation from dashboard to API."""
redirect_response = _require_login()
if redirect_response:
return jsonify({"error": {"code": "UNAUTHORIZED", "message": "Not logged in"}}), 401
token = session.get("auth_token")
data = request.get_json() or {}
status, payload = _api_post("/api/v1/tokens", data=data, token=token)
return jsonify(payload), status
@web_bp.route("/dashboard/api/tokens/<int:token_id>", methods=["DELETE"])
def dashboard_revoke_token(token_id: int):
"""Proxy token revocation from dashboard to API."""
redirect_response = _require_login()
if redirect_response:
return jsonify({"error": {"code": "UNAUTHORIZED", "message": "Not logged in"}}), 401
token = session.get("auth_token")
status, payload = _api_delete(f"/api/v1/tokens/{token_id}", token=token)
return jsonify(payload), status
@web_bp.route("/dashboard/api/tools/<owner>/<name>/deprecate", methods=["POST"])
def dashboard_deprecate_tool(owner: str, name: str):
"""Proxy tool deprecation from dashboard to API."""
redirect_response = _require_login()
if redirect_response:
return jsonify({"error": {"code": "UNAUTHORIZED", "message": "Not logged in"}}), 401
token = session.get("auth_token")
data = request.get_json() or {}
status, payload = _api_post(f"/api/v1/tools/{owner}/{name}/deprecate", data=data, token=token)
return jsonify(payload), status
@web_bp.route("/dashboard/api/tools/<owner>/<name>/undeprecate", methods=["POST"])
def dashboard_undeprecate_tool(owner: str, name: str):
"""Proxy tool undeprecation from dashboard to API."""
redirect_response = _require_login()
if redirect_response:
return jsonify({"error": {"code": "UNAUTHORIZED", "message": "Not logged in"}}), 401
token = session.get("auth_token")
data = request.get_json() or {}
status, payload = _api_post(f"/api/v1/tools/{owner}/{name}/undeprecate", data=data, token=token)
return jsonify(payload), status

View File

@ -163,10 +163,10 @@ function submitReport(owner, name) {
form.addEventListener('submit', async function (e) {
e.preventDefault();
const payload = {
tool_owner: ownerField ? ownerField.value : owner,
tool_name: nameField ? nameField.value : name,
owner: ownerField ? ownerField.value : owner,
name: nameField ? nameField.value : name,
reason: form.querySelector('[name="reason"]')?.value || '',
description: form.querySelector('[name="description"]')?.value || ''
details: form.querySelector('[name="description"]')?.value || ''
};
try {
@ -193,8 +193,14 @@ function submitReport(owner, name) {
openReportModal();
}
// Analytics
// Analytics (respects consent)
function trackPageView() {
// Only track if user has given analytics consent
const consent = document.body.dataset.analyticsConsent;
if (consent !== 'true') {
return;
}
fetch('/api/v1/analytics/pageview', {
method: 'POST',
headers: {'Content-Type': 'application/json'},

View File

@ -47,7 +47,7 @@
</script>
{% endblock %}
</head>
<body class="min-h-screen bg-gray-50 text-gray-900 font-sans antialiased">
<body class="min-h-screen bg-gray-50 text-gray-900 font-sans antialiased" data-analytics-consent="{{ 'true' if session.get('consent_analytics') else 'false' }}">
<!-- Skip to content link for accessibility -->
<a href="#main-content" class="sr-only focus:not-sr-only focus:absolute focus:top-4 focus:left-4 bg-indigo-600 text-white px-4 py-2 rounded-md z-50">
Skip to content

View File

@ -246,16 +246,16 @@ async function createToken(event) {
const name = form.querySelector('[name="name"]').value;
try {
const response = await fetch('/api/v1/tokens', {
const response = await fetch('/dashboard/api/tokens', {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({name: name})
});
if (response.ok) {
const data = await response.json();
const result = await response.json();
closeCreateTokenModal();
showNewToken(data.token);
showNewToken(result.data.token);
} else {
const error = await response.json();
alert(error.error || 'Failed to create token');
@ -294,7 +294,7 @@ async function revokeToken(tokenId, tokenName) {
}
try {
const response = await fetch(`/api/v1/tokens/${tokenId}`, {
const response = await fetch(`/dashboard/api/tokens/${tokenId}`, {
method: 'DELETE'
});

View File

@ -202,6 +202,8 @@
</div>
<script>
let deprecateAction = '';
function openDeprecateModal(owner, name, isDeprecated) {
document.getElementById('deprecate-owner').value = owner;
document.getElementById('deprecate-name').value = name;
@ -213,7 +215,7 @@ function openDeprecateModal(owner, name, isDeprecated) {
document.getElementById('deprecate-submit').textContent = 'Restore';
document.getElementById('deprecate-submit').classList.remove('bg-amber-600', 'hover:bg-amber-700');
document.getElementById('deprecate-submit').classList.add('bg-green-600', 'hover:bg-green-700');
document.getElementById('deprecate-form').action = `/api/v1/tools/${owner}/${name}/undeprecate`;
deprecateAction = `/dashboard/api/tools/${owner}/${name}/undeprecate`;
} else {
document.getElementById('deprecate-title').textContent = 'Deprecate Tool';
document.getElementById('deprecate-desc').textContent = 'Mark this tool as deprecated. Users will see a warning.';
@ -221,7 +223,7 @@ function openDeprecateModal(owner, name, isDeprecated) {
document.getElementById('deprecate-submit').textContent = 'Deprecate';
document.getElementById('deprecate-submit').classList.remove('bg-green-600', 'hover:bg-green-700');
document.getElementById('deprecate-submit').classList.add('bg-amber-600', 'hover:bg-amber-700');
document.getElementById('deprecate-form').action = `/api/v1/tools/${owner}/${name}/deprecate`;
deprecateAction = `/dashboard/api/tools/${owner}/${name}/deprecate`;
}
document.getElementById('deprecate-modal').classList.remove('hidden');
@ -230,5 +232,28 @@ function openDeprecateModal(owner, name, isDeprecated) {
function closeDeprecateModal() {
document.getElementById('deprecate-modal').classList.add('hidden');
}
document.getElementById('deprecate-form').addEventListener('submit', async function(e) {
e.preventDefault();
const message = document.getElementById('deprecated_message')?.value || '';
const replacement = document.getElementById('replacement')?.value || '';
try {
const response = await fetch(deprecateAction, {
method: 'POST',
headers: {'Content-Type': 'application/json'},
body: JSON.stringify({deprecated_message: message, replacement: replacement})
});
if (response.ok) {
window.location.reload();
} else {
const error = await response.json();
alert((error.error && error.error.message) || 'Failed to update tool status');
}
} catch (err) {
alert('Failed to update tool status. Please try again.');
}
});
</script>
{% endblock %}