Spaces:
Running
Running
"""Tests for the agent discovery module."""
import asyncio
import json
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from delegation_mcp.agent_discovery import AgentDiscovery, AgentMetadata
@pytest.fixture
def temp_cache_file():
    """Yield a cache-file path inside a throwaway directory tree.

    The yielded path is ``<tmp>/.cache/delegation-mcp/test_cache.json``.
    The parent directories are created; the file itself is not. The
    temporary directory is removed again when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        cache_dir = Path(tmpdir) / ".cache" / "delegation-mcp"
        cache_dir.mkdir(parents=True, exist_ok=True)
        # Yield (rather than return) so the directory outlives the test body.
        yield cache_dir / "test_cache.json"
@pytest.fixture
def discovery(temp_cache_file):
    """Return an ``AgentDiscovery`` that uses the temporary cache file.

    ``Path.home`` is patched only while the instance is constructed; the
    original author's note says this is needed so that the cache-path
    validation in ``AgentDiscovery.__init__`` accepts the temporary location.
    """
    # temp_cache_file is <home>/.cache/delegation-mcp/test_cache.json, so the
    # fake home directory is three levels above the file.
    temp_home = temp_cache_file.parents[2]
    # Patch both the canonical attribute and the name as seen from the module
    # under test, mirroring the original nested patches.
    with patch("pathlib.Path.home", return_value=temp_home), patch(
        "delegation_mcp.agent_discovery.Path.home", return_value=temp_home
    ):
        return AgentDiscovery(cache_file=temp_cache_file)
def test_agent_metadata_creation():
    """AgentMetadata exposes its constructor arguments as attributes."""
    kwargs = {
        "name": "claude",
        "command": "claude",
        "version": "1.0.0",
        "available": True,
        "path": "/usr/local/bin/claude",
        "capabilities": ["reasoning", "code_generation"],
    }

    metadata = AgentMetadata(**kwargs)

    assert metadata.name == "claude"
    assert metadata.command == "claude"
    assert metadata.version == "1.0.0"
    assert metadata.available is True
    assert metadata.path == "/usr/local/bin/claude"
    assert "reasoning" in metadata.capabilities
def test_discovery_initialization(discovery, temp_cache_file):
    """A fresh AgentDiscovery keeps its cache path and has an agent dict."""
    # The class-level registry of known agents must not be empty.
    assert len(AgentDiscovery.KNOWN_AGENTS) > 0
    # The instance remembers the cache file it was given ...
    assert discovery.cache_file == temp_cache_file
    # ... and holds its discovered agents in a dict.
    assert isinstance(discovery._discovered_agents, dict)
def test_cache_save_and_load(discovery, temp_cache_file):
    """Agents written by ``_save_cache`` are visible to a new instance."""
    # Seed one agent and persist it.
    seeded = AgentMetadata(
        name="test_agent",
        command="test",
        version="1.0.0",
        available=True,
        path="/usr/bin/test",
    )
    discovery._discovered_agents["test_agent"] = seeded
    discovery._save_cache()

    # Saving must have produced the cache file.
    assert temp_cache_file.exists()

    # Build a second instance against the same file; Path.home has to be
    # patched again because the fixture's patches are no longer active.
    fake_home = temp_cache_file.parent.parent.parent
    with patch("pathlib.Path.home", return_value=fake_home):
        with patch(
            "delegation_mcp.agent_discovery.Path.home", return_value=fake_home
        ):
            new_discovery = AgentDiscovery(cache_file=temp_cache_file)

    assert "test_agent" in new_discovery._discovered_agents
    reloaded = new_discovery._discovered_agents["test_agent"]
    assert reloaded.name == "test_agent"
    assert reloaded.version == "1.0.0"
@pytest.mark.asyncio
async def test_resolve_command_path(discovery):
    """``_resolve_command_path`` returns what ``shutil.which`` reports."""
    # Command present on PATH: the resolved path is passed through.
    with patch("shutil.which", return_value="/usr/bin/python3"):
        path = discovery._resolve_command_path("python3")
    assert path == "/usr/bin/python3"

    # Command missing: None is returned.
    with patch("shutil.which", return_value=None):
        path = discovery._resolve_command_path("nonexistent_command")
    assert path is None
@pytest.mark.asyncio
async def test_verify_agent_success(discovery):
    """A zero exit code with version output is reported as available."""
    # Fake subprocess: exits 0 and prints a version string on stdout.
    mock_process = AsyncMock()
    mock_process.returncode = 0
    mock_process.communicate = AsyncMock(
        return_value=(b"claude version 1.0.0\n", b"")
    )
    mock_process.stdin = MagicMock()  # stdin is mocked as a non-async object

    with patch("asyncio.create_subprocess_exec", return_value=mock_process):
        available, version, error = await discovery._verify_agent(
            "claude", "claude", "--version"
        )

    assert available is True
    assert "1.0.0" in version
    assert error is None
@pytest.mark.asyncio
async def test_verify_agent_failure(discovery):
    """A non-zero exit code yields an unavailable agent or an error."""
    # Fake subprocess: exits 1 and writes an error to stderr.
    mock_process = AsyncMock()
    mock_process.returncode = 1
    mock_process.communicate = AsyncMock(
        return_value=(b"", b"command not found\n")
    )
    mock_process.stdin = MagicMock()  # stdin is mocked as a non-async object

    # The patch returns the same failing process for every call, so a retry
    # inside _verify_agent (the original comments mention a --help retry)
    # fails in the same way.
    with patch("asyncio.create_subprocess_exec", return_value=mock_process):
        available, version, error = await discovery._verify_agent(
            "nonexistent", "nonexistent", "--version"
        )

    assert available is False or error is not None
@pytest.mark.asyncio
async def test_verify_agent_timeout(discovery):
    """A timeout while talking to the subprocess is reported as an error."""
    # Fake subprocess whose communicate() raises asyncio.TimeoutError.
    mock_process = AsyncMock()
    mock_process.communicate = AsyncMock(side_effect=asyncio.TimeoutError())
    mock_process.stdin = MagicMock()  # stdin is mocked as a non-async object

    with patch("asyncio.create_subprocess_exec", return_value=mock_process):
        available, version, error = await discovery._verify_agent(
            "slow_agent", "slow_agent", "--version"
        )

    assert available is False
    assert version is None
    assert "timed out" in error.lower()
@pytest.mark.asyncio
async def test_verify_agent_not_found(discovery):
    """A missing executable is reported as unavailable with an error."""
    # Spawning the subprocess itself fails with FileNotFoundError.
    with patch(
        "asyncio.create_subprocess_exec",
        side_effect=FileNotFoundError("Command not found"),
    ):
        available, version, error = await discovery._verify_agent(
            "missing", "missing", "--version"
        )

    assert available is False
    assert version is None
    assert "not found" in error.lower()
@pytest.mark.asyncio
async def test_discover_single_agent(discovery):
    """``_discover_single_agent`` combines path, version and capabilities."""
    config = {
        "command": "python3",
        "version_flag": "--version",
        "capabilities": ["scripting", "general"],
    }

    # Stub out both the PATH lookup and the verification step so that only
    # the assembly of the metadata is exercised.
    with patch("shutil.which", return_value="/usr/bin/python3"), patch.object(
        discovery,
        "_verify_agent",
        return_value=(True, "Python 3.9.0", None),
    ):
        metadata = await discovery._discover_single_agent("python", config)

    assert metadata.name == "python"
    assert metadata.available is True
    assert metadata.version == "Python 3.9.0"
    assert metadata.path == "/usr/bin/python3"
    assert "scripting" in metadata.capabilities
@pytest.mark.asyncio
async def test_discover_agents_with_cache(discovery, temp_cache_file):
    """Cached agents are returned unless a refresh is forced."""
    # Pre-populate the in-memory cache.
    discovery._discovered_agents["cached_agent"] = AgentMetadata(
        name="cached_agent",
        command="cached",
        version="1.0.0",
        available=True,
    )

    # Without force_refresh the cached entry is part of the result.
    result = await discovery.discover_agents(force_refresh=False)
    assert "cached_agent" in result

    # With force_refresh the per-agent discovery must run again.
    with patch.object(discovery, "_discover_single_agent") as mock_discover:
        mock_discover.return_value = AgentMetadata(
            name="test",
            command="test",
            version="2.0.0",
            available=True,
        )
        await discovery.discover_agents(force_refresh=True)

    assert mock_discover.called
@pytest.mark.asyncio
async def test_discover_agents_parallel(discovery):
    """A forced refresh probes every known agent exactly once.

    Only the number of ``_discover_single_agent`` calls is checked; the test
    does not measure whether the calls actually overlap in time.
    """
    probed = []

    async def mock_discover(name, config):
        # Record the call, then yield to the event loop to simulate work.
        probed.append(name)
        await asyncio.sleep(0.01)
        return AgentMetadata(
            name=name,
            command=config["command"],
            available=False,
            error_message="Not found",
        )

    with patch.object(
        discovery, "_discover_single_agent", side_effect=mock_discover
    ):
        await discovery.discover_agents(force_refresh=True)

    assert len(probed) == len(AgentDiscovery.KNOWN_AGENTS)
def test_get_available_agents(discovery):
    """``get_available_agents`` returns only the agents flagged available."""
    fixtures = [("agent1", "a1", True), ("agent2", "a2", False), ("agent3", "a3", True)]
    discovery._discovered_agents = {
        name: AgentMetadata(name=name, command=command, available=is_up)
        for name, command, is_up in fixtures
    }

    available = discovery.get_available_agents()

    # Two of the three seeded agents are available.
    assert len(available) == 2
    assert all(agent.available for agent in available)
def test_get_unavailable_agents(discovery):
    """``get_unavailable_agents`` returns only the agents flagged unavailable."""
    fixtures = [("agent1", "a1", True), ("agent2", "a2", False), ("agent3", "a3", False)]
    discovery._discovered_agents = {
        name: AgentMetadata(name=name, command=command, available=is_up)
        for name, command, is_up in fixtures
    }

    unavailable = discovery.get_unavailable_agents()

    # Two of the three seeded agents are unavailable.
    assert len(unavailable) == 2
    assert all(not agent.available for agent in unavailable)
def test_is_agent_available(discovery):
    """``is_agent_available`` reflects the flag and is False for unknown names."""
    claude = AgentMetadata(name="claude", command="claude", available=True)
    gemini = AgentMetadata(name="gemini", command="gemini", available=False)
    discovery._discovered_agents = {"claude": claude, "gemini": gemini}

    expectations = (("claude", True), ("gemini", False), ("nonexistent", False))
    for agent_name, expected in expectations:
        assert discovery.is_agent_available(agent_name) is expected
def test_get_agent_metadata(discovery):
    """``get_agent_metadata`` returns the stored entry, or None if unknown."""
    stored = AgentMetadata(
        name="claude",
        command="claude",
        version="1.0.0",
        available=True,
    )
    discovery._discovered_agents["claude"] = stored

    # Known agent: the stored metadata comes back.
    assert discovery.get_agent_metadata("claude") == stored
    # Unknown agent: None.
    assert discovery.get_agent_metadata("nonexistent") is None
def test_get_discovery_summary(discovery):
    """The summary counts and lists available and unavailable agents."""
    present = AgentMetadata(
        name="agent1",
        command="a1",
        version="1.0.0",
        available=True,
        path="/usr/bin/a1",
    )
    absent = AgentMetadata(
        name="agent2",
        command="a2",
        available=False,
        error_message="Not found",
    )
    discovery._discovered_agents = {"agent1": present, "agent2": absent}

    summary = discovery.get_discovery_summary()

    # Counters.
    assert summary["total_agents"] == 2
    assert summary["available"] == 1
    assert summary["unavailable"] == 1

    # Per-agent listings.
    available_agents = summary["available_agents"]
    unavailable_agents = summary["unavailable_agents"]
    assert len(available_agents) == 1
    assert len(unavailable_agents) == 1
    assert available_agents[0]["name"] == "agent1"
    assert unavailable_agents[0]["name"] == "agent2"

    # System information is part of the summary as well.
    assert "system_info" in summary
def test_clear_cache(discovery, temp_cache_file):
    """``clear_cache`` empties the agent dict and removes the cache file."""
    # Seed one agent and write it to disk.
    entry = AgentMetadata(name="test", command="test", available=True)
    discovery._discovered_agents["test"] = entry
    discovery._save_cache()

    # Precondition: both the file and the in-memory entry exist.
    assert temp_cache_file.exists()
    assert len(discovery._discovered_agents) > 0

    discovery.clear_cache()

    # Postcondition: nothing left in memory or on disk.
    assert len(discovery._discovered_agents) == 0
    assert not temp_cache_file.exists()
| def test_get_install_message(discovery): | |
| """Test getting installation instructions.""" | |
| message = discovery._get_install_message("claude") | |
| assert "Claude Code" in message | |
| assert "install" in message.lower() | |
| message = discovery._get_install_message("gemini") | |
| assert "Gemini" in message | |
| assert "npm install" in message.lower() | |
| message = discovery._get_install_message("unknown_agent") | |
| assert "unknown_agent" in message | |
| assert "documentation" in message.lower() | |
@pytest.mark.asyncio
async def test_discover_agents_convenience_function():
    """The module-level ``discover_agents`` forwards ``force_refresh``."""
    from delegation_mcp.agent_discovery import discover_agents

    # Replace the class so that no real discovery is performed; the instance
    # it produces has an awaitable discover_agents returning an empty dict.
    with patch("delegation_mcp.agent_discovery.AgentDiscovery") as mock_class:
        mock_instance = MagicMock()
        mock_instance.discover_agents = AsyncMock(return_value={})
        mock_class.return_value = mock_instance

        await discover_agents(force_refresh=True)

    mock_instance.discover_agents.assert_called_once_with(force_refresh=True)
@pytest.mark.asyncio
async def test_discover_specific_agents(discovery):
    """``agents_to_check`` limits discovery to the named agents."""
    agents_to_check = ["claude", "gemini"]

    with patch.object(discovery, "_discover_single_agent") as mock_discover:
        mock_discover.return_value = AgentMetadata(
            name="test",
            command="test",
            available=False,
        )
        await discovery.discover_agents(
            force_refresh=True,
            agents_to_check=agents_to_check,
        )

    # One discovery call per requested agent.
    assert mock_discover.call_count == len(agents_to_check)
def test_known_agents_structure():
    """Every KNOWN_AGENTS entry has the required keys and capabilities."""
    for config in AgentDiscovery.KNOWN_AGENTS.values():
        for required_key in ("command", "version_flag", "capabilities"):
            assert required_key in config
        capabilities = config["capabilities"]
        assert isinstance(capabilities, list)
        assert len(capabilities) > 0