Spaces:
Running
Running
| """Delegation engine for routing tasks to orchestrators.""" | |
| import re | |
| import logging | |
| from typing import Any | |
| from datetime import datetime | |
| from .config import DelegationConfig, DelegationRule | |
| from .orchestrator import OrchestratorRegistry | |
| from .retry import retry_with_backoff | |
| from .logging_config import delegation_logger | |
# Module-level logger used for routing and fallback diagnostics.
logger = logging.getLogger(name=__name__)
class DelegationResult:
    """Record describing the outcome of one delegation operation.

    Every constructor argument is stored verbatim as an attribute of the same
    name. ``timestamp`` is set to ``datetime.now()`` (a naive local time) when
    the record is created.
    """

    def __init__(
        self,
        query: str,
        orchestrator: str,
        delegated_to: str | None,
        rule: DelegationRule | None,
        output: str,
        error: str,
        success: bool,
        duration: float,
    ):
        # Who received the query and where it was routed.
        self.query, self.orchestrator, self.delegated_to = query, orchestrator, delegated_to
        self.rule = rule
        # What came back and how long it took.
        self.output, self.error = output, error
        self.success, self.duration = success, duration
        self.timestamp = datetime.now()

    def __repr__(self) -> str:
        # Show "orch -> target" only when the query was actually delegated.
        if self.delegated_to:
            route = f"{self.orchestrator} -> {self.delegated_to}"
        else:
            route = self.orchestrator
        return f"<DelegationResult {route}: {self.success}>"
