multi-agent-mcp / tests /test_agent_discovery.py
Cduplar's picture
Initial public release: Multi-Agent MCP Delegation Server
8b02e7c
Raw
History Blame Contribute Delete
14.3 kB
"""Tests for agent discovery module."""
import asyncio
import json
import pytest
from pathlib import Path
import tempfile
from unittest.mock import AsyncMock, MagicMock, patch
from delegation_mcp.agent_discovery import AgentDiscovery, AgentMetadata
@pytest.fixture
def temp_cache_file():
    """Yield a cache-file path located under a throwaway fake home directory.

    The yielded path is ``<tmp>/.cache/delegation-mcp/test_cache.json``. Only
    the parent directory is created here; the file itself is not. The whole
    temporary tree is removed when the fixture is torn down.
    """
    with tempfile.TemporaryDirectory() as fake_home:
        cache_root = Path(fake_home).joinpath(".cache", "delegation-mcp")
        cache_root.mkdir(parents=True, exist_ok=True)
        yield cache_root / "test_cache.json"
@pytest.fixture
def discovery(temp_cache_file):
    """Return an AgentDiscovery instance backed by the temporary cache file.

    ``Path.home`` is patched only while the instance is being constructed; both
    patches are reverted as soon as this fixture returns.
    """
    # The cache file sits at <home>/.cache/delegation-mcp/<file>, so the fake
    # home directory is three levels above it.
    fake_home = temp_cache_file.parents[2]
    # Presumably the constructor validates the cache location against the home
    # directory -- patch both lookup paths so that check sees the fake home.
    with patch("pathlib.Path.home", return_value=fake_home), patch(
        "delegation_mcp.agent_discovery.Path.home", return_value=fake_home
    ):
        return AgentDiscovery(cache_file=temp_cache_file)
def test_agent_metadata_creation():
    """AgentMetadata exposes the values it was constructed with as attributes."""
    text_fields = {
        "name": "claude",
        "command": "claude",
        "version": "1.0.0",
        "path": "/usr/local/bin/claude",
    }
    metadata = AgentMetadata(
        available=True,
        capabilities=["reasoning", "code_generation"],
        **text_fields,
    )

    for attribute, expected in text_fields.items():
        assert getattr(metadata, attribute) == expected
    assert metadata.available is True
    assert "reasoning" in metadata.capabilities
def test_discovery_initialization(discovery, temp_cache_file):
    """A fresh instance keeps the given cache path and a dict of discovered agents."""
    known_agents = AgentDiscovery.KNOWN_AGENTS
    discovered = discovery._discovered_agents

    assert discovery.cache_file == temp_cache_file
    assert isinstance(discovered, dict)
    # The class must ship with at least one known agent definition.
    assert len(known_agents) > 0
def test_cache_save_and_load(discovery, temp_cache_file):
    """An entry written by ``_save_cache`` is visible to a newly built instance."""
    entry = AgentMetadata(
        name="test_agent",
        command="test",
        version="1.0.0",
        available=True,
        path="/usr/bin/test",
    )
    discovery._discovered_agents["test_agent"] = entry

    # Persist to disk and confirm the cache file was actually written.
    discovery._save_cache()
    assert temp_cache_file.exists()

    # Build a second instance on the same file; Path.home must be patched again
    # because the fixture's patches are no longer active at this point.
    fake_home = temp_cache_file.parents[2]
    with patch("pathlib.Path.home", return_value=fake_home), patch(
        "delegation_mcp.agent_discovery.Path.home", return_value=fake_home
    ):
        reloaded = AgentDiscovery(cache_file=temp_cache_file)

    restored = reloaded._discovered_agents
    assert "test_agent" in restored
    assert restored["test_agent"].name == "test_agent"
    assert restored["test_agent"].version == "1.0.0"
