Spaces:
Sleeping
Sleeping
| """Agent auto-discovery system for detecting installed CLI agents. | |
| Automatically discovers which agents (Claude Code, Gemini CLI, Aider, Copilot, etc.) | |
| are installed and available on the user's system. | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import platform | |
| import re | |
| import shutil | |
| import subprocess | |
| from dataclasses import dataclass, asdict | |
| from pathlib import Path | |
| from typing import Any | |
| logger = logging.getLogger(__name__) | |
| class AgentMetadata: | |
| """Metadata for a discovered agent.""" | |
| name: str | |
| command: str | list[str] | |
| version: str | None = None | |
| available: bool = False | |
| path: str | None = None | |
| error_message: str | None = None | |
| capabilities: list[str] | None = None | |
| verified_at: str | None = None | |
| class AgentDiscovery: | |
| """Automatic discovery system for CLI agents.""" | |
| # Known agent patterns to scan for | |
| # Note: On Windows, gemini must be checked first due to subprocess interaction issues | |
| VERIFICATION_TIMEOUT = 30.0 | |
| KNOWN_AGENTS = { | |
| "gemini": { | |
| "command": "gemini", | |
| "version_flag": "--version", | |
| "capabilities": ["security", "vision", "fast_iteration"], | |
| }, | |
| "claude": { | |
| "command": "claude", | |
| "version_flag": "--version", | |
| "capabilities": ["reasoning", "architecture", "code_generation"], | |
| }, | |
| "aider": { | |
| "command": "aider", | |
| "version_flag": "--version", | |
| "capabilities": ["git_operations", "code_editing", "refactoring"], | |
| }, | |
| "copilot": { | |
| "command": "copilot", | |
| "version_flag": "--version", | |
| "capabilities": ["github_integration", "suggestions"], | |
| }, | |
| "qwen": { | |
| "command": "qwen-code", | |
| "version_flag": "--version", | |
| "capabilities": ["code_generation", "multilingual"], | |
| }, | |
| } | |
| def __init__(self, cache_file: Path | None = None): | |
| """Initialize agent discovery system. | |
| Args: | |
| cache_file: Path to cache file for discovered agents | |
| """ | |
| # Set up safe cache directory | |
| cache_dir = Path.home() / ".cache" / "delegation-mcp" | |
| if cache_file: | |
| # Validate user-provided cache file path to prevent traversal | |
| cache_file = Path(cache_file) | |
| # Ensure it's just a filename, not a path | |
| if len(cache_file.parts) > 1: | |
| logger.warning(f"Cache file path contains directories, using only filename: {cache_file.name}") | |
| cache_file = cache_dir / cache_file.name | |
| else: | |
| cache_file = cache_dir / cache_file | |
| # Resolve to absolute path and check it's within cache_dir | |
| try: | |
| cache_file = cache_file.resolve() | |
| if not str(cache_file).startswith(str(cache_dir.resolve())): | |
| raise ValueError(f"Cache file path outside allowed directory: {cache_file}") | |
| except (OSError, ValueError) as e: | |
| logger.error(f"Invalid cache file path: {e}") | |
| cache_file = cache_dir / "discovered_agents.json" | |
| else: | |
| cache_file = cache_dir / "discovered_agents.json" | |
| self.cache_file = cache_file | |
| self._discovered_agents: dict[str, AgentMetadata] = {} | |
| self._load_cache() | |
| def _load_cache(self) -> None: | |
| """Load cached discovery results with validation.""" | |
| if self.cache_file.exists(): | |
| try: | |
| with open(self.cache_file) as f: | |
| data = json.load(f) | |
| # Validate data structure | |
| if not isinstance(data, dict): | |
| logger.warning("Invalid cache format: expected dictionary") | |
| return | |
| # Validate each entry | |
| required_fields = {'name', 'command', 'available'} | |
| for name, metadata in data.items(): | |
| # Validate name is safe | |
| if not isinstance(name, str) or not re.match(r'^[a-zA-Z0-9_\-]+$', name): | |
| logger.warning(f"Skipping invalid agent name in cache: {name}") | |
| continue | |
| # Validate metadata structure | |
| if not isinstance(metadata, dict): | |
| logger.warning(f"Skipping invalid metadata for {name}") | |
| continue | |
| if not required_fields.issubset(metadata.keys()): | |
| logger.warning(f"Skipping incomplete cache entry: {name}") | |
| continue | |
| try: | |
| self._discovered_agents[name] = AgentMetadata(**metadata) | |
| except Exception as e: | |
| logger.warning(f"Failed to load agent {name} from cache: {e}") | |
| continue | |
| logger.info(f"Loaded {len(self._discovered_agents)} agents from cache") | |
| except Exception as e: | |
| logger.warning(f"Failed to load agent cache: {e}") | |
| def _save_cache(self) -> None: | |
| """Save discovery results to cache.""" | |
| try: | |
| self.cache_file.parent.mkdir(parents=True, exist_ok=True) | |
| with open(self.cache_file, "w") as f: | |
| data = {name: asdict(agent) for name, agent in self._discovered_agents.items()} | |
| json.dump(data, f, indent=2) | |
| logger.info(f"Saved {len(self._discovered_agents)} agents to cache") | |
| except Exception as e: | |
| logger.error(f"Failed to save agent cache: {e}") | |
| def _resolve_command_path(self, command: str | list[str]) -> str | None: | |
| """Resolve command to full path if available. | |
| Args: | |
| command: Command string or list | |
| Returns: | |
| Full path to executable or None if not found | |
| """ | |
| cmd = command[0] if isinstance(command, list) else command | |
| # Use shutil.which for cross-platform command resolution | |
| path = shutil.which(cmd) | |
| if path: | |
| logger.debug(f"Found {cmd} at {path}") | |
| return path | |
| logger.debug(f"Command {cmd} not found in PATH") | |
| return None | |
| def _get_node_script_path(self, cmd_path: str) -> tuple[str, list[str]] | None: | |
| """For npm .cmd files on Windows, extract the underlying Node.js script. | |
| Args: | |
| cmd_path: Path to .cmd file | |
| Returns: | |
| Tuple of (node_executable, [script_path]) or None | |
| """ | |
| if not cmd_path.lower().endswith('.cmd'): | |
| return None | |
| try: | |
| # Read the .cmd file to find the node script | |
| with open(cmd_path, 'r') as f: | |
| content = f.read() | |
| # Look for pattern like: "%_prog%" "%dp0%\node_modules\...\index.js" | |
| import re | |
| match = re.search(r'"%dp0%\\node_modules\\([^"]+)"', content) | |
| if match: | |
| script_rel_path = match.group(1) | |
| npm_dir = Path(cmd_path).parent | |
| script_path = npm_dir / "node_modules" / script_rel_path | |
| if script_path.exists(): | |
| node_exe = shutil.which("node") | |
| if node_exe: | |
| logger.debug(f"Found node script: {script_path}") | |
| return node_exe, [str(script_path)] | |
| except Exception as e: | |
| logger.debug(f"Could not extract node script from {cmd_path}: {e}") | |
| return None | |
| async def _verify_agent( | |
| self, | |
| name: str, | |
| command: str | list[str], | |
| version_flag: str = "--version", | |
| ) -> tuple[bool, str | None, str | None]: | |
| """Verify agent is working by running version command. | |
| Args: | |
| name: Agent name | |
| command: Command to execute | |
| version_flag: Flag to get version (--version or --help) | |
| Returns: | |
| tuple: (is_available, version_string, error_message) | |
| """ | |
| # Build command | |
| if isinstance(command, list): | |
| cmd = command + [version_flag] | |
| else: | |
| cmd = [command, version_flag] | |
| logger.debug(f"Verifying {name} with command: {cmd}, is_list: {isinstance(command, list)}") | |
| try: | |
| # Try --version first | |
| # On Windows with string commands (not node direct calls), use cmd /c | |
| if platform.system() == "Windows" and isinstance(command, str): | |
| # Use cmd /c with proper quoting for Windows CMD/BAT files | |
| cmd_str = " ".join(cmd) | |
| process = await asyncio.create_subprocess_exec( | |
| "cmd", | |
| "/c", | |
| cmd_str, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| stdin=asyncio.subprocess.PIPE, | |
| ) | |
| else: | |
| # Direct execution (Unix or Windows with node direct call) | |
| process = await asyncio.create_subprocess_exec( | |
| *cmd, | |
| stdout=asyncio.subprocess.PIPE, | |
| stderr=asyncio.subprocess.PIPE, | |
| stdin=asyncio.subprocess.PIPE, | |
| ) | |
| # Close stdin to prevent processes from hanging while waiting for input | |
| if process.stdin: | |
| process.stdin.close() | |
| stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=self.VERIFICATION_TIMEOUT) | |
| if process.returncode == 0: | |
| version = stdout.decode("utf-8", errors="replace").strip() | |
| if not version: | |
| version = stderr.decode("utf-8", errors="replace").strip() | |
| # Take only the first line to avoid multi-line version outputs | |
| version = version.split('\n')[0].strip() if version else "" | |
| logger.info(f"Agent {name} verified: {version[:100]}") | |
| return True, version[:200], None | |
| else: | |
| # Try --help as fallback | |
| if version_flag == "--version": | |
| return await self._verify_agent(name, command, "--help") | |
| error = stderr.decode("utf-8", errors="replace").strip() | |
| logger.warning(f"Agent {name} verification failed: {error}") | |
| return False, None, error[:200] | |
| except asyncio.TimeoutError: | |
| error = f"Agent {name} timed out during verification" | |
| logger.warning(error) | |
| return False, None, error | |
| except FileNotFoundError: | |
| error = f"Command not found: {cmd[0]}" | |
| logger.debug(error) | |
| return False, None, error | |
| except Exception as e: | |
| error = f"Failed to verify {name}: {str(e)}" | |
| logger.warning(error) | |
| return False, None, error | |
| async def discover_agents( | |
| self, | |
| force_refresh: bool = False, | |
| agents_to_check: list[str] | None = None, | |
| ) -> dict[str, AgentMetadata]: | |
| """Discover available agents on the system. | |
| Args: | |
| force_refresh: Force re-discovery even if cache exists | |
| agents_to_check: Specific agents to check (default: all known agents) | |
| Returns: | |
| Dictionary of agent name to metadata | |
| """ | |
| if not force_refresh and self._discovered_agents: | |
| logger.info("Using cached agent discovery results") | |
| return self._discovered_agents | |
| logger.info("Starting agent discovery...") | |
| # Determine which agents to check | |
| agents = agents_to_check or list(self.KNOWN_AGENTS.keys()) | |
| # Use a semaphore to limit concurrency on all platforms to avoid resource spikes | |
| # On Windows this is critical, on others it's just good practice | |
| concurrency_limit = 5 if platform.system() == "Windows" else 10 | |
| semaphore = asyncio.Semaphore(concurrency_limit) | |
| async def _bounded_discover(name: str, config: dict[str, Any]) -> AgentMetadata | Exception: | |
| async with semaphore: | |
| try: | |
| return await self._discover_single_agent(name, config) | |
| except Exception as e: | |
| logger.error(f"Discovery task failed for {name}: {e}") | |
| return e | |
| logger.debug(f"Running agent discovery in parallel (limit={concurrency_limit})") | |
| tasks = [] | |
| for name in agents: | |
| if name not in self.KNOWN_AGENTS: | |
| logger.warning(f"Unknown agent: {name}") | |
| continue | |
| config = self.KNOWN_AGENTS[name] | |
| tasks.append(_bounded_discover(name, config)) | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| # Process results | |
| for result in results: | |
| if isinstance(result, AgentMetadata): | |
| self._discovered_agents[result.name] = result | |
| elif isinstance(result, Exception): | |
| # Already logged in _bounded_discover | |
| pass | |
| # Save to cache | |
| self._save_cache() | |
| logger.info( | |
| f"Discovery complete: {sum(1 for a in self._discovered_agents.values() if a.available)}/{len(self._discovered_agents)} agents available" | |
| ) | |
| return self._discovered_agents | |
| async def _discover_single_agent( | |
| self, | |
| name: str, | |
| config: dict[str, Any], | |
| ) -> AgentMetadata: | |
| """Discover a single agent. | |
| Args: | |
| name: Agent name | |
| config: Agent configuration | |
| Returns: | |
| AgentMetadata for the agent | |
| """ | |
| command = config["command"] | |
| version_flag = config.get("version_flag", "--version") | |
| capabilities = config.get("capabilities", []) | |
| # Resolve command path | |
| path = self._resolve_command_path(command) | |
| if not path: | |
| return AgentMetadata( | |
| name=name, | |
| command=command, | |
| available=False, | |
| error_message=self._get_install_message(name), | |
| ) | |
| # On Windows, try to extract node script from .cmd files | |
| verify_command = command | |
| if platform.system() == "Windows" and isinstance(command, str): | |
| node_script = self._get_node_script_path(path) | |
| if node_script: | |
| node_exe, script_args = node_script | |
| verify_command = [node_exe] + script_args | |
| # Verify agent works | |
| available, version, error = await self._verify_agent(name, verify_command, version_flag) | |
| from datetime import datetime | |
| return AgentMetadata( | |
| name=name, | |
| command=command, | |
| version=version, | |
| available=available, | |
| path=path, | |
| error_message=error if not available else None, | |
| capabilities=capabilities if available else None, | |
| verified_at=datetime.utcnow().isoformat() if available else None, | |
| ) | |
| def _get_install_message(self, agent_name: str) -> str: | |
| """Get installation instructions for an agent. | |
| Args: | |
| agent_name: Name of the agent | |
| Returns: | |
| Installation instructions | |
| """ | |
| install_messages = { | |
| "claude": "Claude Code not found. Install with: npm install -g @anthropic/claude-code", | |
| "gemini": "Gemini CLI not found. Install with: npm install -g @google/gemini-cli", | |
| "aider": "Aider not found. Install with: pip install aider-chat", | |
| "copilot": "GitHub Copilot CLI not found. Install with: npm install -g @github/copilot", | |
| "qwen": "Qwen Code not found. Install with: npm install -g @qwen-code/qwen-code", | |
| } | |
| return install_messages.get( | |
| agent_name, | |
| f"{agent_name} not found. Check agent documentation for installation instructions.", | |
| ) | |
| def get_available_agents(self) -> list[AgentMetadata]: | |
| """Get list of available agents. | |
| Returns: | |
| List of available agent metadata | |
| """ | |
| return [agent for agent in self._discovered_agents.values() if agent.available] | |
| def get_unavailable_agents(self) -> list[AgentMetadata]: | |
| """Get list of unavailable agents. | |
| Returns: | |
| List of unavailable agent metadata | |
| """ | |
| return [agent for agent in self._discovered_agents.values() if not agent.available] | |
| def is_agent_available(self, name: str) -> bool: | |
| """Check if a specific agent is available. | |
| Args: | |
| name: Agent name | |
| Returns: | |
| True if agent is available | |
| """ | |
| agent = self._discovered_agents.get(name) | |
| return agent.available if agent else False | |
| def get_agent_metadata(self, name: str) -> AgentMetadata | None: | |
| """Get metadata for a specific agent. | |
| Args: | |
| name: Agent name | |
| Returns: | |
| Agent metadata or None if not found | |
| """ | |
| return self._discovered_agents.get(name) | |
| def get_discovery_summary(self) -> dict[str, Any]: | |
| """Get summary of discovery results. | |
| Returns: | |
| Dictionary with discovery summary | |
| """ | |
| available = self.get_available_agents() | |
| unavailable = self.get_unavailable_agents() | |
| return { | |
| "total_agents": len(self._discovered_agents), | |
| "available": len(available), | |
| "unavailable": len(unavailable), | |
| "available_agents": [ | |
| {"name": a.name, "version": a.version, "path": a.path} for a in available | |
| ], | |
| "unavailable_agents": [ | |
| {"name": a.name, "error": a.error_message} for a in unavailable | |
| ], | |
| "system_info": { | |
| "platform": platform.system(), | |
| "python_version": platform.python_version(), | |
| }, | |
| } | |
| def clear_cache(self) -> None: | |
| """Clear the discovery cache.""" | |
| self._discovered_agents.clear() | |
| if self.cache_file.exists(): | |
| self.cache_file.unlink() | |
| logger.info("Agent discovery cache cleared") | |
| async def discover_agents(force_refresh: bool = False) -> dict[str, AgentMetadata]: | |
| """Convenience function to discover agents. | |
| Args: | |
| force_refresh: Force re-discovery even if cache exists | |
| Returns: | |
| Dictionary of agent name to metadata | |
| """ | |
| discovery = AgentDiscovery() | |
| return await discovery.discover_agents(force_refresh=force_refresh) | |