Spaces:
Sleeping
Sleeping
File size: 5,552 Bytes
8b02e7c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 | """Base adapter interface for CLI orchestrators."""
import os
import shutil
from abc import ABC, abstractmethod
from typing import Any
class CLIAdapter(ABC):
"""Base adapter for CLI orchestrators."""
def __init__(self, name: str, config: dict[str, Any]):
self.name = name
self.config = config
@abstractmethod
async def execute(self, task: str, **kwargs: Any) -> tuple[str, str, int]:
"""
Execute a task using the CLI.
Args:
task: Task description/query
**kwargs: Additional CLI-specific arguments
Returns:
tuple: (stdout, stderr, return_code)
"""
pass
@abstractmethod
def validate(self) -> bool:
"""Validate the CLI is installed and accessible."""
pass
@abstractmethod
def format_task(self, task: str, **kwargs: Any) -> list[str]:
"""
Format task into CLI command arguments.
Args:
task: Task description
**kwargs: Additional formatting options
Returns:
list: Command arguments
"""
pass
def get_command(self) -> str | list[str]:
"""Get base command for this adapter."""
return self.config.get("command", self.name)
def get_args(self) -> list[str]:
"""Get default arguments for this adapter."""
return self.config.get("args", [])
def get_env(self) -> dict[str, str]:
"""Get environment variables for this adapter."""
return self.config.get("env", {})
def get_timeout(self) -> int:
"""Get timeout in seconds."""
return self.config.get("timeout", 300)
async def execute_streaming(
self,
task: str,
progress_callback: Any = None,
timeout: int | None = None,
**kwargs: Any
) -> tuple[str, str, int]:
"""
Execute task with real-time output streaming.
Args:
task: Task description/query
progress_callback: Optional async callback for progress updates
timeout: Override timeout in seconds
**kwargs: Additional CLI-specific arguments
Returns:
tuple: (stdout, stderr, return_code)
"""
import asyncio
# Get command and args
cmd_args = self.format_task(task, **kwargs)
resolved_cmd = self.resolve_command(cmd_args[0])
if isinstance(resolved_cmd, str):
full_cmd = [resolved_cmd] + cmd_args[1:]
else:
full_cmd = resolved_cmd + cmd_args[1:]
# Merge environment
env = os.environ.copy()
env.update(self.get_env())
# Start process with pipes for streaming
process = await asyncio.create_subprocess_exec(
*full_cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)
stdout_lines = []
stderr_lines = []
async def read_stream(stream, line_buffer, prefix=""):
"""Read stream line-by-line and call progress callback."""
while True:
line = await stream.readline()
if not line:
break
decoded = line.decode().strip()
if decoded:
line_buffer.append(decoded)
if progress_callback:
try:
await progress_callback(f"{prefix}{decoded}")
except Exception:
pass # Don't fail on callback errors
try:
# Read both streams concurrently with timeout
effective_timeout = timeout or self.get_timeout()
await asyncio.wait_for(
asyncio.gather(
read_stream(process.stdout, stdout_lines),
read_stream(process.stderr, stderr_lines, "[stderr] "),
),
timeout=effective_timeout
)
except asyncio.TimeoutError:
process.kill()
await process.wait()
raise TimeoutError(f"{self.name} timed out after {effective_timeout}s")
# Wait for process to complete
await process.wait()
return (
"\n".join(stdout_lines),
"\n".join(stderr_lines),
process.returncode
)
@staticmethod
def resolve_command(cmd: str | list[str]) -> str | list[str]:
"""
Resolve command to full path on Windows.
On Windows, asyncio.create_subprocess_exec() doesn't reliably search PATH,
so we need to resolve commands to their full paths using shutil.which().
Args:
cmd: Command string or list of command parts
Returns:
Resolved command (full path on Windows, original on Unix)
"""
if os.name != "nt":
# On Unix systems, PATH search works fine
return cmd
# On Windows, resolve the executable path
if isinstance(cmd, list):
if not cmd:
return cmd
# Resolve first element (the executable)
resolved = shutil.which(cmd[0])
if resolved:
return [resolved] + cmd[1:]
return cmd
else:
# Single string command
resolved = shutil.which(cmd)
return resolved if resolved else cmd
|