class DelegationEngine:
    """Engine for delegating tasks to orchestrators based on rules and capabilities.

    Routing (``_determine_delegation``) considers, in order: a forced target,
    a "simple task" shortcut that keeps the query on the primary orchestrator,
    explicit delegation rules from the config, and capability-based ranking.
    Execution (``process``) runs the routed target first and falls back to the
    remaining ranked agents when an attempt fails.
    """

    # Primary orchestrator: handles anything that is not delegated elsewhere.
    _PRIMARY = "claude"

    # SIMPLE: read-only operations and single-step deterministic commands that
    # don't benefit from an AI agent. Matched with ``re.match`` (anchored at the
    # start only, case-insensitive) against the query minus leading whitespace.
    # NOTE(review): patterns without a trailing ``$`` (git log/show/remote,
    # gh pr view/list, ...) also classify compound queries such as
    # "git log ... then squash" as simple, because they are checked before the
    # complex indicators. Kept as-is to preserve existing routing.
    _SIMPLE_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r"^git\s+status\s*$",
            r"^git\s+log",
            r"^git\s+show",
            r"^git\s+diff\s+[\w\./\-]+\s*$",  # Single file diff
            r"^git\s+branch\s*(-a|-r)?\s*$",
            r"^git\s+remote",
            r"^git\s+stash\s+(list|show)?\s*$",
            r"^git\s+checkout\s+[\w\-/]+\s*$",  # Simple branch switch
            r"^git\s+checkout\s+-b\s+[\w\-/]+\s*$",  # Create branch
            r"^git\s+add\s+[\w\./\-]+\s*$",  # Add specific files
            r"^git\s+pull\s*$",  # Simple pull (no conflicts mentioned)
            r"^gh\s+pr\s+(view|list)",
            r"^gh\s+issue\s+list",
            r"^gh\s+repo\s+view",
        )
    )

    # COMPLEX: multi-step workflows, content generation, safety-critical
    # operations. Plain substrings of the lower-cased query.
    _COMPLEX_INDICATORS = (
        # Git operations requiring intelligence
        "commit",  # Needs message generation
        "create a commit",
        "commit message",
        "amend",
        "rebase",
        "cherry-pick",
        "squash",
        "merge conflict",
        "resolve conflict",
        "git history",
        "clean up",
        "--force",
        "force push",
        "force-with-lease",
        # GitHub operations requiring content generation
        "create pr",
        "create pull request",
        "pr create",
        "pull request",
        "create issue",
        "issue create",
        "pr review",
        "review pr",
        "create release",
        "release create",
        # Multi-step workflows
        "create a pr for",
        "commit and push",
        "push my changes",
        "stage and commit",
    )

    # MEDIUM: operations that may need error handling. These are regexes
    # searched in the lower-cased query; "checkout -b.*origin" relies on that,
    # since as a plain substring it could never match.
    _MEDIUM_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r"push -u",
            r"set-upstream",
            r"push origin",
            r"push --tags",
            r"merge",  # Might have conflicts
            r"revert",
            r"tag -a",
            r"checkout -b.*origin",  # Track remote branch
        )
    )

    # Task-type keywords, checked in insertion order; the first category with a
    # keyword occurring as a substring of the lower-cased query wins.
    # NOTE(review): because first match wins, some keywords can never select
    # their own category: "vulnerability"/"security issue" (vulnerability_scan)
    # are caught by security_audit, "pr review" by code_review's "review", and
    # "issue create" by quick_fix's "issue". Order kept to preserve routing and
    # timeouts.
    _TASK_KEYWORDS: dict[str, list[str]] = {
        "security_audit": ["security", "vulnerability", "audit", "cve", "exploit", "penetration"],
        "vulnerability_scan": ["scan", "vulnerability", "vuln", "security issue"],
        "code_review": ["review", "code quality", "best practice", "lint"],
        "architecture": ["architecture", "design", "system design", "structure"],
        "refactoring": ["refactor", "restructure", "clean up", "improve code"],
        "quick_fix": ["fix", "bug", "error", "issue", "broken"],
        "documentation": ["document", "docs", "readme", "guide", "explain"],
        "testing": ["test", "unittest", "integration test", "e2e"],
        "performance": ["performance", "optimize", "speed", "latency", "benchmark"],
        "git_workflow": ["commit", "push", "rebase", "merge", "cherry-pick", "squash", "git history"],
        "github_operations": ["pull request", "pr create", "pr review", "issue create", "release"],
    }

    # Timeout presets (seconds) per task type.
    _TIMEOUT_PRESETS: dict[str, int] = {
        "quick_fix": 60,  # 1 min - simple bug fixes
        "refactoring": 300,  # 5 min - code refactoring
        "security_audit": 600,  # 10 min - comprehensive security review
        "code_review": 600,  # 10 min - full code review
        "performance": 900,  # 15 min - profiling/optimization
        "testing": 300,  # 5 min - test generation
        "documentation": 180,  # 3 min - documentation writing
        "architecture": 300,  # 5 min - design work
        "vulnerability_scan": 300,  # 5 min - automated scanning
        "git_workflow": 180,  # 3 min - git operations
        "github_operations": 240,  # 4 min - GitHub API operations
        "general": 300,  # 5 min - default
    }

    def __init__(self, config: DelegationConfig, registry: OrchestratorRegistry):
        self.config = config
        self.registry = registry
        # Completed results; appended only when config.log_delegations is set.
        self.history: list[DelegationResult] = []

    async def _execute_with_fallback(
        self,
        query: str,
        ranked_agents: list[str],
        tried_agents: list[str] | None = None,
        progress_callback: Any = None,
        timeout: int | None = None,
    ) -> tuple[str, str, str, int]:
        """
        Execute query with automatic fallback to the next agent on failure.

        Agents are tried in ``ranked_agents`` order, skipping any already in
        ``tried_agents``. An attempt fails when ``registry.execute`` raises an
        ``Exception`` or returns a non-zero return code. Every failed agent is
        appended to ``tried_agents`` (a caller-supplied list is mutated).

        Returns:
            tuple: (target_agent, stdout, stderr, returncode) of the first
            agent that returned 0.

        Raises:
            RuntimeError: if no agent succeeded. The message lists the agents
            tried and, for this call, why each one failed.
        """
        if tried_agents is None:
            tried_agents = []
        failures: list[str] = []
        for agent in ranked_agents:
            if agent in tried_agents:
                continue
            try:
                logger.info(f"Executing: {agent}")
                stdout, stderr, returncode = await self.registry.execute(
                    agent, query, timeout=timeout, progress_callback=progress_callback
                )
            except Exception as e:
                error_type = type(e).__name__
                logger.warning(f"Fallback: {agent} error ({error_type}) → trying next agent")
                failures.append(f"{agent} ({error_type}: {e})")
            else:
                if returncode == 0:
                    logger.info(f"Success: {agent} completed task")
                    return agent, stdout, stderr, returncode
                # Failed but can try fallback
                logger.warning(f"Fallback: {agent} failed (rc={returncode}) → trying next agent")
                reason = f"rc={returncode}"
                detail = (stderr or "").strip()
                if detail:
                    # Keep only the tail: agent stderr can be long.
                    reason += f": {detail[-200:]}"
                failures.append(f"{agent} ({reason})")
            tried_agents.append(agent)
        # All agents failed
        details = f". Failures: {'; '.join(failures)}" if failures else ""
        raise RuntimeError(f"All agents failed. Tried: {', '.join(tried_agents)}{details}")

    async def process(
        self,
        query: str,
        force_delegate: str | None = None,
        progress_callback: Any = None,
        guidance_only: bool = False,
    ) -> DelegationResult:
        """
        Process a query with delegation logic and automatic fallback.

        Args:
            query: User query/task
            force_delegate: Force delegation to specific orchestrator (no fallback)
            progress_callback: Optional async callback for progress reporting
            guidance_only: If True, return routing guidance without executing

        Returns:
            DelegationResult. When every agent fails, ``success`` is False and
            ``error`` holds the fallback failure message; no exception escapes
            from execution.
        """
        start = datetime.now()
        orchestrator = self._PRIMARY
        # Determine delegation and get ranked agents
        target, rule = self._determine_delegation(query, force_delegate)

        if guidance_only:
            # Routing recommendation only: nothing executed, nothing recorded.
            delegate = target != orchestrator
            return DelegationResult(
                query=query,
                orchestrator=orchestrator,
                delegated_to=target if delegate else None,
                rule=rule,
                output=f"DELEGATE_TO: {target}" if delegate else "HANDLE_DIRECTLY",
                error="",
                success=True,
                duration=0.0,
            )

        # Full ranking for fallback; a forced target gets no fallback.
        if force_delegate:
            ranked_agents = [force_delegate]
        else:
            ranked_agents = self._rank_by_capabilities(query)
            # Run the routed target first. NOTE(review): a target absent from
            # the ranking (e.g. a disabled orchestrator) is not executed at all.
            if target in ranked_agents:
                ranked_agents.remove(target)
                ranked_agents.insert(0, target)
        if not ranked_agents:
            ranked_agents = [orchestrator]

        # Get recommended timeout based on task classification
        _, recommended_timeout = self._classify_task(query)

        delegated_to = target if target != orchestrator else None
        delegation_logger.delegation_start(orchestrator, query, delegated_to)

        try:
            actual_agent, stdout, stderr, returncode = await self._execute_with_fallback(
                query, ranked_agents, progress_callback=progress_callback, timeout=recommended_timeout
            )
        except Exception as e:
            # All agents failed; delegated_to keeps the routed target.
            stdout, stderr, success = "", str(e), False
            logger.error(f"All agents failed: {e}")
        else:
            success = returncode == 0
            if actual_agent != target:
                logger.info(f"Fallback chain: {target} → {actual_agent}")
            # Report the agent that actually produced the output.
            delegated_to = actual_agent if actual_agent != orchestrator else None

        duration = (datetime.now() - start).total_seconds()
        result = DelegationResult(
            query=query,
            orchestrator=orchestrator,
            delegated_to=delegated_to,
            rule=rule,
            output=stdout,
            error=stderr,
            success=success,
            duration=duration,
        )

        if success:
            delegation_logger.delegation_success(orchestrator, delegated_to, duration)
        else:
            delegation_logger.delegation_failure(orchestrator, delegated_to, stderr, duration)

        if self.config.log_delegations:
            self.history.append(result)
        return result

    def _estimate_task_complexity(self, query: str) -> str:
        """
        Estimate task complexity to determine if delegation overhead is worth it.

        Checks, in order: ``_SIMPLE_PATTERNS``, ``_COMPLEX_INDICATORS``, then
        ``_MEDIUM_PATTERNS``; anything unmatched defaults to "medium".

        Returns:
            "simple" | "medium" | "complex"

        Simple tasks: Claude handles directly (delegation overhead > token savings)
        Medium/Complex tasks: Delegate to specialized agents (token savings > overhead)
        """
        query_lower = query.lower()
        # Leading whitespace would otherwise defeat the ^-anchored patterns.
        anchored = query.lstrip()

        for pattern in self._SIMPLE_PATTERNS:
            if pattern.match(anchored):
                logger.debug(f"Complexity: SIMPLE (pattern match: {pattern.pattern})")
                return "simple"

        for indicator in self._COMPLEX_INDICATORS:
            if indicator in query_lower:
                logger.debug(f"Complexity: COMPLEX (indicator: {indicator})")
                return "complex"

        for pattern in self._MEDIUM_PATTERNS:
            if pattern.search(query_lower):
                logger.debug(f"Complexity: MEDIUM (indicator: {pattern.pattern})")
                return "medium"

        if "git" in query_lower or "gh " in query_lower:
            logger.debug("Complexity: MEDIUM (default git/gh command)")
        # Non-git queries also default to medium so normal routing decides.
        return "medium"

    def _classify_task(self, query: str) -> tuple[str, int]:
        """Classify the task type and return ``(task_type, timeout_seconds)``.

        The first category in ``_TASK_KEYWORDS`` with a keyword contained in the
        lower-cased query wins; no match yields ``("general", 300)``.
        """
        query_lower = query.lower()
        for task_type, terms in self._TASK_KEYWORDS.items():
            if any(term in query_lower for term in terms):
                return task_type, self._TIMEOUT_PRESETS.get(task_type, 300)
        return "general", self._TIMEOUT_PRESETS["general"]

    def _rank_by_capabilities(self, query: str) -> list[str]:
        """Return enabled orchestrator names ranked by capability for this query.

        The score is ``config.capabilities.<task_type>`` (0.5 when the attribute
        is missing). Ties keep the config's orchestrator order (stable sort).
        """
        task_type, _ = self._classify_task(query)
        scores = [
            (name, getattr(config.capabilities, task_type, 0.5))
            for name, config in self.config.orchestrators.items()
            if config.enabled
        ]
        scores.sort(key=lambda item: item[1], reverse=True)

        # Log the top of the ranking for transparency
        if scores:
            ranking_str = ", ".join(f"{name} ({score:.2f})" for name, score in scores[:3])
            logger.info(f"Task: {task_type} | Ranked: {ranking_str}")

        return [name for name, _ in scores]

    def _determine_delegation(
        self,
        query: str,
        force_delegate: str | None,
    ) -> tuple[str, DelegationRule | None]:
        """
        Determine which orchestrator should handle the query.

        Order: forced target, simple-task shortcut (primary orchestrator),
        explicit delegation rule, capability ranking (when
        ``config.routing_strategy`` is "capability" or "hybrid"), then the
        primary orchestrator as default.

        Returns:
            tuple: (target_orchestrator, matching_rule); the rule is None unless
            an explicit delegation rule matched.
        """
        # Force delegation overrides everything
        if force_delegate:
            logger.info(f"Routing: FORCED → {force_delegate}")
            return force_delegate, None

        # Simple tasks are handled directly by the primary orchestrator
        complexity = self._estimate_task_complexity(query)
        if complexity == "simple":
            logger.info(f"Routing: SIMPLE task → {self._PRIMARY} (delegation overhead not worth it)")
            return self._PRIMARY, None

        # Check explicit delegation rules
        rule = self.config.find_delegation_rule(query)
        if rule:
            logger.info(f"Routing: {rule.pattern} → {rule.delegate_to} (rule-based)")
            return rule.delegate_to, rule

        # Capability-based routing for medium/complex tasks
        if self.config.routing_strategy in ("capability", "hybrid"):
            ranked = self._rank_by_capabilities(query)
            if ranked:
                task_type, _ = self._classify_task(query)
                best = ranked[0]
                if best == self._PRIMARY and complexity == "medium":
                    logger.info(f"Routing: {task_type} → {self._PRIMARY} (best match, medium complexity)")
                    return self._PRIMARY, None
                logger.info(f"Routing: {task_type} [{complexity}] → {best} (capability-based)")
                return best, None

        # Fallback to primary orchestrator
        logger.info(f"Routing: DEFAULT → {self._PRIMARY}")
        return self._PRIMARY, None

    def get_statistics(self) -> dict[str, Any]:
        """Return delegation statistics over ``self.history``.

        The returned dict always has the same keys: ``total``,
        ``by_orchestrator`` (count per handling orchestrator), ``delegations``,
        ``delegation_rate`` and ``success_rate`` (percentages) and
        ``avg_duration`` (seconds). Rates and average are 0.0 for empty history.
        """
        total = len(self.history)
        if not total:
            return {
                "total": 0,
                "by_orchestrator": {},
                "delegations": 0,
                "delegation_rate": 0.0,
                "success_rate": 0.0,
                "avg_duration": 0.0,
            }

        by_orchestrator: dict[str, int] = {}
        for result in self.history:
            # Attribute the result to whoever actually handled it.
            handler = result.delegated_to or result.orchestrator
            by_orchestrator[handler] = by_orchestrator.get(handler, 0) + 1
        delegations = sum(1 for result in self.history if result.delegated_to)

        return {
            "total": total,
            "by_orchestrator": by_orchestrator,
            "delegations": delegations,
            "delegation_rate": delegations / total * 100,
            "success_rate": sum(r.success for r in self.history) / total * 100,
            "avg_duration": sum(r.duration for r in self.history) / total,
        }

    def clear_history(self) -> None:
        """Clear delegation history in place."""
        self.history.clear()