File size: 5,552 Bytes
8b02e7c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
"""Base adapter interface for CLI orchestrators."""

import os
import shutil
from abc import ABC, abstractmethod
from typing import Any


class CLIAdapter(ABC):
    """Base adapter for CLI orchestrators."""

    def __init__(self, name: str, config: dict[str, Any]):
        self.name = name
        self.config = config

    @abstractmethod
    async def execute(self, task: str, **kwargs: Any) -> tuple[str, str, int]:
        """
        Execute a task using the CLI.

        Args:
            task: Task description/query
            **kwargs: Additional CLI-specific arguments

        Returns:
            tuple: (stdout, stderr, return_code)
        """
        pass

    @abstractmethod
    def validate(self) -> bool:
        """Validate the CLI is installed and accessible."""
        pass

    @abstractmethod
    def format_task(self, task: str, **kwargs: Any) -> list[str]:
        """
        Format task into CLI command arguments.

        Args:
            task: Task description
            **kwargs: Additional formatting options

        Returns:
            list: Command arguments
        """
        pass

    def get_command(self) -> str | list[str]:
        """Get base command for this adapter."""
        return self.config.get("command", self.name)

    def get_args(self) -> list[str]:
        """Get default arguments for this adapter."""
        return self.config.get("args", [])

    def get_env(self) -> dict[str, str]:
        """Get environment variables for this adapter."""
        return self.config.get("env", {})

    def get_timeout(self) -> int:
        """Get timeout in seconds."""
        return self.config.get("timeout", 300)

    async def execute_streaming(
        self, 
        task: str,
        progress_callback: Any = None,
        timeout: int | None = None,
        **kwargs: Any
    ) -> tuple[str, str, int]:
        """
        Execute task with real-time output streaming.
        
        Args:
            task: Task description/query
            progress_callback: Optional async callback for progress updates
            timeout: Override timeout in seconds
            **kwargs: Additional CLI-specific arguments
        
        Returns:
            tuple: (stdout, stderr, return_code)
        """
        import asyncio
        
        # Get command and args
        cmd_args = self.format_task(task, **kwargs)
        resolved_cmd = self.resolve_command(cmd_args[0])
        
        if isinstance(resolved_cmd, str):
            full_cmd = [resolved_cmd] + cmd_args[1:]
        else:
            full_cmd = resolved_cmd + cmd_args[1:]
        
        # Merge environment
        env = os.environ.copy()
        env.update(self.get_env())
        
        # Start process with pipes for streaming
        process = await asyncio.create_subprocess_exec(
            *full_cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env
        )
        
        stdout_lines = []
        stderr_lines = []
        
        async def read_stream(stream, line_buffer, prefix=""):
            """Read stream line-by-line and call progress callback."""
            while True:
                line = await stream.readline()
                if not line:
                    break
                decoded = line.decode().strip()
                if decoded:
                    line_buffer.append(decoded)
                    if progress_callback:
                        try:
                            await progress_callback(f"{prefix}{decoded}")
                        except Exception:
                            pass  # Don't fail on callback errors
        
        try:
            # Read both streams concurrently with timeout
            effective_timeout = timeout or self.get_timeout()
            await asyncio.wait_for(
                asyncio.gather(
                    read_stream(process.stdout, stdout_lines),
                    read_stream(process.stderr, stderr_lines, "[stderr] "),
                ),
                timeout=effective_timeout
            )
        except asyncio.TimeoutError:
            process.kill()
            await process.wait()
            raise TimeoutError(f"{self.name} timed out after {effective_timeout}s")
        
        # Wait for process to complete
        await process.wait()
        
        return (
            "\n".join(stdout_lines),
            "\n".join(stderr_lines),
            process.returncode
        )

    @staticmethod
    def resolve_command(cmd: str | list[str]) -> str | list[str]:
        """
        Resolve command to full path on Windows.

        On Windows, asyncio.create_subprocess_exec() doesn't reliably search PATH,
        so we need to resolve commands to their full paths using shutil.which().

        Args:
            cmd: Command string or list of command parts

        Returns:
            Resolved command (full path on Windows, original on Unix)
        """
        if os.name != "nt":
            # On Unix systems, PATH search works fine
            return cmd

        # On Windows, resolve the executable path
        if isinstance(cmd, list):
            if not cmd:
                return cmd
            # Resolve first element (the executable)
            resolved = shutil.which(cmd[0])
            if resolved:
                return [resolved] + cmd[1:]
            return cmd
        else:
            # Single string command
            resolved = shutil.which(cmd)
            return resolved if resolved else cmd