@pytest.mark.asyncio
async def test_resolve_command_path(discovery):
    """``_resolve_command_path`` returns what ``shutil.which`` reports, or None."""
    # Command present on PATH: the mocked location is returned.
    with patch("shutil.which", return_value="/usr/bin/python3"):
        found = discovery._resolve_command_path("python3")
    assert found == "/usr/bin/python3"

    # Command absent from PATH: the lookup yields None.
    with patch("shutil.which", return_value=None):
        missing = discovery._resolve_command_path("nonexistent_command")
    assert missing is None
@pytest.mark.asyncio
async def test_verify_agent_success(discovery):
    """A zero exit code with version output is reported as available."""
    # Fake subprocess: exits 0 and prints a version line on stdout.
    fake_process = AsyncMock(
        returncode=0,
        communicate=AsyncMock(return_value=(b"claude version 1.0.0\n", b"")),
        stdin=MagicMock(),  # stdin is deliberately a plain (non-async) mock
    )

    with patch("asyncio.create_subprocess_exec", return_value=fake_process):
        outcome = await discovery._verify_agent("claude", "claude", "--version")

    available, version, error = outcome
    assert available is True
    assert "1.0.0" in version
    assert error is None
@pytest.mark.asyncio
async def test_verify_agent_failure(discovery):
    """A non-zero exit code yields either ``available is False`` or an error."""
    # Fake subprocess: exits 1 and writes an error to stderr.
    fake_process = AsyncMock(
        returncode=1,
        communicate=AsyncMock(return_value=(b"", b"command not found\n")),
        stdin=MagicMock(),  # stdin is deliberately a plain (non-async) mock
    )

    # Every subprocess spawned inside the block gets the same failing mock, so
    # a retry with a different flag (the original note mentions --help) fails too.
    with patch("asyncio.create_subprocess_exec", return_value=fake_process):
        available, _version, error = await discovery._verify_agent(
            "nonexistent", "nonexistent", "--version"
        )

    # Intentionally loose: either signal of failure is accepted.
    assert available is False or error is not None
@pytest.mark.asyncio
async def test_verify_agent_timeout(discovery):
    """A timeout while talking to the subprocess is reported as unavailable."""
    # Fake subprocess whose communicate() raises asyncio.TimeoutError.
    hanging_process = AsyncMock(
        communicate=AsyncMock(side_effect=asyncio.TimeoutError()),
        stdin=MagicMock(),  # stdin is deliberately a plain (non-async) mock
    )

    with patch("asyncio.create_subprocess_exec", return_value=hanging_process):
        outcome = await discovery._verify_agent(
            "slow_agent", "slow_agent", "--version"
        )

    available, version, error = outcome
    assert available is False
    assert version is None
    assert "timed out" in error.lower()
@pytest.mark.asyncio
async def test_verify_agent_not_found(discovery):
    """A FileNotFoundError when spawning the process is reported as 'not found'."""
    spawn_failure = FileNotFoundError("Command not found")

    with patch("asyncio.create_subprocess_exec", side_effect=spawn_failure):
        outcome = await discovery._verify_agent("missing", "missing", "--version")

    available, version, error = outcome
    assert available is False
    assert version is None
    assert "not found" in error.lower()
@pytest.mark.asyncio
async def test_discover_single_agent(discovery):
    """``_discover_single_agent`` combines path lookup, verification and config."""
    agent_config = {
        "command": "python3",
        "version_flag": "--version",
        "capabilities": ["scripting", "general"],
    }
    verification = (True, "Python 3.9.0", None)

    # Stub out both the PATH lookup and the subprocess-based verification.
    with patch("shutil.which", return_value="/usr/bin/python3"), patch.object(
        discovery, "_verify_agent", return_value=verification
    ):
        metadata = await discovery._discover_single_agent("python", agent_config)

    assert metadata.name == "python"
    assert metadata.available is True
    assert metadata.version == "Python 3.9.0"
    assert metadata.path == "/usr/bin/python3"
    assert "scripting" in metadata.capabilities
