Implement content hash system for integrity verification

- Add hash_utils.py module for SHA256 content hashing with normalized YAML
- Store config_hash in registry database on publish
- Include hash in download response for client verification
- Verify downloaded content matches registry hash on install
- Store registry_hash in local tool config for publish state tracking
- Show publish state indicators in Tools page UI:
  - Green checkmark: Published and up to date
  - Orange dot: Modified since last publish
  - No indicator: Local tool (never published)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
rob 2026-01-16 01:31:08 -04:00
parent 8e6b03cade
commit 78025aac8e
6 changed files with 240 additions and 10 deletions

View File

@ -1,19 +1,58 @@
"""Tools page - main view for managing tools."""
from collections import defaultdict
from pathlib import Path
from typing import Optional, Tuple
import yaml
from PySide6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QSplitter,
QTreeWidget, QTreeWidgetItem, QTextEdit, QLabel,
QPushButton, QGroupBox, QMessageBox, QFrame
)
from PySide6.QtCore import Qt
from PySide6.QtGui import QFont
from PySide6.QtGui import QFont, QColor, QBrush
from ...tool import (
Tool, ToolArgument, PromptStep, CodeStep, ToolStep,
list_tools, load_tool, delete_tool, DEFAULT_CATEGORIES
list_tools, load_tool, delete_tool, DEFAULT_CATEGORIES,
get_tools_dir
)
from ...config import load_config
from ...hash_utils import compute_config_hash
def get_tool_publish_state(tool_name: str) -> Tuple[str, Optional[str]]:
    """
    Classify a tool as published, modified, or local-only.

    Reads ``config.yaml`` from the tool's folder under ``get_tools_dir()``
    and compares the stored ``registry_hash`` with a freshly computed hash
    of the same configuration.

    Args:
        tool_name: Name of the tool folder inside the tools directory.

    Returns:
        Tuple of (state, registry_hash) where state is one of:
        - "published": a registry_hash is stored and the current hash matches
        - "modified": a registry_hash is stored but the current hash differs
        - "local": no config file, no registry_hash, or the config could not
          be read/parsed (registry_hash is None in this case)
    """
    not_published = ("local", None)

    config_file = get_tools_dir() / tool_name / "config.yaml"
    if not config_file.exists():
        return not_published

    try:
        data = yaml.safe_load(config_file.read_text())
        stored_hash = data.get("registry_hash")
        if not stored_hash:
            return not_published

        # compute_config_hash leaves the hash bookkeeping fields out by
        # default, so the stored hash does not influence the comparison.
        label = "published" if compute_config_hash(data) == stored_hash else "modified"
        return (label, stored_hash)
    except Exception:
        # Best effort: an unreadable or malformed config is shown as local.
        return not_published
class ToolsPage(QWidget):
@ -169,10 +208,34 @@ class ToolsPage(QWidget):
# Tools in category
for name, tool in sorted(tools_by_category[category], key=lambda x: x[0]):
tool_item = QTreeWidgetItem([name])
# Get publish state
state, registry_hash = get_tool_publish_state(name)
# Show state indicator in display name
if state == "published":
display_name = f"{name}"
tooltip = "Published to registry - up to date"
color = QColor(56, 161, 105) # Green
elif state == "modified":
display_name = f"{name}"
tooltip = "Published to registry - local modifications"
color = QColor(221, 107, 32) # Orange
else:
display_name = name
tooltip = "Local tool - not published"
color = None
tool_item = QTreeWidgetItem([display_name])
tool_item.setData(0, Qt.UserRole, name)
if color:
tool_item.setForeground(0, QBrush(color))
# Build tooltip
if tool.source and tool.source.type == "imported":
tool_item.setToolTip(0, f"Imported from {tool.source.url or 'registry'}")
tooltip = f"Imported from {tool.source.url or 'registry'}"
tool_item.setToolTip(0, tooltip)
cat_item.addChild(tool_item)
self.tool_tree.addTopLevelItem(cat_item)
@ -224,6 +287,21 @@ class ToolsPage(QWidget):
if tool.description:
lines.append(f"<p style='color: #4a5568; margin-bottom: 16px;'>{tool.description}</p>")
# Publish state
state, registry_hash = get_tool_publish_state(tool.name)
if state == "published":
lines.append(
"<p style='background: #c6f6d5; color: #276749; padding: 6px 10px; "
"border-radius: 4px; margin-bottom: 12px; font-size: 12px;'>"
"✓ Published to registry - up to date</p>"
)
elif state == "modified":
lines.append(
"<p style='background: #feebc8; color: #c05621; padding: 6px 10px; "
"border-radius: 4px; margin-bottom: 12px; font-size: 12px;'>"
"● Modified since last publish - republish to update registry</p>"
)
# Source info
if tool.source:
source_type = tool.source.type

