File size: 5,676 Bytes
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""HTTP client for making signed capability calls to remote nodes."""
from __future__ import annotations

import json
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import AsyncIterator

try:
    import httpx
    HAS_HTTPX = True
except ImportError:
    HAS_HTTPX = False


def _new_request_id() -> str:
    return secrets.token_hex(8)


def _iso_now() -> str:
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


@dataclass
class CallError(Exception):
    code: str
    message: str
    alt_nodes: list[str] = field(default_factory=list)

    def __post_init__(self):
        super().__init__(self.message)


class HttpClient:
    """Manages HTTP connections to one remote node. Reuses connections."""

    def __init__(
        self,
        base_url: str,
        our_node_id: str,
        community_id: str,
        signing_key=None,
        verify_ssl: bool = False,
    ):
        self._base_url = base_url.rstrip("/")
        self._our_node_id = our_node_id
        self._community_id = community_id
        self._signing_key = signing_key
        self._verify_ssl = verify_ssl
        self._client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        if self._client is None:
            if not HAS_HTTPX:
                raise CallError("internal_error", "httpx not installed")
            self._client = httpx.AsyncClient(verify=self._verify_ssl, timeout=30.0)
        return self._client

    async def call(
        self,
        capability: str,
        version: tuple[int, int],
        body: dict,
        *,
        timeout: float = 30.0,
    ) -> dict:
        """Make a synchronous capability call. Returns response dict."""
        client = await self._get_client()
        payload = {
            "capability": capability,
            "version": f"{version[0]}.{version[1]}",
            **body,
        }
        headers = self._make_headers(payload)
        try:
            resp = await client.post(
                f"{self._base_url}/bus/v1/call",
                json=payload,
                headers=headers,
                timeout=timeout,
            )
            resp.raise_for_status()
            return resp.json()
        except httpx.HTTPStatusError as exc:
            raise CallError("http_error", str(exc)) from exc
        except Exception as exc:
            raise CallError("partition", str(exc)) from exc

    async def stream(
        self,
        capability: str,
        version: tuple[int, int],
        body: dict,
    ) -> AsyncIterator[dict]:
        """Make a streaming capability call. Yields SSE frame dicts."""
        if not HAS_HTTPX:
            raise CallError("internal_error", "httpx not installed")
        payload = {
            "capability": capability,
            "version": f"{version[0]}.{version[1]}",
            "stream": True,
            **body,
        }
        headers = self._make_headers(payload)
        headers["Accept"] = "text/event-stream"
        try:
            async with httpx.AsyncClient(verify=self._verify_ssl) as client:
                async with client.stream(
                    "POST",
                    f"{self._base_url}/bus/v1/call",
                    json=payload,
                    headers=headers,
                ) as resp:
                    async for line in resp.aiter_lines():
                        if line.startswith("data: "):
                            try:
                                yield json.loads(line[6:])
                            except json.JSONDecodeError:
                                pass
        except Exception as exc:
            raise CallError("partition", str(exc)) from exc

    async def fetch_manifest(self) -> dict:
        client = await self._get_client()
        try:
            resp = await client.get(f"{self._base_url}/manifest")
            resp.raise_for_status()
            return resp.json()
        except Exception as exc:
            raise CallError("manifest_fetch_failed", str(exc)) from exc

    async def fetch_capabilities(self) -> list:
        client = await self._get_client()
        try:
            resp = await client.get(f"{self._base_url}/bus/v1/capabilities")
            resp.raise_for_status()
            return resp.json()
        except Exception as exc:
            raise CallError("capabilities_fetch_failed", str(exc)) from exc

    async def health(self) -> dict:
        client = await self._get_client()
        try:
            resp = await client.get(f"{self._base_url}/health")
            resp.raise_for_status()
            return resp.json()
        except Exception as exc:
            raise CallError("health_check_failed", str(exc)) from exc

    def _make_headers(self, payload: dict) -> dict:
        """Sign the request envelope and return X-HearthNet-* headers."""
        headers = {
            "X-HearthNet-From": self._our_node_id,
            "X-HearthNet-Community": self._community_id,
            "X-HearthNet-Request-Id": _new_request_id(),
            "X-HearthNet-Timestamp": _iso_now(),
            "Content-Type": "application/json",
        }
        if self._signing_key is not None:
            try:
                from hearthnet.identity.keys import sign_payload
                signed = sign_payload(payload, self._signing_key)
                headers["X-HearthNet-Signature"] = signed.get("signature", "")
            except Exception:
                pass
        return headers

    async def close(self) -> None:
        if self._client is not None:
            await self._client.aclose()
            self._client = None