@pytest.mark.asyncio
async def test_discover_agents_with_cache(discovery, temp_cache_file):
    """Cached entries are served without refresh; a forced refresh re-discovers.

    First a pre-populated entry must be present in the result of a
    non-forced ``discover_agents`` call. Then, with ``_discover_single_agent``
    stubbed out, a forced refresh must invoke that stub at least once.
    """
    # Seed the in-memory store as if a previous discovery had already run.
    discovery._discovered_agents["cached_agent"] = AgentMetadata(
        name="cached_agent",
        command="cached",
        version="1.0.0",
        available=True,
    )

    # Without force_refresh the seeded entry must come back.
    cached_result = await discovery.discover_agents(force_refresh=False)
    assert "cached_agent" in cached_result

    # With force_refresh the per-agent discovery routine must be invoked.
    with patch.object(discovery, "_discover_single_agent") as mock_discover:
        mock_discover.return_value = AgentMetadata(
            name="test",
            command="test",
            version="2.0.0",
            available=True,
        )
        # The return value is not inspected here; only the call is of interest.
        await discovery.discover_agents(force_refresh=True)

    assert mock_discover.called
@pytest.mark.asyncio
async def test_discover_agents_parallel(discovery):
    """A forced refresh runs the per-agent discovery once for every known agent.

    Note: only the number of calls is checked; concurrency itself is not
    asserted by this test.
    """
    visited = []

    async def fake_discover(name, config):
        # Record the call, then yield control briefly to simulate some work.
        visited.append(name)
        await asyncio.sleep(0.01)
        return AgentMetadata(
            name=name,
            command=config["command"],
            available=False,
            error_message="Not found",
        )

    with patch.object(
        discovery, "_discover_single_agent", side_effect=fake_discover
    ):
        await discovery.discover_agents(force_refresh=True)

    assert len(visited) == len(AgentDiscovery.KNOWN_AGENTS)
def test_get_available_agents(discovery):
    """``get_available_agents`` returns only entries flagged as available."""
    seeded = [
        ("agent1", "a1", True),
        ("agent2", "a2", False),
        ("agent3", "a3", True),
    ]
    discovery._discovered_agents = {
        name: AgentMetadata(name=name, command=command, available=is_up)
        for name, command, is_up in seeded
    }

    available = discovery.get_available_agents()

    # Two of the three seeded agents are available.
    assert len(available) == 2
    assert all(agent.available for agent in available)
def test_get_unavailable_agents(discovery):
    """``get_unavailable_agents`` returns only entries flagged as unavailable."""
    seeded = [
        ("agent1", "a1", True),
        ("agent2", "a2", False),
        ("agent3", "a3", False),
    ]
    discovery._discovered_agents = {
        name: AgentMetadata(name=name, command=command, available=is_up)
        for name, command, is_up in seeded
    }

    unavailable = discovery.get_unavailable_agents()

    # Two of the three seeded agents are unavailable.
    assert len(unavailable) == 2
    assert all(not agent.available for agent in unavailable)
def test_is_agent_available(discovery):
    """``is_agent_available`` mirrors the stored flag and is False for unknowns."""
    discovery._discovered_agents = {
        "claude": AgentMetadata(name="claude", command="claude", available=True),
        "gemini": AgentMetadata(name="gemini", command="gemini", available=False),
    }

    claude_up = discovery.is_agent_available("claude")
    gemini_up = discovery.is_agent_available("gemini")
    unknown_up = discovery.is_agent_available("nonexistent")

    assert claude_up is True
    assert gemini_up is False
    # An agent that was never discovered counts as unavailable.
    assert unknown_up is False
def test_get_agent_metadata(discovery):
    """``get_agent_metadata`` returns the stored entry, or None when absent."""
    stored = AgentMetadata(
        name="claude",
        command="claude",
        version="1.0.0",
        available=True,
    )
    discovery._discovered_agents["claude"] = stored

    assert discovery.get_agent_metadata("claude") == stored
    assert discovery.get_agent_metadata("nonexistent") is None
