# multi-agent-mcp / tests / test_performance.py
# Cduplar's picture
# Initial public release: Multi-Agent MCP Delegation Server
# 8b02e7c
# Raw
# History Blame Contribute Delete
# 2.93 kB
"""Tests for performance optimizations."""
import pytest
import asyncio
import sqlite3
from pathlib import Path
import tempfile
from unittest.mock import MagicMock, patch
from datetime import datetime
from delegation_mcp.agent_discovery import AgentDiscovery, AgentMetadata
from delegation_mcp.persistence import PersistenceManager, AuditLogEntry, WorkflowState
@pytest.mark.asyncio
async def test_parallel_agent_discovery():
    """Test that agent discovery runs its per-agent probes in parallel.

    ``_discover_single_agent`` is replaced with a stub that sleeps for a
    fixed delay, and the whole ``discover_agents`` pass is timed. The pass
    must finish well under the time the stubs would take back to back.
    """
    probe_delay = 0.1  # seconds each stubbed probe sleeps
    # Sequential execution is estimated at 5 agents * 0.1s = 0.5s; parallel
    # execution should take roughly one delay plus overhead.
    # NOTE(review): the agent count of 5 is taken from the original comment,
    # not from anything visible in this test -- confirm against AgentDiscovery.
    max_duration = 0.4

    async def slow_discover(name, config):
        """Stand-in probe: wait, then report the agent as available."""
        await asyncio.sleep(probe_delay)
        return AgentMetadata(name=name, command="test", available=True)

    with tempfile.TemporaryDirectory() as tmpdir:
        # Mock Path.home to pass validation
        with patch(
            "delegation_mcp.agent_discovery.Path.home",
            return_value=Path(tmpdir),
        ):
            discovery = AgentDiscovery()

            with patch.object(
                discovery, "_discover_single_agent", side_effect=slow_discover
            ):
                # Use the event loop's monotonic clock: wall-clock time
                # (datetime.now) can jump and make the measurement flaky.
                clock = asyncio.get_running_loop().time
                started = clock()
                await discovery.discover_agents(force_refresh=True)
                duration = clock() - started

                # We expect it to be faster than sequential
                assert duration < max_duration, f"Discovery took too long: {duration}s"
@pytest.mark.asyncio
async def test_async_persistence():
    """Exercise the awaitable PersistenceManager methods against a temp DB.

    Covers, in order: writing and reading an audit log entry, saving and
    loading a workflow state, and recording delegation history followed by
    a statistics query.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        store = PersistenceManager(db_path=Path(scratch_dir) / "test.db")

        # --- audit log: write one entry, then read it back ---
        audit_fields = {
            "client_id": "test",
            "query": "test query",
            "orchestrator": "claude",
            "success": True,
            "duration": 1.0,
            "output_size": 100,
        }
        new_entry_id = await store.log_delegation(AuditLogEntry(**audit_fields))
        assert new_entry_id > 0

        fetched_logs = await store.get_audit_logs()
        assert len(fetched_logs) == 1
        assert fetched_logs[0].id == new_entry_id

        # --- workflow state: save, then load by the returned id ---
        running_state = WorkflowState(workflow_name="test_workflow", status="running")
        saved_state_id = await store.save_workflow_state(running_state)
        assert saved_state_id > 0

        reloaded = await store.load_workflow_state(saved_state_id)
        assert reloaded.workflow_name == "test_workflow"

        # --- statistics: one recorded delegation yields a total of one ---
        await store.record_delegation_history("claude", True, 1.0)
        summary = await store.get_statistics()
        assert summary["total"] == 1