Add source field support and Fabric import script
- Add ToolSource dataclass for attribution metadata (type, license, url, author, original_tool) - Add source and version fields to Tool dataclass - Update Tool.from_dict() and to_dict() to handle source field - Display source attribution in TUI info panel - Show [imported]/[forked] markers in cmdforge list - Add import_fabric.py script to import Fabric patterns as CmdForge tools 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
2c139b3982
commit
1caa3454f0
|
|
@ -0,0 +1,388 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Import Fabric patterns as CmdForge tools.
|
||||
|
||||
Usage:
|
||||
# Import all patterns
|
||||
python scripts/import_fabric.py --all
|
||||
|
||||
# Import specific pattern(s)
|
||||
python scripts/import_fabric.py summarize extract_wisdom
|
||||
|
||||
# List available patterns
|
||||
python scripts/import_fabric.py --list
|
||||
|
||||
# Dry run (show what would be created)
|
||||
python scripts/import_fabric.py --all --dry-run
|
||||
|
||||
# Specify output directory
|
||||
python scripts/import_fabric.py --all --output /tmp/fabric-tools
|
||||
|
||||
# Specify default provider
|
||||
python scripts/import_fabric.py --all --provider opencode-deepseek
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
# Fabric repo URL
|
||||
FABRIC_REPO = "https://github.com/danielmiessler/fabric.git"
|
||||
|
||||
# Default provider for imported tools
|
||||
DEFAULT_PROVIDER = "opencode-pickle"
|
||||
|
||||
# Category mapping based on pattern name prefixes/keywords
|
||||
CATEGORY_RULES = [
|
||||
(r"^analyze_", "Data"),
|
||||
(r"^extract_", "Data"),
|
||||
(r"^create_", "Developer"),
|
||||
(r"^write_", "Text"),
|
||||
(r"^improve_", "Text"),
|
||||
(r"^summarize", "Text"),
|
||||
(r"^explain_", "Text"),
|
||||
(r"^review_", "Developer"),
|
||||
(r"^code", "Developer"),
|
||||
(r"^debug", "Developer"),
|
||||
(r"^test", "Developer"),
|
||||
(r"rate_", "Data"),
|
||||
(r"json", "Data"),
|
||||
(r"csv", "Data"),
|
||||
(r"sql", "Data"),
|
||||
(r"translate", "Text"),
|
||||
(r"transcri", "Text"),
|
||||
]
|
||||
|
||||
|
||||
def get_category(pattern_name: str) -> str:
|
||||
"""Determine category based on pattern name."""
|
||||
name_lower = pattern_name.lower()
|
||||
for regex, category in CATEGORY_RULES:
|
||||
if re.search(regex, name_lower):
|
||||
return category
|
||||
return "Other"
|
||||
|
||||
|
||||
def pattern_to_display_name(pattern_name: str) -> str:
|
||||
"""Convert pattern name to display name."""
|
||||
# Replace underscores with spaces and title case
|
||||
return pattern_name.replace("_", " ").title()
|
||||
|
||||
|
||||
def clean_prompt(prompt: str) -> str:
|
||||
"""Clean up the Fabric prompt for use in CmdForge."""
|
||||
# Remove trailing INPUT: placeholder (we'll add {input} ourselves)
|
||||
prompt = prompt.strip()
|
||||
|
||||
# Remove various forms of input placeholder at the end
|
||||
for suffix in ["INPUT:", "# INPUT", "# INPUT:", "INPUT"]:
|
||||
if prompt.endswith(suffix):
|
||||
prompt = prompt[:-len(suffix)].strip()
|
||||
|
||||
return prompt
|
||||
|
||||
|
||||
def create_tool_config(
|
||||
pattern_name: str,
|
||||
system_prompt: str,
|
||||
provider: str = DEFAULT_PROVIDER,
|
||||
) -> dict:
|
||||
"""Create a CmdForge tool config from a Fabric pattern."""
|
||||
|
||||
cleaned_prompt = clean_prompt(system_prompt)
|
||||
display_name = pattern_to_display_name(pattern_name)
|
||||
category = get_category(pattern_name)
|
||||
|
||||
# Build the full prompt with input placeholder
|
||||
full_prompt = f"{cleaned_prompt}\n\n{{input}}"
|
||||
|
||||
config = {
|
||||
"name": pattern_name,
|
||||
"description": f"{display_name} - imported from Fabric patterns",
|
||||
"version": "1.0.0",
|
||||
"category": category,
|
||||
|
||||
# Attribution - marks this as an imported tool
|
||||
"source": {
|
||||
"type": "imported",
|
||||
"license": "MIT",
|
||||
"url": "https://github.com/danielmiessler/fabric",
|
||||
"author": "Daniel Miessler",
|
||||
"original_tool": f"fabric/patterns/{pattern_name}",
|
||||
},
|
||||
|
||||
"arguments": [],
|
||||
|
||||
"steps": [
|
||||
{
|
||||
"type": "prompt",
|
||||
"prompt": full_prompt,
|
||||
"provider": provider,
|
||||
"output_var": "response",
|
||||
}
|
||||
],
|
||||
|
||||
"output": "{response}",
|
||||
}
|
||||
|
||||
return config
|
||||
|
||||
|
||||
def clone_fabric(target_dir: Path) -> Path:
|
||||
"""Clone or update Fabric repo, return patterns directory."""
|
||||
fabric_dir = target_dir / "fabric"
|
||||
patterns_dir = fabric_dir / "data" / "patterns"
|
||||
|
||||
if fabric_dir.exists():
|
||||
print(f"Using existing Fabric clone at {fabric_dir}")
|
||||
# Pull latest
|
||||
subprocess.run(
|
||||
["git", "-C", str(fabric_dir), "pull", "--quiet"],
|
||||
check=False
|
||||
)
|
||||
else:
|
||||
print(f"Cloning Fabric to {fabric_dir}...")
|
||||
subprocess.run(
|
||||
["git", "clone", "--depth", "1", FABRIC_REPO, str(fabric_dir)],
|
||||
check=True,
|
||||
capture_output=True
|
||||
)
|
||||
|
||||
if not patterns_dir.exists():
|
||||
raise RuntimeError(f"Patterns directory not found at {patterns_dir}")
|
||||
|
||||
return patterns_dir
|
||||
|
||||
|
||||
def list_patterns(patterns_dir: Path) -> list[str]:
|
||||
"""List all available pattern names."""
|
||||
patterns = []
|
||||
for entry in sorted(patterns_dir.iterdir()):
|
||||
if entry.is_dir() and (entry / "system.md").exists():
|
||||
patterns.append(entry.name)
|
||||
return patterns
|
||||
|
||||
|
||||
def import_pattern(
|
||||
pattern_name: str,
|
||||
patterns_dir: Path,
|
||||
output_dir: Path,
|
||||
provider: str,
|
||||
dry_run: bool = False,
|
||||
registry_format: bool = False,
|
||||
namespace: str = "official",
|
||||
) -> bool:
|
||||
"""Import a single pattern. Returns True on success."""
|
||||
pattern_path = patterns_dir / pattern_name
|
||||
system_md = pattern_path / "system.md"
|
||||
|
||||
if not system_md.exists():
|
||||
print(f" ✗ Pattern '{pattern_name}' not found", file=sys.stderr)
|
||||
return False
|
||||
|
||||
# Read the system prompt
|
||||
system_prompt = system_md.read_text()
|
||||
|
||||
# Create tool config
|
||||
config = create_tool_config(pattern_name, system_prompt, provider)
|
||||
|
||||
# Output directory for this tool
|
||||
if registry_format:
|
||||
# Registry format: tools/<namespace>/<name>/config.yaml
|
||||
tool_dir = output_dir / "tools" / namespace / pattern_name
|
||||
else:
|
||||
# Local format: <output>/<name>/config.yaml
|
||||
tool_dir = output_dir / pattern_name
|
||||
config_file = tool_dir / "config.yaml"
|
||||
|
||||
if dry_run:
|
||||
print(f" [DRY RUN] Would create: {config_file}")
|
||||
print(f" Category: {config['category']}")
|
||||
print(f" Provider: {provider}")
|
||||
return True
|
||||
|
||||
# Create tool directory
|
||||
tool_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Write config.yaml with proper multi-line string handling
|
||||
class LiteralStr(str):
|
||||
pass
|
||||
|
||||
def literal_representer(dumper, data):
|
||||
return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
|
||||
|
||||
yaml.add_representer(LiteralStr, literal_representer)
|
||||
|
||||
# Convert the prompt to literal style
|
||||
config["steps"][0]["prompt"] = LiteralStr(config["steps"][0]["prompt"])
|
||||
|
||||
with open(config_file, "w") as f:
|
||||
yaml.dump(config, f, default_flow_style=False, sort_keys=False, allow_unicode=True)
|
||||
|
||||
# Create a basic README.md
|
||||
readme_content = f"""# {pattern_to_display_name(pattern_name)}
|
||||
|
||||
{config['description']}
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
cat input.txt | {pattern_name}
|
||||
```
|
||||
|
||||
## Source
|
||||
|
||||
This tool was imported from [Fabric](https://github.com/danielmiessler/fabric) patterns.
|
||||
|
||||
- **Original pattern**: `{pattern_name}`
|
||||
- **Author**: Daniel Miessler
|
||||
- **License**: MIT
|
||||
"""
|
||||
(tool_dir / "README.md").write_text(readme_content)
|
||||
|
||||
print(f" ✓ {pattern_name} -> {tool_dir}")
|
||||
return True
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Import Fabric patterns as CmdForge tools",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog=__doc__
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"patterns",
|
||||
nargs="*",
|
||||
help="Pattern name(s) to import"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--all",
|
||||
action="store_true",
|
||||
help="Import all patterns"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--list",
|
||||
action="store_true",
|
||||
help="List available patterns"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output", "-o",
|
||||
type=Path,
|
||||
default=Path.home() / ".cmdforge",
|
||||
help="Output directory (default: ~/.cmdforge)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--provider", "-p",
|
||||
default=DEFAULT_PROVIDER,
|
||||
help=f"Default provider for tools (default: {DEFAULT_PROVIDER})"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dry-run",
|
||||
action="store_true",
|
||||
help="Show what would be created without writing files"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--fabric-dir",
|
||||
type=Path,
|
||||
default=Path("/tmp/fabric-import"),
|
||||
help="Directory to clone Fabric repo (default: /tmp/fabric-import)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--registry",
|
||||
action="store_true",
|
||||
help="Output in registry format (tools/official/<name>/config.yaml)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--namespace",
|
||||
default="official",
|
||||
help="Namespace for registry format (default: official)"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Validate arguments
|
||||
if not args.list and not args.all and not args.patterns:
|
||||
parser.error("Specify pattern names, --all, or --list")
|
||||
|
||||
# Clone/update Fabric
|
||||
args.fabric_dir.mkdir(parents=True, exist_ok=True)
|
||||
patterns_dir = clone_fabric(args.fabric_dir)
|
||||
|
||||
# List mode
|
||||
if args.list:
|
||||
patterns = list_patterns(patterns_dir)
|
||||
print(f"\nAvailable Fabric patterns ({len(patterns)}):\n")
|
||||
|
||||
# Group by category
|
||||
by_category = {}
|
||||
for p in patterns:
|
||||
cat = get_category(p)
|
||||
by_category.setdefault(cat, []).append(p)
|
||||
|
||||
for cat in sorted(by_category.keys()):
|
||||
print(f" {cat}:")
|
||||
for p in sorted(by_category[cat]):
|
||||
print(f" - {p}")
|
||||
print()
|
||||
|
||||
return 0
|
||||
|
||||
# Determine which patterns to import
|
||||
if args.all:
|
||||
to_import = list_patterns(patterns_dir)
|
||||
else:
|
||||
to_import = args.patterns
|
||||
|
||||
if not to_import:
|
||||
print("No patterns to import.", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
print(f"\nImporting {len(to_import)} pattern(s) to {args.output}")
|
||||
print(f"Provider: {args.provider}")
|
||||
if args.dry_run:
|
||||
print("(DRY RUN - no files will be written)\n")
|
||||
else:
|
||||
print()
|
||||
|
||||
# Import each pattern
|
||||
success = 0
|
||||
failed = 0
|
||||
|
||||
for pattern in to_import:
|
||||
if import_pattern(
|
||||
pattern,
|
||||
patterns_dir,
|
||||
args.output,
|
||||
args.provider,
|
||||
args.dry_run,
|
||||
args.registry,
|
||||
args.namespace
|
||||
):
|
||||
success += 1
|
||||
else:
|
||||
failed += 1
|
||||
|
||||
print(f"\nDone: {success} imported, {failed} failed")
|
||||
|
||||
if not args.dry_run and success > 0:
|
||||
print(f"\nNext steps:")
|
||||
print(f" 1. Review generated tools in {args.output}")
|
||||
print(f" 2. Run 'cmdforge refresh' to create wrapper scripts")
|
||||
print(f" 3. Test with: cmdforge test <pattern_name>")
|
||||
print(f" 4. Publish with: cmdforge registry publish {args.output}/<pattern_name>")
|
||||
|
||||
return 0 if failed == 0 else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
|
|
@ -23,7 +23,14 @@ def cmd_list(args):
|
|||
for name in tools:
|
||||
tool = load_tool(name)
|
||||
if tool:
|
||||
print(f" {name}")
|
||||
# Show source indicator for imported tools
|
||||
source_marker = ""
|
||||
if tool.source and tool.source.type == "imported":
|
||||
source_marker = " [imported]"
|
||||
elif tool.source and tool.source.type == "forked":
|
||||
source_marker = " [forked]"
|
||||
|
||||
print(f" {name}{source_marker}")
|
||||
print(f" {tool.description or 'No description'}")
|
||||
|
||||
# Show arguments
|
||||
|
|
|
|||
|
|
@ -134,6 +134,38 @@ class ToolStep:
|
|||
Step = PromptStep | CodeStep | ToolStep
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolSource:
|
||||
"""Attribution and source information for imported/external tools."""
|
||||
type: str = "original" # "original", "imported", "forked"
|
||||
license: Optional[str] = None
|
||||
url: Optional[str] = None
|
||||
author: Optional[str] = None
|
||||
original_tool: Optional[str] = None # e.g., "fabric/patterns/extract_wisdom"
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
d = {"type": self.type}
|
||||
if self.license:
|
||||
d["license"] = self.license
|
||||
if self.url:
|
||||
d["url"] = self.url
|
||||
if self.author:
|
||||
d["author"] = self.author
|
||||
if self.original_tool:
|
||||
d["original_tool"] = self.original_tool
|
||||
return d
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "ToolSource":
|
||||
return cls(
|
||||
type=data.get("type", "original"),
|
||||
license=data.get("license"),
|
||||
url=data.get("url"),
|
||||
author=data.get("author"),
|
||||
original_tool=data.get("original_tool"),
|
||||
)
|
||||
|
||||
|
||||
# Default categories for organizing tools
|
||||
DEFAULT_CATEGORIES = ["Text", "Developer", "Data", "Other"]
|
||||
|
||||
|
|
@ -148,6 +180,8 @@ class Tool:
|
|||
steps: List[Step] = field(default_factory=list)
|
||||
output: str = "{input}" # Output template
|
||||
dependencies: List[str] = field(default_factory=list) # Required tools for meta-tools
|
||||
source: Optional[ToolSource] = None # Attribution for imported/external tools
|
||||
version: str = "" # Tool version
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "Tool":
|
||||
|
|
@ -164,6 +198,11 @@ class Tool:
|
|||
elif step.get("type") == "tool":
|
||||
steps.append(ToolStep.from_dict(step))
|
||||
|
||||
# Parse source attribution if present
|
||||
source = None
|
||||
if "source" in data:
|
||||
source = ToolSource.from_dict(data["source"])
|
||||
|
||||
return cls(
|
||||
name=data["name"],
|
||||
description=data.get("description", ""),
|
||||
|
|
@ -171,7 +210,9 @@ class Tool:
|
|||
arguments=arguments,
|
||||
steps=steps,
|
||||
output=data.get("output", "{input}"),
|
||||
dependencies=data.get("dependencies", [])
|
||||
dependencies=data.get("dependencies", []),
|
||||
source=source,
|
||||
version=data.get("version", ""),
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
|
|
@ -179,9 +220,14 @@ class Tool:
|
|||
"name": self.name,
|
||||
"description": self.description,
|
||||
}
|
||||
if self.version:
|
||||
d["version"] = self.version
|
||||
# Only include category if it's not the default
|
||||
if self.category and self.category != "Other":
|
||||
d["category"] = self.category
|
||||
# Include source attribution if present
|
||||
if self.source:
|
||||
d["source"] = self.source.to_dict()
|
||||
if self.dependencies:
|
||||
d["dependencies"] = self.dependencies
|
||||
d["arguments"] = [arg.to_dict() for arg in self.arguments]
|
||||
|
|
|
|||
|
|
@ -197,6 +197,7 @@ class CmdForgeUI:
|
|||
self._info_args = urwid.Text("")
|
||||
self._info_steps = urwid.Text("")
|
||||
self._info_output = urwid.Text("")
|
||||
self._info_source = urwid.Text("")
|
||||
|
||||
info_content = urwid.Pile([
|
||||
self._info_name,
|
||||
|
|
@ -207,6 +208,8 @@ class CmdForgeUI:
|
|||
self._info_steps,
|
||||
urwid.Divider(),
|
||||
self._info_output,
|
||||
urwid.Divider(),
|
||||
self._info_source,
|
||||
])
|
||||
info_filler = urwid.Filler(info_content, valign='top')
|
||||
info_box = urwid.LineBox(info_filler, title='Tool Info')
|
||||
|
|
@ -279,12 +282,29 @@ class CmdForgeUI:
|
|||
self._info_steps.set_text(steps_text.rstrip())
|
||||
|
||||
self._info_output.set_text(f"Output: {tool.output}")
|
||||
|
||||
# Display source attribution if present
|
||||
if tool.source:
|
||||
source_text = "Source:\n"
|
||||
source_text += f" Type: {tool.source.type}\n"
|
||||
if tool.source.author:
|
||||
source_text += f" Author: {tool.source.author}\n"
|
||||
if tool.source.license:
|
||||
source_text += f" License: {tool.source.license}\n"
|
||||
if tool.source.url:
|
||||
source_text += f" URL: {tool.source.url}\n"
|
||||
if tool.source.original_tool:
|
||||
source_text += f" Original: {tool.source.original_tool}\n"
|
||||
self._info_source.set_text(source_text.rstrip())
|
||||
else:
|
||||
self._info_source.set_text("")
|
||||
else:
|
||||
self._info_name.set_text("")
|
||||
self._info_desc.set_text("")
|
||||
self._info_args.set_text("")
|
||||
self._info_steps.set_text("")
|
||||
self._info_output.set_text("")
|
||||
self._info_source.set_text("")
|
||||
|
||||
def _on_tool_select(self, name):
|
||||
"""Called when a tool is selected (Enter/double-click)."""
|
||||
|
|
|
|||
Loading…
Reference in New Issue