def test_get_discovery_summary(discovery):
    """The summary reports counts, per-agent lists and a ``system_info`` entry."""
    working = AgentMetadata(
        name="agent1",
        command="a1",
        version="1.0.0",
        available=True,
        path="/usr/bin/a1",
    )
    broken = AgentMetadata(
        name="agent2",
        command="a2",
        available=False,
        error_message="Not found",
    )
    discovery._discovered_agents = {"agent1": working, "agent2": broken}

    summary = discovery.get_discovery_summary()

    # Aggregate counters.
    assert summary["total_agents"] == 2
    assert summary["available"] == 1
    assert summary["unavailable"] == 1

    # Per-agent listings, one entry on each side.
    up_list = summary["available_agents"]
    down_list = summary["unavailable_agents"]
    assert len(up_list) == 1
    assert len(down_list) == 1
    assert up_list[0]["name"] == "agent1"
    assert down_list[0]["name"] == "agent2"

    assert "system_info" in summary
def test_clear_cache(discovery, temp_cache_file):
    """``clear_cache`` empties the in-memory store and removes the cache file."""
    # Seed one entry and persist it so there is something to clear.
    seeded = AgentMetadata(name="test", command="test", available=True)
    discovery._discovered_agents["test"] = seeded
    discovery._save_cache()

    assert temp_cache_file.exists()
    assert len(discovery._discovered_agents) > 0

    discovery.clear_cache()

    assert len(discovery._discovered_agents) == 0
    assert not temp_cache_file.exists()
def test_get_install_message(discovery):
    """Install messages mention the agent and the expected guidance keyword."""
    # (agent name, text expected verbatim, text expected case-insensitively)
    expectations = (
        ("claude", "Claude Code", "install"),
        ("gemini", "Gemini", "npm install"),
        ("unknown_agent", "unknown_agent", "documentation"),
    )

    for agent, verbatim, lowered in expectations:
        message = discovery._get_install_message(agent)
        assert verbatim in message
        assert lowered in message.lower()
@pytest.mark.asyncio
async def test_discover_agents_convenience_function():
    """The module-level ``discover_agents`` forwards ``force_refresh`` to the class.

    ``AgentDiscovery`` is replaced by a mock inside the module under test, so
    the only thing verified is that the instance's ``discover_agents`` is
    awaited exactly once with ``force_refresh=True``.
    """
    from delegation_mcp.agent_discovery import discover_agents

    with patch("delegation_mcp.agent_discovery.AgentDiscovery") as mock_class:
        mock_instance = MagicMock()
        mock_instance.discover_agents = AsyncMock(return_value={})
        mock_class.return_value = mock_instance

        # The returned mapping is not inspected; only the delegation matters.
        await discover_agents(force_refresh=True)

        mock_instance.discover_agents.assert_called_once_with(force_refresh=True)
@pytest.mark.asyncio
async def test_discover_specific_agents(discovery):
    """``agents_to_check`` limits discovery to the agents that were requested."""
    requested = ["claude", "gemini"]
    placeholder = AgentMetadata(name="test", command="test", available=False)

    with patch.object(discovery, "_discover_single_agent") as mock_discover:
        mock_discover.return_value = placeholder
        await discovery.discover_agents(
            force_refresh=True,
            agents_to_check=requested,
        )

    # One discovery call per requested agent, and no more.
    assert mock_discover.call_count == len(requested)
def test_known_agents_structure():
    """Every ``KNOWN_AGENTS`` entry has the required keys and a non-empty capability list.

    Each assertion carries a message naming the offending agent, so a failure
    points directly at the malformed entry.
    """
    required_keys = ("command", "version_flag", "capabilities")

    for name, config in AgentDiscovery.KNOWN_AGENTS.items():
        for key in required_keys:
            assert key in config, f"agent {name!r} is missing required key {key!r}"

        capabilities = config["capabilities"]
        assert isinstance(capabilities, list), (
            f"agent {name!r}: 'capabilities' must be a list, "
            f"got {type(capabilities).__name__}"
        )
        assert len(capabilities) > 0, f"agent {name!r} declares no capabilities"