"""Collections page - manage local collections and browse registry collections.""" from pathlib import Path from typing import Optional from PySide6.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QSplitter, QListWidget, QListWidgetItem, QTextEdit, QLabel, QPushButton, QGroupBox, QMessageBox, QFrame, QLineEdit, QTabWidget, QDialog, QFormLayout, QDialogButtonBox, QComboBox, QCheckBox ) from PySide6.QtCore import Qt, QThread, Signal, QTimer from PySide6.QtGui import QFont from ...collection import ( Collection, list_collections, get_collection, COLLECTIONS_DIR, classify_tool_reference, resolve_tool_references, gather_local_unpublished_deps ) from ...tool import list_tools from ...config import load_config class CollectionStatusSyncWorker(QThread): """Background worker to sync collection statuses from registry.""" collection_updated = Signal(str) # collection name finished = Signal() def __init__(self, pending_collections: list): """ Args: pending_collections: List of (collection_name, pending_tools) tuples """ super().__init__() self.pending_collections = pending_collections def run(self): from ...registry_client import get_client, RegistryError try: client = get_client() # Validate token first is_valid, _ = client.validate_token() if not is_valid: return for coll_name, pending_tools in self.pending_collections: try: self._sync_collection(client, coll_name, pending_tools) except Exception: pass # Skip collections that fail to sync except Exception: pass # Silently fail - this is background sync finally: self.finished.emit() def _sync_collection(self, client, coll_name: str, pending_tools: list): """Sync a single collection's pending tool statuses.""" from ...registry_client import RegistryError coll = get_collection(coll_name) if not coll or not coll.pending_approval: return all_approved = True has_rejected = False still_pending = [] for tool_name in pending_tools: try: status_info = client.get_my_tool_status(tool_name) status = status_info.get("status", "pending") if status == "approved": pass # Good elif status == "rejected": has_rejected = True all_approved = False else: # Still pending or changes_requested still_pending.append(tool_name) all_approved = False except RegistryError: # Tool not found or other error - treat as still pending still_pending.append(tool_name) all_approved = False # Update collection state based on results changed = False if all_approved: # All tools approved - now actually publish the collection to registry! try: user_slug = coll.maintainer if not user_slug: me = client.get_me() user_slug = me.get("slug", "") # Build registry refs registry_refs = [] transformed_pinned = {} for tool_ref in coll.tools: if "/" in tool_ref: registry_refs.append(tool_ref) if tool_ref in coll.pinned: transformed_pinned[tool_ref] = coll.pinned[tool_ref] else: full_ref = f"{user_slug}/{tool_ref}" registry_refs.append(full_ref) if tool_ref in coll.pinned: transformed_pinned[full_ref] = coll.pinned[tool_ref] # Publish collection payload = { "name": coll.name, "display_name": coll.display_name, "description": coll.description, "maintainer": user_slug, "tools": registry_refs, "pinned": transformed_pinned, "tags": coll.tags, } client.publish_collection(payload) # Update local state coll.published = True coll.registry_name = coll.name coll.pending_approval = False coll.pending_tools = [] coll.maintainer = user_slug changed = True except Exception: # Failed to publish - just clear pending state coll.pending_approval = False coll.pending_tools = [] changed = True elif has_rejected: # Some tools rejected - clear pending state so user can retry coll.pending_approval = False coll.pending_tools = [] changed = True elif still_pending != pending_tools: # Some tools approved, update the list coll.pending_tools = still_pending changed = True if changed: coll.save() self.collection_updated.emit(coll_name) class CollectionCreateDialog(QDialog): """Dialog for creating a new collection.""" def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("Create Collection") self.setMinimumWidth(400) layout = QVBoxLayout(self) # Form form = QFormLayout() self.name_edit = QLineEdit() self.name_edit.setPlaceholderText("my-collection (lowercase, hyphens ok)") form.addRow("Name:", self.name_edit) self.display_name_edit = QLineEdit() self.display_name_edit.setPlaceholderText("My Collection") form.addRow("Display Name:", self.display_name_edit) self.description_edit = QTextEdit() self.description_edit.setMaximumHeight(80) self.description_edit.setPlaceholderText("Description of your collection...") form.addRow("Description:", self.description_edit) self.tags_edit = QLineEdit() self.tags_edit.setPlaceholderText("tag1, tag2, tag3") form.addRow("Tags:", self.tags_edit) layout.addLayout(form) # Buttons buttons = QDialogButtonBox( QDialogButtonBox.Ok | QDialogButtonBox.Cancel ) buttons.accepted.connect(self.accept) buttons.rejected.connect(self.reject) layout.addWidget(buttons) # Auto-fill display name from name self.name_edit.textChanged.connect(self._update_display_name) def _update_display_name(self, name: str): if not self.display_name_edit.text(): display = name.replace("-", " ").title() self.display_name_edit.setPlaceholderText(display or "My Collection") def get_data(self) -> dict: name = self.name_edit.text().strip().lower() display_name = self.display_name_edit.text().strip() if not display_name: display_name = name.replace("-", " ").title() tags_text = self.tags_edit.text().strip() tags = [t.strip() for t in tags_text.split(",") if t.strip()] return { "name": name, "display_name": display_name, "description": self.description_edit.toPlainText().strip(), "tags": tags, } class AddToolDialog(QDialog): """Dialog for adding a tool to a collection.""" def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("Add Tool") self.setMinimumWidth(350) layout = QVBoxLayout(self) # Tool selection form = QFormLayout() self.tool_combo = QComboBox() self.tool_combo.setEditable(True) self.tool_combo.setPlaceholderText("Select or enter tool name...") # Populate with local tools local_tools = list_tools() self.tool_combo.addItems(local_tools) form.addRow("Tool:", self.tool_combo) self.version_edit = QLineEdit() self.version_edit.setPlaceholderText("Optional: ^1.0.0 or specific version") form.addRow("Pin Version:", self.version_edit) layout.addLayout(form) # Hint hint = QLabel("For registry tools, use format: owner/name") hint.setStyleSheet("color: gray; font-size: 11px;") layout.addWidget(hint) # Buttons buttons = QDialogButtonBox( QDialogButtonBox.Ok | QDialogButtonBox.Cancel ) buttons.accepted.connect(self.accept) buttons.rejected.connect(self.reject) layout.addWidget(buttons) def get_data(self) -> dict: return { "tool": self.tool_combo.currentText().strip(), "version": self.version_edit.text().strip() or None, } class InstallWorker(QThread): """Background worker for installing registry collections.""" finished = Signal(bool, str, int, int) # success, message, installed, failed progress = Signal(str) # status message def __init__(self, collection_name: str): super().__init__() self.collection_name = collection_name def run(self): from ...registry_client import get_client, RegistryError from ...resolver import install_from_registry installed = 0 failed = 0 try: client = get_client() self.progress.emit(f"Fetching collection '{self.collection_name}'...") coll = client.get_collection(self.collection_name) tools = coll.get("tools", []) if not tools: self.finished.emit(True, "Collection has no tools", 0, 0) return total = len(tools) for i, tool in enumerate(tools, 1): if isinstance(tool, dict): tool_ref = f"{tool['owner']}/{tool['name']}" else: tool_ref = tool self.progress.emit(f"Installing {tool_ref} ({i}/{total})...") try: install_from_registry(tool_ref) installed += 1 except Exception as e: failed += 1 if failed: self.finished.emit(False, f"Installed {installed} tools, {failed} failed", installed, failed) else: self.finished.emit(True, f"Installed {installed} tools from '{self.collection_name}'", installed, failed) except RegistryError as e: self.finished.emit(False, f"Registry error: {e.message}", installed, failed) except Exception as e: self.finished.emit(False, f"Error: {str(e)}", installed, failed) class PublishAnalysisWorker(QThread): """Background worker to analyze collection before publishing.""" finished = Signal(dict) # result dict with analysis def __init__(self, collection: Collection): super().__init__() self.collection = collection def run(self): from ...registry_client import get_client, RegistryError result = { "success": False, "error": None, "user_slug": None, "resolution": None, "dep_result": None, # Transitive dependency check result } try: client = get_client() # Validate token is_valid, error = client.validate_token() if not is_valid: result["error"] = f"Not authenticated: {error}" self.finished.emit(result) return # Get user slug user_info = client.get_me() user_slug = user_info.get("slug", "") if not user_slug: result["error"] = "Could not determine registry username" self.finished.emit(result) return result["user_slug"] = user_slug # Resolve tool references resolution = resolve_tool_references( self.collection.tools, self.collection.pinned, user_slug, client ) result["resolution"] = resolution # Gather transitive dependencies from local tools in the collection local_tool_names = [ref for ref in self.collection.tools if '/' not in ref] if local_tool_names: dep_result = gather_local_unpublished_deps(local_tool_names, client, user_slug) result["dep_result"] = dep_result result["success"] = True self.finished.emit(result) except RegistryError as e: result["error"] = f"Registry error: {e.message}" self.finished.emit(result) except Exception as e: result["error"] = f"Error: {str(e)}" self.finished.emit(result) class PublishWorker(QThread): """Background worker for publishing collections.""" finished = Signal(bool, str) # success, message progress = Signal(str) # status message def __init__(self, collection: Collection, user_slug: str, registry_refs: list, transformed_pinned: dict, skip_unpublished: list = None, publish_unpublished: list = None): super().__init__() self.collection = collection self.user_slug = user_slug self.registry_refs = registry_refs self.transformed_pinned = transformed_pinned self.skip_unpublished = skip_unpublished or [] self.publish_unpublished = publish_unpublished or [] def run(self): from ...registry_client import get_client, RegistryError try: client = get_client() # Publish unpublished local tools first if requested if self.publish_unpublished: from ...cli.collections_commands import _publish_single_tool pending_tools = [] for tool_name in self.publish_unpublished: self.progress.emit(f"Publishing tool: {tool_name}...") result = _publish_single_tool(tool_name, client) if not result.get("success"): self.finished.emit(False, f"Failed to publish {tool_name}: {result.get('error')}") return if result.get("pending"): pending_tools.append(tool_name) if pending_tools: # Save pending state and exit without publishing collection yet self.collection.pending_approval = True self.collection.pending_tools = pending_tools self.collection.maintainer = self.user_slug self.collection.save() self.finished.emit(True, "Tools submitted for review. Collection will publish once approved.") return # Filter out skipped tools if self.skip_unpublished: unpublished_full_refs = {f"{self.user_slug}/{u}" for u in self.skip_unpublished} self.registry_refs = [ref for ref in self.registry_refs if ref not in unpublished_full_refs] for ref in unpublished_full_refs: self.transformed_pinned.pop(ref, None) if not self.registry_refs: self.finished.emit(False, "No tools remaining after filtering") return # Publish collection self.progress.emit("Publishing collection...") payload = { "name": self.collection.name, "display_name": self.collection.display_name, "description": self.collection.description, "maintainer": self.user_slug, "tools": self.registry_refs, "pinned": self.transformed_pinned, "tags": self.collection.tags, } client.publish_collection(payload) # Update local state self.collection.published = True self.collection.registry_name = self.collection.name self.collection.maintainer = self.user_slug self.collection.pending_approval = False self.collection.pending_tools = [] # If we skipped tools, update the local collection to match if self.skip_unpublished: self.collection.tools = [t for t in self.collection.tools if t not in self.skip_unpublished] for u in self.skip_unpublished: self.collection.pinned.pop(u, None) self.collection.save() self.finished.emit(True, f"Collection published: {self.collection.name}") except RegistryError as e: self.finished.emit(False, f"Registry error: {e.message}") except Exception as e: self.finished.emit(False, f"Error: {str(e)}") class CollectionsPage(QWidget): """Page for managing collections.""" def __init__(self, main_window): super().__init__() self.main_window = main_window self._setup_ui() self._publish_worker = None self._analysis_worker = None self._install_worker = None self._pending_analysis = None # Stores analysis result for publish flow # Status sync self._sync_worker = None self._poll_timer = None self._has_pending_collections = False self._syncing = False def _setup_ui(self): layout = QVBoxLayout(self) layout.setContentsMargins(20, 20, 20, 20) # Header header = QHBoxLayout() title = QLabel("Collections") title.setFont(QFont("", 18, QFont.Bold)) header.addWidget(title) header.addStretch() self.btn_create = QPushButton("New Collection") self.btn_create.clicked.connect(self._create_collection) header.addWidget(self.btn_create) self.btn_refresh = QPushButton("Refresh") self.btn_refresh.clicked.connect(self.refresh) header.addWidget(self.btn_refresh) layout.addLayout(header) # Tabs for local vs registry self.tabs = QTabWidget() # Local collections tab local_widget = QWidget() local_layout = QHBoxLayout(local_widget) # Splitter for list and details splitter = QSplitter(Qt.Horizontal) # Collection list list_frame = QFrame() list_layout = QVBoxLayout(list_frame) list_layout.setContentsMargins(0, 0, 0, 0) self.collection_list = QListWidget() self.collection_list.currentItemChanged.connect(self._on_selection_changed) list_layout.addWidget(self.collection_list) splitter.addWidget(list_frame) # Details panel details_frame = QFrame() details_layout = QVBoxLayout(details_frame) self.details_title = QLabel("Select a collection") self.details_title.setFont(QFont("", 14, QFont.Bold)) details_layout.addWidget(self.details_title) self.details_description = QLabel("") self.details_description.setWordWrap(True) details_layout.addWidget(self.details_description) self.details_status = QLabel("") details_layout.addWidget(self.details_status) # Tools list tools_group = QGroupBox("Tools") tools_layout = QVBoxLayout(tools_group) self.tools_list = QListWidget() tools_layout.addWidget(self.tools_list) # Tool action buttons tool_buttons = QHBoxLayout() self.btn_add_tool = QPushButton("Add Tool") self.btn_add_tool.clicked.connect(self._add_tool) self.btn_add_tool.setEnabled(False) tool_buttons.addWidget(self.btn_add_tool) self.btn_remove_tool = QPushButton("Remove Tool") self.btn_remove_tool.clicked.connect(self._remove_tool) self.btn_remove_tool.setEnabled(False) tool_buttons.addWidget(self.btn_remove_tool) tool_buttons.addStretch() tools_layout.addLayout(tool_buttons) details_layout.addWidget(tools_group) # Collection action buttons action_buttons = QHBoxLayout() self.btn_publish = QPushButton("Publish") self.btn_publish.clicked.connect(self._publish_collection) self.btn_publish.setEnabled(False) action_buttons.addWidget(self.btn_publish) self.btn_delete = QPushButton("Delete") self.btn_delete.clicked.connect(self._delete_collection) self.btn_delete.setEnabled(False) action_buttons.addWidget(self.btn_delete) action_buttons.addStretch() details_layout.addLayout(action_buttons) details_layout.addStretch() splitter.addWidget(details_frame) splitter.setSizes([250, 450]) local_layout.addWidget(splitter) self.tabs.addTab(local_widget, "Local Collections") # Registry collections tab registry_widget = QWidget() registry_layout = QVBoxLayout(registry_widget) self.registry_list = QListWidget() self.registry_list.itemDoubleClicked.connect(self._install_registry_collection) registry_layout.addWidget(self.registry_list) registry_hint = QLabel("Double-click to install a collection") registry_hint.setStyleSheet("color: gray;") registry_layout.addWidget(registry_hint) self.tabs.addTab(registry_widget, "Registry Collections") layout.addWidget(self.tabs) def refresh(self): """Refresh both local and registry collections.""" self._has_pending_collections = False self._load_local_collections() self._load_registry_collections() # Start background sync for pending collections self._start_background_sync() # Manage polling timer based on pending state self._manage_poll_timer() def _load_local_collections(self): """Load local collections into the list.""" self.collection_list.clear() for name in list_collections(): coll = get_collection(name) if coll: item = QListWidgetItem(coll.display_name) item.setData(Qt.UserRole, name) # Show status indicator if coll.pending_approval: item.setText(f"{coll.display_name} (pending)") self._has_pending_collections = True elif coll.published: item.setText(f"{coll.display_name} (published)") self.collection_list.addItem(item) if self.collection_list.count() == 0: item = QListWidgetItem("No local collections") item.setFlags(Qt.NoItemFlags) self.collection_list.addItem(item) def _load_registry_collections(self): """Load registry collections in background.""" self.registry_list.clear() self.registry_list.addItem("Loading...") # Load in background to avoid blocking UI QTimer.singleShot(100, self._fetch_registry_collections) def _fetch_registry_collections(self): """Actually fetch registry collections.""" from ...registry_client import get_client, RegistryError self.registry_list.clear() try: client = get_client() collections = client.get_collections() if not collections: self.registry_list.addItem("No registry collections available") return for coll in collections: name = coll.get("name", "") display_name = coll.get("display_name", name) tool_count = coll.get("tool_count", 0) maintainer = coll.get("maintainer", "") item = QListWidgetItem(f"{display_name} ({tool_count} tools) - {maintainer}") item.setData(Qt.UserRole, name) self.registry_list.addItem(item) except RegistryError as e: self.registry_list.addItem(f"Error: {e.message}") except Exception as e: self.registry_list.addItem(f"Error: {str(e)}") def _start_background_sync(self): """Start background sync for collections with pending tool approvals.""" if self._syncing: return config = load_config() if not config.registry.token: return # No auth, can't check status # Gather pending collections pending_collections = [] for name in list_collections(): coll = get_collection(name) if coll and coll.pending_approval and coll.pending_tools: pending_collections.append((name, list(coll.pending_tools))) if not pending_collections: return # Stop any existing sync if self._sync_worker and self._sync_worker.isRunning(): self._sync_worker.wait(1000) # Start new sync self._sync_worker = CollectionStatusSyncWorker(pending_collections) self._sync_worker.collection_updated.connect(self._on_collection_status_updated) self._sync_worker.finished.connect(self._on_sync_finished) self._syncing = True self._sync_worker.start() def _on_sync_finished(self): """Handle background sync completion.""" self._syncing = False def _manage_poll_timer(self): """Start or stop the polling timer based on pending collections.""" config = load_config() should_poll = self._has_pending_collections and config.registry.token if should_poll: if not self._poll_timer: # Create timer that polls every 30 seconds self._poll_timer = QTimer(self) self._poll_timer.timeout.connect(self._poll_status) if not self._poll_timer.isActive(): self._poll_timer.start(30000) # 30 seconds else: # No pending collections, stop polling if self._poll_timer and self._poll_timer.isActive(): self._poll_timer.stop() def _poll_status(self): """Timer callback to poll status for pending collections.""" if self._syncing: return # Already syncing # Check if we still have pending collections pending_collections = [] for name in list_collections(): coll = get_collection(name) if coll and coll.pending_approval and coll.pending_tools: pending_collections.append((name, list(coll.pending_tools))) if pending_collections: # Stop existing sync if any if self._sync_worker and self._sync_worker.isRunning(): self._sync_worker.wait(1000) # Start new sync self._sync_worker = CollectionStatusSyncWorker(pending_collections) self._sync_worker.collection_updated.connect(self._on_collection_status_updated) self._sync_worker.finished.connect(self._on_sync_finished) self._syncing = True self._sync_worker.start() else: # No more pending collections, stop timer if self._poll_timer and self._poll_timer.isActive(): self._poll_timer.stop() self._has_pending_collections = False def _on_collection_status_updated(self, coll_name: str): """Handle background sync updating a collection's status.""" # Refresh the display self._load_local_collections() # Re-select the current item if it was the updated one current = self.collection_list.currentItem() if current and current.data(Qt.UserRole) == coll_name: coll = get_collection(coll_name) if coll: self._show_collection_details(coll) # Show status message coll = get_collection(coll_name) if coll: if coll.pending_approval: self.main_window.show_status(f"Collection '{coll_name}' still has pending tools") elif coll.published: self.main_window.show_status(f"Collection '{coll_name}' published to registry!") self._load_registry_collections() # Refresh registry tab when auto-published else: self.main_window.show_status(f"Collection '{coll_name}' status updated (some tools may have been rejected)") def _on_selection_changed(self, current, previous): """Handle collection selection change.""" if not current: self._clear_details() return name = current.data(Qt.UserRole) if not name: self._clear_details() return coll = get_collection(name) if not coll: self._clear_details() return self._show_collection_details(coll) def _clear_details(self): """Clear the details panel.""" self.details_title.setText("Select a collection") self.details_description.setText("") self.details_status.setText("") self.tools_list.clear() self.btn_add_tool.setEnabled(False) self.btn_remove_tool.setEnabled(False) self.btn_publish.setEnabled(False) self.btn_delete.setEnabled(False) def _show_collection_details(self, coll: Collection): """Show collection details in the panel.""" self.details_title.setText(coll.display_name) self.details_description.setText(coll.description or "(No description)") # Status if coll.pending_approval: self.details_status.setText("Status: Pending tool approvals") self.details_status.setStyleSheet("color: orange;") elif coll.published: self.details_status.setText(f"Status: Published as '{coll.registry_name}'") self.details_status.setStyleSheet("color: green;") else: self.details_status.setText("Status: Local only") self.details_status.setStyleSheet("color: gray;") # Tools list self.tools_list.clear() local_tools = set(list_tools()) for tool_ref in coll.tools: owner, name, is_local = classify_tool_reference(tool_ref) pinned = coll.pinned.get(tool_ref, "") version_str = f" @ {pinned}" if pinned else "" if is_local: exists = name in local_tools status = "" if exists else " (not found)" item = QListWidgetItem(f"{name}{version_str} [local]{status}") else: item = QListWidgetItem(f"{tool_ref}{version_str} [registry]") item.setData(Qt.UserRole, tool_ref) self.tools_list.addItem(item) # Enable buttons self.btn_add_tool.setEnabled(True) self.btn_remove_tool.setEnabled(True) self.btn_publish.setEnabled(not coll.published) self.btn_delete.setEnabled(True) def _create_collection(self): """Create a new collection.""" dialog = CollectionCreateDialog(self) if dialog.exec() != QDialog.Accepted: return data = dialog.get_data() name = data["name"] if not name: QMessageBox.warning(self, "Error", "Collection name is required") return # Validate name format import re if not re.match(r'^[a-z0-9-]+$', name): QMessageBox.warning(self, "Error", "Collection name must be lowercase alphanumeric with hyphens only") return # Check if exists if name in list_collections(): QMessageBox.warning(self, "Error", f"Collection '{name}' already exists") return # Create collection coll = Collection( name=name, display_name=data["display_name"], description=data["description"], tags=data["tags"], ) coll.save() self._load_local_collections() self.main_window.show_status(f"Created collection: {name}") # Select the new collection for i in range(self.collection_list.count()): item = self.collection_list.item(i) if item.data(Qt.UserRole) == name: self.collection_list.setCurrentItem(item) break def _add_tool(self): """Add a tool to the selected collection.""" current = self.collection_list.currentItem() if not current: return name = current.data(Qt.UserRole) coll = get_collection(name) if not coll: return dialog = AddToolDialog(self) if dialog.exec() != QDialog.Accepted: return data = dialog.get_data() tool_ref = data["tool"] if not tool_ref: QMessageBox.warning(self, "Error", "Tool name is required") return if tool_ref in coll.tools: QMessageBox.information(self, "Info", f"Tool '{tool_ref}' already in collection") return coll.tools.append(tool_ref) if data["version"]: coll.pinned[tool_ref] = data["version"] coll.save() self._show_collection_details(coll) self.main_window.show_status(f"Added '{tool_ref}' to collection") def _remove_tool(self): """Remove selected tool from collection.""" current = self.collection_list.currentItem() if not current: return name = current.data(Qt.UserRole) coll = get_collection(name) if not coll: return tool_item = self.tools_list.currentItem() if not tool_item: QMessageBox.warning(self, "Error", "Select a tool to remove") return tool_ref = tool_item.data(Qt.UserRole) reply = QMessageBox.question( self, "Remove Tool", f"Remove '{tool_ref}' from collection?", QMessageBox.Yes | QMessageBox.No ) if reply != QMessageBox.Yes: return coll.tools.remove(tool_ref) coll.pinned.pop(tool_ref, None) coll.save() self._show_collection_details(coll) self.main_window.show_status(f"Removed '{tool_ref}' from collection") def _publish_collection(self): """Publish the selected collection.""" current = self.collection_list.currentItem() if not current: return name = current.data(Qt.UserRole) coll = get_collection(name) if not coll: return if not coll.tools: QMessageBox.warning(self, "Error", "Collection has no tools") return # Start analysis first self.btn_publish.setEnabled(False) self.btn_publish.setText("Analyzing...") self.main_window.show_status("Analyzing collection...") self._analysis_worker = PublishAnalysisWorker(coll) self._analysis_worker.finished.connect(self._on_analysis_finished) self._analysis_worker.start() def _on_analysis_finished(self, result: dict): """Handle publish analysis completion.""" self.btn_publish.setText("Publish") self.btn_publish.setEnabled(True) if not result["success"]: QMessageBox.warning(self, "Analysis Failed", result["error"]) return resolution = result["resolution"] user_slug = result["user_slug"] # Get current collection again current = self.collection_list.currentItem() if not current: return coll = get_collection(current.data(Qt.UserRole)) if not coll: return # Check for blocking visibility issues if resolution.visibility_issues: tools = ", ".join(t[0] for t in resolution.visibility_issues) QMessageBox.warning( self, "Cannot Publish", f"The following tools are not public:\n\n{tools}\n\n" "Collections can only include PUBLIC tools.\n" "Update these tools' visibility to 'public' and republish them first." ) return # Check for registry tool issues if resolution.registry_tool_issues: issues = "\n".join(f" - {t[0]}: {t[1]}" for t in resolution.registry_tool_issues) QMessageBox.warning( self, "Cannot Publish", f"The following registry tools have issues:\n\n{issues}\n\n" "Collections can only include approved public tools." ) return # Gather all unpublished tools (collection tools + transitive deps) dep_result = result.get("dep_result") # Start with tools not in registry at all collection_tools_unpub = list(resolution.local_unpublished) # Also include tools that exist but aren't approved (rejected, pending, changes_requested) for name, status, has_approved in resolution.local_published: if status != "approved" and name not in collection_tools_unpub: collection_tools_unpub.append(name) dependency_tools_unpub = [] if dep_result and dep_result.unpublished: for dep in dep_result.unpublished: if dep not in collection_tools_unpub: dependency_tools_unpub.append(dep) all_unpublished = collection_tools_unpub + dependency_tools_unpub # Handle unpublished local tools (both collection tools and their deps) publish_list = [] skip_list = [] if all_unpublished: # Build detailed message message = "Some tools are not published yet.\n\n" if collection_tools_unpub: message += "Collection tools:\n" message += "\n".join(f" - {t}" for t in collection_tools_unpub) message += "\n\n" if dependency_tools_unpub: message += "Dependencies:\n" message += "\n".join(f" - {t}" for t in dependency_tools_unpub) message += "\n\n" if dep_result and dep_result.cycles: message += "Warning: Circular dependencies detected:\n" for cycle in dep_result.cycles: message += f" {' -> '.join(cycle)}\n" message += "(Cyclic deps excluded from auto-publish)\n\n" if dep_result and dep_result.skipped: message += f"Note: Could not check {len(dep_result.skipped)} dep(s) due to errors.\n\n" message += "Choose how to proceed:" dialog = QMessageBox(self) dialog.setIcon(QMessageBox.Warning) dialog.setWindowTitle("Unpublished Tools") dialog.setText(message) btn_publish = dialog.addButton("Publish Tools First", QMessageBox.AcceptRole) btn_skip = dialog.addButton("Skip Unpublished", QMessageBox.DestructiveRole) btn_cancel = dialog.addButton("Cancel", QMessageBox.RejectRole) dialog.setDefaultButton(btn_publish) # Ensure buttons are wide enough for their text for btn in [btn_publish, btn_skip, btn_cancel]: btn.setMinimumWidth(btn.fontMetrics().horizontalAdvance(btn.text()) + 30) dialog.exec() clicked = dialog.clickedButton() if clicked == btn_publish: # Use topological order if available if dep_result and dep_result.publish_order: # Filter to only unpublished tools publish_list = [t for t in dep_result.publish_order if t in all_unpublished] else: publish_list = all_unpublished elif clicked == btn_skip: skip_list = all_unpublished else: return # Confirm publish remaining = len(resolution.registry_refs) - len(skip_list) if remaining == 0: QMessageBox.warning(self, "Error", "No tools remaining after filtering") return reply = QMessageBox.question( self, "Confirm Publish", f"Publish '{coll.display_name}' with {remaining} tools?", QMessageBox.Yes | QMessageBox.No ) if reply != QMessageBox.Yes: return # Start actual publish self.btn_publish.setEnabled(False) self.btn_publish.setText("Publishing...") self._publish_worker = PublishWorker( coll, user_slug, resolution.registry_refs, resolution.transformed_pinned, skip_unpublished=skip_list, publish_unpublished=publish_list ) self._publish_worker.progress.connect(self._on_publish_progress) self._publish_worker.finished.connect(self._on_publish_finished) self._publish_worker.start() def _on_publish_progress(self, message: str): """Handle publish progress updates.""" self.main_window.show_status(message) def _on_publish_finished(self, success: bool, message: str): """Handle publish completion.""" self.btn_publish.setText("Publish") if success: QMessageBox.information(self, "Published", message) self._load_local_collections() self._load_registry_collections() # Also refresh registry tab else: QMessageBox.warning(self, "Publish Failed", message) self.btn_publish.setEnabled(True) self.main_window.show_status(message) def _delete_collection(self): """Delete the selected collection.""" current = self.collection_list.currentItem() if not current: return name = current.data(Qt.UserRole) coll = get_collection(name) if not coll: return reply = QMessageBox.question( self, "Delete Collection", f"Delete collection '{coll.display_name}'?\n\n" "This only removes the local collection file.", QMessageBox.Yes | QMessageBox.No ) if reply != QMessageBox.Yes: return coll.delete() self._load_local_collections() self._clear_details() self.main_window.show_status(f"Deleted collection: {name}") def _install_registry_collection(self, item: QListWidgetItem): """Install a registry collection.""" name = item.data(Qt.UserRole) if not name: return reply = QMessageBox.question( self, "Install Collection", f"Install all tools from collection '{name}'?", QMessageBox.Yes | QMessageBox.No ) if reply != QMessageBox.Yes: return # Disable double-click during install self.registry_list.setEnabled(False) self.main_window.show_status(f"Installing collection '{name}'...") self._install_worker = InstallWorker(name) self._install_worker.progress.connect(self._on_install_progress) self._install_worker.finished.connect(self._on_install_finished) self._install_worker.start() def _on_install_progress(self, message: str): """Handle install progress updates.""" self.main_window.show_status(message) def _on_install_finished(self, success: bool, message: str, installed: int, failed: int): """Handle install completion.""" self.registry_list.setEnabled(True) if success: if installed == 0 and failed == 0: QMessageBox.information(self, "Info", message) else: QMessageBox.information(self, "Success", message) else: if installed > 0: QMessageBox.warning(self, "Partial Success", message) else: QMessageBox.warning(self, "Error", message) self.main_window.show_status(message)