Spaces:
Sleeping
Sleeping
| """Integration tests for delegation MCP server.""" | |
| import pytest | |
| import asyncio | |
| from pathlib import Path | |
| from delegation_mcp.config import DelegationConfig, OrchestratorConfig, DelegationRule | |
| from delegation_mcp.orchestrator import OrchestratorRegistry | |
| from delegation_mcp.delegation import DelegationEngine | |
@pytest.fixture
def test_config():
    """Create a test configuration whose orchestrators are mock commands.

    Every orchestrator runs ``echo`` with a distinctive tag (``[CLAUDE]``,
    ``[GEMINI]``, ``[COPILOT]``, ``[AIDER]``) so that tests can tell from the
    command output which orchestrator handled a task.

    Returns:
        A ``DelegationConfig`` with "claude" as the primary orchestrator,
        four enabled orchestrators, three delegation rules, and both
        ``auto_approve`` and ``log_delegations`` switched on.
    """
    # The four orchestrators differ only by name and echoed tag, so build
    # them from one template. Insertion order is kept: claude, gemini,
    # copilot, aider.
    orchestrators = {
        name: OrchestratorConfig(
            name=name,
            command="echo",
            args=[f"[{name.upper()}]"],
            enabled=True,
            timeout=5,
        )
        for name in ("claude", "gemini", "copilot", "aider")
    }

    # NOTE(review): the bare "pr" alternative in the GitHub rule may also
    # match inside longer words (e.g. "process") if the engine does an
    # unanchored search -- confirm against the matching implementation.
    rules = [
        DelegationRule(
            pattern="security|vulnerability|audit",
            delegate_to="gemini",
            priority=5,
            description="Security tasks to Gemini",
        ),
        DelegationRule(
            pattern="refactor|architect",
            delegate_to="aider",
            priority=5,
            description="Refactoring to Aider",
        ),
        DelegationRule(
            pattern="github|pull request|pr",
            delegate_to="copilot",
            priority=5,
            description="GitHub tasks to Copilot",
        ),
    ]

    return DelegationConfig(
        orchestrator="claude",
        orchestrators=orchestrators,
        rules=rules,
        auto_approve=True,
        log_delegations=True,
    )
@pytest.fixture
def registry(test_config):
    """Create a test registry populated from ``test_config``.

    Args:
        test_config: The configuration fixture; each value of its
            ``orchestrators`` mapping is registered.

    Returns:
        An ``OrchestratorRegistry`` with every configured orchestrator
        registered.
    """
    reg = OrchestratorRegistry()
    for orch_config in test_config.orchestrators.values():
        reg.register(orch_config)
    return reg
@pytest.fixture
def engine(test_config, registry):
    """Create a test engine wired to the test configuration and registry.

    Args:
        test_config: The configuration fixture.
        registry: The registry fixture built from the same configuration.

    Returns:
        A ``DelegationEngine`` constructed from both fixtures.
    """
    return DelegationEngine(test_config, registry)
@pytest.mark.asyncio
async def test_basic_delegation(engine):
    """Test basic task delegation.

    A security-audit task is expected to be delegated from the primary
    orchestrator ("claude") to "gemini", whose echoed tag must appear in
    the output.
    """
    result = await engine.process("Run security audit")

    assert result is not None
    assert result.orchestrator == "claude"
    assert result.delegated_to == "gemini"
    assert result.success is True
    assert "[GEMINI]" in result.output
@pytest.mark.asyncio
async def test_refactor_delegation(engine):
    """Test refactoring delegation.

    A refactoring task is expected to be delegated to "aider" and to
    produce that orchestrator's echoed tag.
    """
    result = await engine.process("Refactor the authentication module")

    assert result.delegated_to == "aider"
    assert "[AIDER]" in result.output
    assert result.success is True
@pytest.mark.asyncio
async def test_github_delegation(engine):
    """Test GitHub delegation.

    A pull-request task is expected to be delegated to "copilot" and to
    produce that orchestrator's echoed tag.
    """
    result = await engine.process("Create a pull request for feature X")

    assert result.delegated_to == "copilot"
    assert "[COPILOT]" in result.output
    assert result.success is True
@pytest.mark.asyncio
async def test_no_delegation(engine):
    """Test a task without a matching rule.

    With no delegation target, the primary orchestrator ("claude") is
    expected to handle the task itself.
    """
    result = await engine.process("Explain how async works in Python")

    assert result.delegated_to is None
    assert result.orchestrator == "claude"
    assert "[CLAUDE]" in result.output
    assert result.success is True
@pytest.mark.asyncio
async def test_force_delegation(engine):
    """Test forcing delegation to a specific agent.

    Passing ``force_delegate="gemini"`` is expected to route the task to
    "gemini" regardless of the task text.
    """
    result = await engine.process(
        "Any task",
        force_delegate="gemini",
    )

    assert result.delegated_to == "gemini"
    assert "[GEMINI]" in result.output
    assert result.success is True
@pytest.mark.asyncio
async def test_statistics(engine):
    """Test delegation statistics.

    Processes three tasks and checks the aggregate counters returned by
    ``engine.get_statistics()``.
    """
    # Run several delegations: the first two are expected to be delegated,
    # the third is expected to stay with the primary orchestrator.
    await engine.process("Run security audit")
    await engine.process("Refactor code")
    await engine.process("Explain Python")

    stats = engine.get_statistics()

    assert stats["total"] == 3
    assert stats["delegations"] == 2
    assert stats["delegation_rate"] > 0
    assert stats["success_rate"] == 100.0
def test_registry_validation(registry):
    """Check that registry validation reports the echo-backed orchestrators."""
    report = registry.validate_all()

    # Both orchestrators must show up in the validation report.
    for expected_name in ("claude", "gemini"):
        assert expected_name in report

    # "claude" runs `echo`, which is expected to be available.
    assert report["claude"] is True
@pytest.mark.asyncio
async def test_timeout_handling(registry):
    """Test timeout handling.

    Registers a "slow" orchestrator whose command sleeps for 10 seconds
    under a 1-second timeout and expects ``registry.execute`` to raise
    ``TimeoutError``.
    """
    import sys  # local import: only this test needs the interpreter path

    # Use the interpreter that is running the tests to simulate a
    # long-running task. A bare "python" is not on PATH on every system
    # (e.g. python3-only installs); keep it only as a fallback for the rare
    # case where sys.executable is empty.
    long_config = OrchestratorConfig(
        name="slow",
        command=sys.executable or "python",
        args=["-c", "import time; time.sleep(10); print('done')"],
        enabled=True,
        timeout=1,  # 1 second timeout
    )
    registry.register(long_config)

    with pytest.raises(TimeoutError):
        await registry.execute("slow", "")
def test_config_serialization(test_config, tmp_path):
    """Round-trip the configuration through a YAML file and compare."""
    yaml_path = tmp_path / "test_config.yaml"

    # Writing the config must produce the file on disk.
    test_config.to_yaml(yaml_path)
    assert yaml_path.exists()

    # Reading it back must preserve the primary orchestrator ...
    restored = DelegationConfig.from_yaml(yaml_path)
    assert restored.orchestrator == test_config.orchestrator

    # ... and the number of rules and orchestrators, in that order.
    for collection in ("rules", "orchestrators"):
        assert len(getattr(restored, collection)) == len(getattr(test_config, collection))