# Cduplar's picture
# Initial public release: Multi-Agent MCP Delegation Server
# 8b02e7c
# Raw
# History Blame Contribute Delete
# 5.55 kB
"""Base adapter interface for CLI orchestrators."""
import os
import shutil
from abc import ABC, abstractmethod
from typing import Any
class CLIAdapter(ABC):
"""Base adapter for CLI orchestrators."""
def __init__(self, name: str, config: dict[str, Any]):
self.name = name
self.config = config
@abstractmethod
async def execute(self, task: str, **kwargs: Any) -> tuple[str, str, int]:
"""
Execute a task using the CLI.
Args:
task: Task description/query
**kwargs: Additional CLI-specific arguments
Returns:
tuple: (stdout, stderr, return_code)
"""
pass
@abstractmethod
def validate(self) -> bool:
"""Validate the CLI is installed and accessible."""
pass
@abstractmethod
def format_task(self, task: str, **kwargs: Any) -> list[str]:
"""
Format task into CLI command arguments.
Args:
task: Task description
**kwargs: Additional formatting options
Returns:
list: Command arguments
"""
pass
def get_command(self) -> str | list[str]:
"""Get base command for this adapter."""
return self.config.get("command", self.name)
def get_args(self) -> list[str]:
"""Get default arguments for this adapter."""
return self.config.get("args", [])
def get_env(self) -> dict[str, str]:
"""Get environment variables for this adapter."""
return self.config.get("env", {})
def get_timeout(self) -> int:
"""Get timeout in seconds."""
return self.config.get("timeout", 300)
async def execute_streaming(
self,
task: str,
progress_callback: Any = None,
timeout: int | None = None,
**kwargs: Any
) -> tuple[str, str, int]:
"""
Execute task with real-time output streaming.
Args:
task: Task description/query
progress_callback: Optional async callback for progress updates
timeout: Override timeout in seconds
**kwargs: Additional CLI-specific arguments
Returns:
tuple: (stdout, stderr, return_code)
"""
import asyncio
# Get command and args
cmd_args = self.format_task(task, **kwargs)
resolved_cmd = self.resolve_command(cmd_args[0])
if isinstance(resolved_cmd, str):
full_cmd = [resolved_cmd] + cmd_args[1:]
else:
full_cmd = resolved_cmd + cmd_args[1:]
# Merge environment
env = os.environ.copy()
env.update(self.get_env())
# Start process with pipes for streaming
process = await asyncio.create_subprocess_exec(
*full_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
stdout_lines = []
stderr_lines = []
async def read_stream(stream, line_buffer, prefix=""):
"""Read stream line-by-line and call progress callback."""
while True:
line = await stream.readline()
if not line:
break
decoded = line.decode().strip()
if decoded:
line_buffer.append(decoded)
if progress_callback:
try:
await progress_callback(f"{prefix}{decoded}")
except Exception:
pass # Don't fail on callback errors
try:
# Read both streams concurrently with timeout
effective_timeout = timeout or self.get_timeout()
await asyncio.wait_for(
asyncio.gather(
read_stream(process.stdout, stdout_lines),
read_stream(process.stderr, stderr_lines, "[stderr] "),
),
timeout=effective_timeout
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
raise TimeoutError(f"{self.name} timed out after {effective_timeout}s")
# Wait for process to complete
await process.wait()
return (
"\n".join(stdout_lines),
"\n".join(stderr_lines),
process.returncode
)
@staticmethod
def resolve_command(cmd: str | list[str]) -> str | list[str]:
"""
Resolve command to full path on Windows.
On Windows, asyncio.create_subprocess_exec() doesn't reliably search PATH,
so we need to resolve commands to their full paths using shutil.which().
Args:
cmd: Command string or list of command parts
Returns:
Resolved command (full path on Windows, original on Unix)
"""
if os.name != "nt":
# On Unix systems, PATH search works fine
return cmd
# On Windows, resolve the executable path
if isinstance(cmd, list):
if not cmd:
return cmd
# Resolve first element (the executable)
resolved = shutil.which(cmd[0])
if resolved:
return [resolved] + cmd[1:]
return cmd
else:
# Single string command
resolved = shutil.which(cmd)
return resolved if resolved else cmd