122
src/cmdforge/hash_utils.py Normal file
View File

@ -0,0 +1,122 @@
"""Content hash utilities for tool integrity verification.
This module provides consistent SHA256 hashing for tool content,
used for:
- Publish state tracking (detect local modifications)
- Download integrity verification
- Registry content verification
"""
import hashlib
from typing import Optional, Dict, Any
import yaml
def compute_config_hash(config: Dict[str, Any], exclude_fields: Optional[list] = None) -> str:
    """Return the SHA256 content hash of a tool configuration.

    The configuration is serialised to block-style YAML with sorted keys
    before hashing, so key order in the input does not affect the result.

    Args:
        config: Tool configuration dictionary
        exclude_fields: Top-level keys to leave out of the hash. When None,
            'published_hash' and 'registry_hash' are left out.

    Returns:
        Hash string in format "sha256:<64-char-hex>"
    """
    skipped = ['published_hash', 'registry_hash'] if exclude_fields is None else exclude_fields

    # Build a filtered copy so the caller's dictionary is left untouched.
    hashable = {}
    for key, value in config.items():
        if key not in skipped:
            hashable[key] = value

    # Canonical text form: sorted keys, block style.
    canonical_yaml = yaml.dump(hashable, sort_keys=True, default_flow_style=False)

    digest = hashlib.sha256(canonical_yaml.encode('utf-8'))
    return "sha256:" + digest.hexdigest()
def compute_yaml_hash(yaml_content: str, exclude_fields: Optional[list] = None) -> str:
    """Return the SHA256 content hash of a raw YAML document.

    The text is parsed with ``yaml.safe_load`` and the resulting object is
    handed to ``compute_config_hash``, so the hash reflects the parsed
    content rather than the exact bytes of the input string.

    Args:
        yaml_content: Raw YAML string (e.g. the text of a config.yaml)
        exclude_fields: Fields to exclude from hashing

    Returns:
        Hash string in format "sha256:<64-char-hex>"
    """
    parsed = yaml.safe_load(yaml_content)
    # An empty document parses to None; hash it as an empty mapping.
    return compute_config_hash({} if parsed is None else parsed, exclude_fields)
def compute_file_hash(file_path: str) -> str:
    """Compute SHA256 hash of a file's contents.

    Used for hashing pattern files (e.g., Fabric's system.md). The file is
    read in binary mode and fed to the hasher in fixed-size chunks, so the
    whole file is never held in memory at once.

    Args:
        file_path: Path to file

    Returns:
        Hash string in format "sha256:<64-char-hex>"

    Raises:
        OSError: If the file cannot be opened or read.
    """
    chunk_size = 65536
    hasher = hashlib.sha256()
    with open(file_path, 'rb') as f:
        # Two-argument iter() keeps calling f.read until it returns b'' (EOF).
        for chunk in iter(lambda: f.read(chunk_size), b''):
            hasher.update(chunk)
    return f"sha256:{hasher.hexdigest()}"
def verify_hash(content: str, expected_hash: str) -> bool:
    """Check YAML content against an expected content hash.

    Args:
        content: Content to verify (YAML string)
        expected_hash: Expected hash in format "sha256:<hex>"

    Returns:
        True if the hash computed from ``content`` equals ``expected_hash``.
        False if they differ, or if ``expected_hash`` is empty or does not
        start with "sha256:".
    """
    well_formed = bool(expected_hash) and expected_hash.startswith("sha256:")
    if not well_formed:
        # Missing or unrecognised hash format: never treat it as a match.
        return False

    return compute_yaml_hash(content) == expected_hash
def extract_hash_value(hash_string: str) -> Optional[str]:
    """Strip the algorithm prefix from a hash string.

    Args:
        hash_string: Hash in format "sha256:<hex>"

    Returns:
        The text after the "sha256:" prefix, or None if the input is empty
        or does not start with that prefix.
    """
    prefix = "sha256:"
    if hash_string and hash_string.startswith(prefix):
        return hash_string[len(prefix):]
    return None
def short_hash(hash_string: str, length: int = 8) -> str:
    """Abbreviate a hash string for display.

    Args:
        hash_string: Full hash string
        length: Number of hex chars to include

    Returns:
        "sha256:" plus the first ``length`` hex characters and "..." when
        the input carries a non-empty sha256 value; otherwise the first
        ``length + 10`` characters of the input followed by "...".
    """
    digest = extract_hash_value(hash_string)
    if not digest:
        # No usable sha256 value: truncate the raw string instead.
        return hash_string[:length + 10] + "..."
    return f"sha256:{digest[:length]}..."

View File

