Spaces:
Sleeping
Sleeping
| """Tests for workflow engine.""" | |
| import pytest | |
| from pathlib import Path | |
| from delegation_mcp.workflow import ( | |
| WorkflowDefinition, | |
| WorkflowStep, | |
| WorkflowContext, | |
| WorkflowEngine, | |
| ) | |
| from delegation_mcp.config import OrchestratorConfig | |
| from delegation_mcp.orchestrator import OrchestratorRegistry | |
@pytest.fixture
def simple_workflow():
    """Provide a two-step workflow definition shared by the tests below.

    The definition contains:

    * ``step1`` -- agent ``claude``; its task template references
      ``{{ code_path }}`` and its output is stored under ``analysis``.
    * ``step2`` -- agent ``gemini``; its task template references
      ``{{ analysis }}``, its output is stored under ``review``, and it
      carries the condition ``{{ analysis | length > 0 }}``.

    This is declared as a pytest fixture because test functions in this
    module request ``simple_workflow`` by parameter name; without the
    decorator pytest cannot resolve that parameter.

    Returns:
        A freshly constructed ``WorkflowDefinition``.
    """
    return WorkflowDefinition(
        name="Test Workflow",
        description="Simple test workflow",
        steps=[
            WorkflowStep(
                id="step1",
                agent="claude",
                task="Analyze {{ code_path }}",
                output="analysis",
                description="First step",
            ),
            WorkflowStep(
                id="step2",
                agent="gemini",
                task="Review: {{ analysis }}",
                output="review",
                condition="{{ analysis | length > 0 }}",
                description="Second step",
            ),
        ],
    )
@pytest.fixture
def test_registry():
    """Provide an ``OrchestratorRegistry`` holding two ``echo``-backed agents.

    Two enabled configurations are registered, ``claude`` and ``gemini``.
    Both use ``echo`` as their command, with ``[CLAUDE]`` and ``[GEMINI]``
    respectively as the single argument, so no real agent CLI is required.

    This is declared as a pytest fixture because other functions in this
    module request ``test_registry`` by parameter name. The name is kept
    unchanged for those dependents even though it starts with ``test_``.

    Returns:
        The populated ``OrchestratorRegistry``.
    """
    reg = OrchestratorRegistry()
    reg.register(
        OrchestratorConfig(
            name="claude",
            command="echo",
            args=["[CLAUDE]"],
            enabled=True,
        )
    )
    reg.register(
        OrchestratorConfig(
            name="gemini",
            command="echo",
            args=["[GEMINI]"],
            enabled=True,
        )
    )
    return reg
@pytest.fixture
def workflow_engine(test_registry):
    """Provide a ``WorkflowEngine`` constructed from ``test_registry``.

    Declared as a pytest fixture because test functions in this module
    request ``workflow_engine`` by parameter name.

    Args:
        test_registry: The registry passed to the ``WorkflowEngine``
            constructor.

    Returns:
        The new ``WorkflowEngine`` instance.
    """
    return WorkflowEngine(test_registry)
def test_workflow_context():
    """Verify that ``WorkflowContext`` stores values and handles misses."""
    ctx = WorkflowContext()

    # A value that was stored comes back unchanged.
    ctx.set("foo", "bar")
    stored = ctx.get("foo")
    assert stored == "bar"

    # An unknown key yields None, or the caller's default when one is given.
    assert ctx.get("missing") is None
    fallback = ctx.get("missing", "default")
    assert fallback == "default"
def test_context_interpolation():
    """Verify that ``{{ var }}`` placeholders are replaced by context values."""
    ctx = WorkflowContext()
    for key, value in (("name", "World"), ("count", 42)):
        ctx.set(key, value)

    template = "Hello {{ name }}! Count: {{ count }}"
    rendered = ctx.interpolate(template)

    assert rendered == "Hello World! Count: 42"
def test_context_condition_evaluation():
    """Verify truthiness and ``length`` comparisons in condition templates."""
    ctx = WorkflowContext()

    # Plain variable references: non-empty string is true, empty is false.
    ctx.set("exists", "value")
    non_empty = ctx.evaluate_condition("{{ exists }}")
    assert non_empty is True

    ctx.set("empty", "")
    blank = ctx.evaluate_condition("{{ empty }}")
    assert blank is False

    # Length comparisons against a three-element list.
    ctx.set("items", [1, 2, 3])
    has_items = ctx.evaluate_condition("{{ items | length > 0 }}")
    assert has_items is True
    has_many = ctx.evaluate_condition("{{ items | length > 5 }}")
    assert has_many is False

    # Length comparison against a string value.
    ctx.set("text", "hello")
    has_text = ctx.evaluate_condition("{{ text | length > 0 }}")
    assert has_text is True
