Spaces:
Sleeping
Sleeping
| """Configuration manager for runtime agent and routing rules management.""" | |
| import yaml | |
| import re | |
| from pathlib import Path | |
| from typing import Any | |
| from dataclasses import dataclass | |
| from ..config import DelegationConfig, OrchestratorConfig, DelegationRule | |
| from ..orchestrator import OrchestratorRegistry | |
| class AgentStatus: | |
| """Agent status information.""" | |
| name: str | |
| enabled: bool | |
| installed: bool | |
| config: OrchestratorConfig | None | |
| status_text: str | |
| status_icon: str | |
| def to_dict(self) -> dict[str, Any]: | |
| """Convert to dictionary for UI display.""" | |
| return { | |
| "name": self.name, | |
| "enabled": self.enabled, | |
| "installed": self.installed, | |
| "status": self.status_text, | |
| "icon": self.status_icon, | |
| } | |
| class ConfigurationManager: | |
| """Manages agent configuration and routing rules.""" | |
| def __init__( | |
| self, | |
| orchestrators_path: Path = Path("config/orchestrators.yaml"), | |
| rules_path: Path = Path("config/delegation_rules.yaml"), | |
| ): | |
| """Initialize configuration manager. | |
| Args: | |
| orchestrators_path: Path to orchestrators YAML config | |
| rules_path: Path to delegation rules YAML config | |
| """ | |
| self.orchestrators_path = orchestrators_path | |
| self.rules_path = rules_path | |
| self.registry = OrchestratorRegistry() | |
| # Store default configurations for reset functionality | |
| self.default_orchestrators: dict[str, Any] = {} | |
| self.default_rules: list[dict[str, Any]] = [] | |
| # Load configurations | |
| self.load_configurations() | |
| def load_configurations(self) -> None: | |
| """Load orchestrator and delegation rule configurations.""" | |
| # Load orchestrators | |
| with open(self.orchestrators_path) as f: | |
| orch_data = yaml.safe_load(f) | |
| self.orchestrators_config = orch_data.get("orchestrators", {}) | |
| # Store defaults | |
| if not self.default_orchestrators: | |
| self.default_orchestrators = { | |
| k: v.copy() for k, v in self.orchestrators_config.items() | |
| } | |
| # Register orchestrators | |
| for name, config in self.orchestrators_config.items(): | |
| self.registry.register(OrchestratorConfig(**config)) | |
| # Load delegation rules | |
| with open(self.rules_path) as f: | |
| rules_data = yaml.safe_load(f) | |
| self.primary_orchestrator = rules_data.get("orchestrator", "claude") | |
| self.rules_list = rules_data.get("rules", []) | |
| # Store defaults | |
| if not self.default_rules: | |
| self.default_rules = [r.copy() for r in self.rules_list] | |
| def get_agent_statuses(self) -> list[AgentStatus]: | |
| """Get status information for all agents. | |
| Returns: | |
| List of AgentStatus objects with installation and enablement info | |
| """ | |
| # Validate which agents are installed | |
| installed_agents = self.registry.validate_all() | |
| statuses = [] | |
| for name, config_dict in self.orchestrators_config.items(): | |
| config = OrchestratorConfig(**config_dict) | |
| is_installed = installed_agents.get(name, False) | |
| is_enabled = config.enabled | |
| # Determine status | |
| if is_enabled and is_installed: | |
| status_text = "Active" | |
| status_icon = "🟢" | |
| elif is_enabled and not is_installed: | |
| status_text = "Not Installed" | |
| status_icon = "⚠️" | |
| else: | |
| status_text = "Disabled" | |
| status_icon = "🔴" | |
| statuses.append(AgentStatus( | |
| name=name, | |
| enabled=is_enabled, | |
| installed=is_installed, | |
| config=config, | |
| status_text=status_text, | |
| status_icon=status_icon, | |
| )) | |
| return statuses | |
| def toggle_agent(self, agent_name: str, enabled: bool) -> tuple[bool, str]: | |
| """Toggle an agent on or off. | |
| Args: | |
| agent_name: Name of the agent to toggle | |
| enabled: True to enable, False to disable | |
| Returns: | |
| tuple: (success, message) | |
| """ | |
| if agent_name not in self.orchestrators_config: | |
| return False, f"Agent '{agent_name}' not found" | |
| # Check if this is the primary orchestrator | |
| if not enabled and agent_name == self.primary_orchestrator: | |
| return False, f"Cannot disable primary orchestrator '{agent_name}'. Please select a different primary orchestrator first." | |
| # Check if disabling would break any routing rules | |
| if not enabled: | |
| broken_rules = [ | |
| rule for rule in self.rules_list | |
| if rule.get("delegate_to") == agent_name | |
| ] | |
| if broken_rules: | |
| rule_descriptions = [r.get("description", r.get("pattern")) for r in broken_rules] | |
| return False, f"Cannot disable '{agent_name}'. It is used in {len(broken_rules)} routing rule(s): {', '.join(rule_descriptions[:3])}" | |
| # Update configuration | |
| self.orchestrators_config[agent_name]["enabled"] = enabled | |
| # Update registry | |
| config = OrchestratorConfig(**self.orchestrators_config[agent_name]) | |
| self.registry.register(config) | |
| return True, f"Agent '{agent_name}' {'enabled' if enabled else 'disabled'} successfully" | |
| def set_primary_orchestrator(self, agent_name: str) -> tuple[bool, str]: | |
| """Set the primary orchestrator. | |
| Args: | |
| agent_name: Name of the agent to set as primary | |
| Returns: | |
| tuple: (success, message) | |
| """ | |
| if agent_name not in self.orchestrators_config: | |
| return False, f"Agent '{agent_name}' not found" | |
| config = self.orchestrators_config[agent_name] | |
| # Validate agent is enabled | |
| if not config.get("enabled", False): | |
| return False, f"Cannot set '{agent_name}' as primary orchestrator because it is disabled. Please enable it first." | |
| # Validate agent is installed | |
| installed = self.registry.validate_all() | |
| if not installed.get(agent_name, False): | |
| return False, f"Cannot set '{agent_name}' as primary orchestrator because it is not installed." | |
| self.primary_orchestrator = agent_name | |
| return True, f"Primary orchestrator set to '{agent_name}'" | |
| def validate_routing_rules(self, yaml_text: str) -> tuple[bool, str, list[dict[str, Any]] | None]: | |
| """Validate routing rules YAML. | |
| Args: | |
| yaml_text: YAML text to validate | |
| Returns: | |
| tuple: (is_valid, message, parsed_rules) | |
| """ | |
| try: | |
| # Parse YAML | |
| data = yaml.safe_load(yaml_text) | |
| if not isinstance(data, list): | |
| return False, "Rules must be a list", None | |
| # Validate each rule | |
| for i, rule in enumerate(data): | |
| if not isinstance(rule, dict): | |
| return False, f"Rule {i+1} must be a dictionary", None | |
| # Required fields | |
| if "pattern" not in rule: | |
| return False, f"Rule {i+1} missing required field 'pattern'", None | |
| if "delegate_to" not in rule: | |
| return False, f"Rule {i+1} missing required field 'delegate_to'", None | |
| # Validate regex pattern | |
| try: | |
| re.compile(rule["pattern"]) | |
| except re.error as e: | |
| return False, f"Rule {i+1} has invalid regex pattern: {e}", None | |
| # Validate delegate_to exists | |
| delegate_to = rule["delegate_to"] | |
| if delegate_to not in self.orchestrators_config: | |
| return False, f"Rule {i+1} delegates to unknown agent '{delegate_to}'", None | |
| # Validate delegate_to is enabled | |
| if not self.orchestrators_config[delegate_to].get("enabled", False): | |
| return False, f"Rule {i+1} delegates to disabled agent '{delegate_to}'", None | |
| # Validate priority is a number | |
| if "priority" in rule and not isinstance(rule["priority"], (int, float)): | |
| return False, f"Rule {i+1} priority must be a number", None | |
| return True, "✅ Routing rules are valid", data | |
| except yaml.YAMLError as e: | |
| return False, f"YAML parsing error: {e}", None | |
| except Exception as e: | |
| return False, f"Validation error: {e}", None | |
| def preview_routing_rules(self, rules: list[dict[str, Any]]) -> str: | |
| """Generate a preview of how routing rules will affect delegation. | |
| Args: | |
| rules: List of routing rules | |
| Returns: | |
| Formatted preview text | |
| """ | |
| if not rules: | |
| return "No routing rules defined. All tasks will go to the primary orchestrator." | |
| preview = "## Routing Rules Preview\n\n" | |
| preview += f"**Primary Orchestrator:** {self.primary_orchestrator}\n\n" | |
| # Sort by priority (highest first) | |
| sorted_rules = sorted(rules, key=lambda r: r.get("priority", 0), reverse=True) | |
| preview += "**Rules (by priority):**\n\n" | |
| for i, rule in enumerate(sorted_rules, 1): | |
| pattern = rule.get("pattern", "") | |
| delegate_to = rule.get("delegate_to", "") | |
| priority = rule.get("priority", 0) | |
| description = rule.get("description", "") | |
| preview += f"{i}. **Pattern:** `{pattern}`\n" | |
| preview += f" **Delegates to:** {delegate_to}\n" | |
| preview += f" **Priority:** {priority}\n" | |
| if description: | |
| preview += f" **Description:** {description}\n" | |
| preview += "\n" | |
| # Add example queries | |
| preview += "\n**Example Query Matching:**\n\n" | |
| example_queries = [ | |
| "Fix this security vulnerability", | |
| "Refactor the authentication module", | |
| "Create a pull request for this feature", | |
| "Run the test suite", | |
| ] | |
| for query in example_queries: | |
| matched = False | |
| for rule in sorted_rules: | |
| if re.search(rule["pattern"], query, re.IGNORECASE): | |
| preview += f"- \"{query}\" → **{rule['delegate_to']}** (matches: `{rule['pattern']}`)\n" | |
| matched = True | |
| break | |
| if not matched: | |
| preview += f"- \"{query}\" → **{self.primary_orchestrator}** (no rule match)\n" | |
| return preview | |
| def save_configurations(self, rules_yaml: str | None = None) -> tuple[bool, str]: | |
| """Save configurations to YAML files. | |
| Args: | |
| rules_yaml: Optional YAML text for routing rules | |
| Returns: | |
| tuple: (success, message) | |
| """ | |
| try: | |
| # Save orchestrators | |
| with open(self.orchestrators_path, "w") as f: | |
| yaml.dump( | |
| {"orchestrators": self.orchestrators_config}, | |
| f, | |
| default_flow_style=False, | |
| sort_keys=False, | |
| ) | |
| # Save delegation rules | |
| rules_data = { | |
| "orchestrator": self.primary_orchestrator, | |
| } | |
| if rules_yaml: | |
| # Validate and use provided rules | |
| is_valid, message, parsed_rules = self.validate_routing_rules(rules_yaml) | |
| if not is_valid: | |
| return False, f"Cannot save: {message}" | |
| rules_data["rules"] = parsed_rules | |
| self.rules_list = parsed_rules or [] | |
| else: | |
| rules_data["rules"] = self.rules_list | |
| with open(self.rules_path, "w") as f: | |
| # Add comment header | |
| f.write("# Delegation MCP Configuration\n") | |
| f.write("# Configure your primary orchestrator and delegation rules\n") | |
| f.write("# Note: Orchestrator definitions are in config/orchestrators.yaml\n\n") | |
| yaml.dump(rules_data, f, default_flow_style=False, sort_keys=False) | |
| return True, "✅ Configuration saved successfully" | |
| except Exception as e: | |
| return False, f"Failed to save configuration: {e}" | |
| def reset_to_defaults(self) -> tuple[bool, str]: | |
| """Reset all configurations to defaults. | |
| Returns: | |
| tuple: (success, message) | |
| """ | |
| try: | |
| # Reset orchestrators | |
| self.orchestrators_config = { | |
| k: v.copy() for k, v in self.default_orchestrators.items() | |
| } | |
| # Reset rules | |
| self.rules_list = [r.copy() for r in self.default_rules] | |
| self.primary_orchestrator = "claude" | |
| # Re-register orchestrators | |
| for name, config in self.orchestrators_config.items(): | |
| self.registry.register(OrchestratorConfig(**config)) | |
| # Save defaults | |
| success, message = self.save_configurations() | |
| if success: | |
| return True, "✅ Configuration reset to defaults" | |
| return False, f"Failed to save defaults: {message}" | |
| except Exception as e: | |
| return False, f"Failed to reset configuration: {e}" | |
| def get_rules_yaml(self) -> str: | |
| """Get current routing rules as YAML text. | |
| Returns: | |
| YAML text of routing rules | |
| """ | |
| return yaml.dump(self.rules_list, default_flow_style=False, sort_keys=False) | |
| def get_agent_capabilities(self, agent_name: str) -> dict[str, Any]: | |
| """Get agent capabilities from configuration. | |
| Args: | |
| agent_name: Name of the agent | |
| Returns: | |
| Dictionary of agent capabilities | |
| """ | |
| if agent_name not in self.orchestrators_config: | |
| return {} | |
| config = self.orchestrators_config[agent_name] | |
| return { | |
| "command": config.get("command", ""), | |
| "args": config.get("args", []), | |
| "timeout": config.get("timeout", 300), | |
| "max_retries": config.get("max_retries", 3), | |
| "enabled": config.get("enabled", False), | |
| } | |