@ -21,6 +21,7 @@ from argon2.exceptions import VerifyMismatchError
from .db import connect_db, init_db, query_all, query_one
from .rate_limit import RateLimiter
from .sync import process_webhook, get_categories_cache_path, get_repo_dir
from ..hash_utils import compute_yaml_hash
from .stats import (
refresh_tool_stats, get_tool_stats, refresh_publisher_stats,
get_publisher_stats, track_tool_usage, calculate_badges, BADGES,
@ -996,6 +997,7 @@ def create_app() -> Flask:
"resolved_version": row["version"],
"config": row["config_yaml"],
"readme": row["readme"] or "",
"config_hash": row.get("config_hash") or "",
}
})
response.headers["Cache-Control"] = "max-age=3600, immutable"
@ -1792,6 +1794,9 @@ def create_app() -> Flask:
tags_json = json.dumps(tags)
# Compute content hash for integrity verification
config_hash = compute_yaml_hash(config_text)
# Determine status based on scrutiny
if scrutiny_report and scrutiny_report.get("decision") == "approve":
scrutiny_status = "approved"
@ -1816,8 +1821,8 @@ def create_app() -> Flask:
owner, name, version, description, category, tags, config_yaml, readme,
publisher_id, deprecated, deprecated_message, replacement, downloads,
scrutiny_status, scrutiny_report, source, source_url, source_json,
visibility, moderation_status, published_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
config_hash, visibility, moderation_status, published_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
[
owner,
@ -1838,6 +1843,7 @@ def create_app() -> Flask:
source,
source_url,
source_json,
config_hash,
visibility,
moderation_status,
datetime.utcnow().isoformat(),

View File

@ -59,6 +59,7 @@ CREATE TABLE IF NOT EXISTS tools (
source TEXT,
source_url TEXT,
source_json TEXT,
config_hash TEXT,
visibility TEXT DEFAULT 'public',
moderation_status TEXT DEFAULT 'pending',
moderation_note TEXT,
@ -448,6 +449,7 @@ def migrate_db(conn: sqlite3.Connection) -> None:
("source", "TEXT", "NULL"),
("source_url", "TEXT", "NULL"),
("source_json", "TEXT", "NULL"),
("config_hash", "TEXT", "NULL"),
("visibility", "TEXT", "'public'"),
("moderation_status", "TEXT", "'pending'"),
("moderation_note", "TEXT", "NULL"),
@ -512,6 +514,7 @@ def migrate_db(conn: sqlite3.Connection) -> None:
try:
conn.execute("CREATE INDEX IF NOT EXISTS idx_tools_owner ON tools(owner)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tools_moderation ON tools(moderation_status, visibility)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_tools_hash ON tools(config_hash)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_publishers_role ON publishers(role)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_publishers_banned ON publishers(banned)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_audit_log_target ON audit_log(target_type, target_id)")

View File

@ -112,6 +112,7 @@ class DownloadResult:
resolved_version: str
config_yaml: str
readme: str = ""
config_hash: str = "" # Registry hash for integrity verification
class RegistryClient:
@ -544,7 +545,8 @@ class RegistryClient:
name=data.get("name", name),
resolved_version=data.get("resolved_version", ""),
config_yaml=data.get("config", ""),
readme=data.get("readme", "")
readme=data.get("readme", ""),
config_hash=data.get("config_hash", "")
)
def get_categories(self) -> List[Dict[str, Any]]:

View File

@ -352,7 +352,8 @@ class ToolResolver:
name=result.name,
version=result.resolved_version,
config_yaml=result.config_yaml,
readme=result.readme
readme=result.readme,
config_hash=result.config_hash
)
if self.verbose:
@ -379,13 +380,30 @@ class ToolResolver:
name: str,
version: str,
config_yaml: str,
readme: str = ""
readme: str = "",
config_hash: str = ""
) -> ResolvedTool:
"""Install a tool fetched from registry to global directory."""
# Verify hash if provided
if config_hash:
from .hash_utils import compute_yaml_hash
computed_hash = compute_yaml_hash(config_yaml)
if computed_hash != config_hash:
raise RuntimeError(
f"Hash mismatch for {owner}/{name}: expected {config_hash[:20]}..., "
f"got {computed_hash[:20]}... - content may have been tampered with"
)
# Create directory structure
tool_dir = TOOLS_DIR / owner / name
tool_dir.mkdir(parents=True, exist_ok=True)
# Add registry_hash to config so we can track publish state
if config_hash:
parsed_config = yaml.safe_load(config_yaml)
parsed_config["registry_hash"] = config_hash
config_yaml = yaml.dump(parsed_config, default_flow_style=False, sort_keys=False)
# Write config
config_path = tool_dir / "config.yaml"
config_path.write_text(config_yaml)
@ -593,7 +611,8 @@ def install_from_registry(spec: str, version: Optional[str] = None) -> ResolvedT
name=result.name,
version=result.resolved_version,
config_yaml=result.config_yaml,
readme=result.readme
readme=result.readme,
config_hash=result.config_hash
)