def test_workflow_serialization(simple_workflow, tmp_path):
    """Verify that a workflow written to YAML can be read back."""
    target = tmp_path / "test.yaml"

    # Write the definition out and confirm the file was created.
    simple_workflow.to_yaml(target)
    assert target.exists()

    # Read it back and compare against the source definition.
    restored = WorkflowDefinition.from_yaml(target)
    assert restored.name == simple_workflow.name
    assert len(restored.steps) == len(simple_workflow.steps)
    first_step = restored.steps[0]
    assert first_step.id == "step1"
# NOTE(review): assumes the pytest-asyncio plugin drives this module's
# coroutine tests; without a marker (or auto mode) pytest does not await them.
@pytest.mark.asyncio
async def test_workflow_execution(workflow_engine, simple_workflow):
    """Run the two-step workflow end to end and check the reported result.

    The workflow is executed with ``code_path`` supplied as initial context.
    The result is expected to report success, both steps completed, and an
    output entry for each step's ``output`` name.
    """
    result = await workflow_engine.execute(
        simple_workflow,
        initial_context={"code_path": "test.py"},
    )

    assert result is not None
    assert result.workflow_name == "Test Workflow"
    assert result.steps_completed == 2
    assert result.total_steps == 2
    assert result.success is True
    assert "analysis" in result.outputs
    assert "review" in result.outputs
# NOTE(review): assumes the pytest-asyncio plugin drives this module's
# coroutine tests; without a marker (or auto mode) pytest does not await them.
@pytest.mark.asyncio
async def test_workflow_conditional_skip(workflow_engine):
    """Check that a step whose condition is not met is skipped.

    The second step's condition references ``missing_var``, which is never
    set, so only the first step is expected to run and produce output.
    """
    workflow = WorkflowDefinition(
        name="Conditional Test",
        steps=[
            WorkflowStep(
                id="always",
                agent="claude",
                task="Always run",
                output="result1",
            ),
            WorkflowStep(
                id="never",
                agent="gemini",
                task="Never run",
                output="result2",
                condition="{{ missing_var }}",
            ),
        ],
    )

    result = await workflow_engine.execute(workflow)

    # Only the unconditional first step should have executed.
    assert result.steps_completed == 1
    assert "result1" in result.outputs
    assert "result2" not in result.outputs
# NOTE(review): assumes the pytest-asyncio plugin drives this module's
# coroutine tests; without a marker (or auto mode) pytest does not await them.
@pytest.mark.asyncio
async def test_workflow_error_handling(test_registry):
    """Check that a failing step stops the workflow and is reported.

    An extra agent named ``failing`` is registered with the command
    ``false`` and used for the first step. The run is expected to report
    failure, zero completed steps, and at least one error.
    """
    # Register an agent whose command always fails.
    # NOTE(review): relies on a ``false`` executable being on PATH -- confirm
    # for non-POSIX environments.
    test_registry.register(
        OrchestratorConfig(
            name="failing",
            command="false",
            args=[],
            enabled=True,
        )
    )
    engine = WorkflowEngine(test_registry)

    workflow = WorkflowDefinition(
        name="Error Test",
        steps=[
            WorkflowStep(
                id="fail",
                agent="failing",
                task="This will fail",
                output="result",
            ),
            WorkflowStep(
                id="never_reached",
                agent="claude",
                task="Should not execute",
                output="result2",
            ),
        ],
    )

    result = await engine.execute(workflow)

    # Execution should stop at the first error.
    assert result.success is False
    # The failed step is not expected to count as completed.
    assert result.steps_completed == 0
    assert len(result.errors) > 0
# NOTE(review): assumes the pytest-asyncio plugin drives this module's
# coroutine tests; without a marker (or auto mode) pytest does not await them.
@pytest.mark.asyncio
async def test_load_actual_workflows(workflow_engine):
    """Load the workflow files under ``workflows/`` and sanity-check one.

    The directory is looked up relative to the current working directory;
    the test is skipped when it does not exist. Otherwise at least one
    workflow must be listed, and the first one must have a name and at
    least one step, with every step naming an agent and a task.
    """
    workflows_dir = Path("workflows")
    if not workflows_dir.exists():
        pytest.skip("Workflows directory not found")

    workflows = workflow_engine.list_workflows(workflows_dir)

    # At least one workflow should have been loaded.
    assert len(workflows) > 0

    # The first workflow must be structurally valid.
    workflow = workflows[0]
    assert workflow.name
    assert len(workflow.steps) > 0
    assert all(step.agent for step in workflow.steps)
    assert all(step.task for step in workflow.steps)