Spaces:
Running on Zero
Running on Zero
File size: 5,676 Bytes
31c93b1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 | """HTTP client for making signed capability calls to remote nodes."""
from __future__ import annotations
import json
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import AsyncIterator
try:
import httpx
HAS_HTTPX = True
except ImportError:
HAS_HTTPX = False
def _new_request_id() -> str:
return secrets.token_hex(8)
def _iso_now() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
@dataclass
class CallError(Exception):
code: str
message: str
alt_nodes: list[str] = field(default_factory=list)
def __post_init__(self):
super().__init__(self.message)
class HttpClient:
"""Manages HTTP connections to one remote node. Reuses connections."""
def __init__(
self,
base_url: str,
our_node_id: str,
community_id: str,
signing_key=None,
verify_ssl: bool = False,
):
self._base_url = base_url.rstrip("/")
self._our_node_id = our_node_id
self._community_id = community_id
self._signing_key = signing_key
self._verify_ssl = verify_ssl
self._client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
if not HAS_HTTPX:
raise CallError("internal_error", "httpx not installed")
self._client = httpx.AsyncClient(verify=self._verify_ssl, timeout=30.0)
return self._client
async def call(
self,
capability: str,
version: tuple[int, int],
body: dict,
*,
timeout: float = 30.0,
) -> dict:
"""Make a synchronous capability call. Returns response dict."""
client = await self._get_client()
payload = {
"capability": capability,
"version": f"{version[0]}.{version[1]}",
**body,
}
headers = self._make_headers(payload)
try:
resp = await client.post(
f"{self._base_url}/bus/v1/call",
json=payload,
headers=headers,
timeout=timeout,
)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as exc:
raise CallError("http_error", str(exc)) from exc
except Exception as exc:
raise CallError("partition", str(exc)) from exc
async def stream(
self,
capability: str,
version: tuple[int, int],
body: dict,
) -> AsyncIterator[dict]:
"""Make a streaming capability call. Yields SSE frame dicts."""
if not HAS_HTTPX:
raise CallError("internal_error", "httpx not installed")
payload = {
"capability": capability,
"version": f"{version[0]}.{version[1]}",
"stream": True,
**body,
}
headers = self._make_headers(payload)
headers["Accept"] = "text/event-stream"
try:
async with httpx.AsyncClient(verify=self._verify_ssl) as client:
async with client.stream(
"POST",
f"{self._base_url}/bus/v1/call",
json=payload,
headers=headers,
) as resp:
async for line in resp.aiter_lines():
if line.startswith("data: "):
try:
yield json.loads(line[6:])
except json.JSONDecodeError:
pass
except Exception as exc:
raise CallError("partition", str(exc)) from exc
async def fetch_manifest(self) -> dict:
client = await self._get_client()
try:
resp = await client.get(f"{self._base_url}/manifest")
resp.raise_for_status()
return resp.json()
except Exception as exc:
raise CallError("manifest_fetch_failed", str(exc)) from exc
async def fetch_capabilities(self) -> list:
client = await self._get_client()
try:
resp = await client.get(f"{self._base_url}/bus/v1/capabilities")
resp.raise_for_status()
return resp.json()
except Exception as exc:
raise CallError("capabilities_fetch_failed", str(exc)) from exc
async def health(self) -> dict:
client = await self._get_client()
try:
resp = await client.get(f"{self._base_url}/health")
resp.raise_for_status()
return resp.json()
except Exception as exc:
raise CallError("health_check_failed", str(exc)) from exc
def _make_headers(self, payload: dict) -> dict:
"""Sign the request envelope and return X-HearthNet-* headers."""
headers = {
"X-HearthNet-From": self._our_node_id,
"X-HearthNet-Community": self._community_id,
"X-HearthNet-Request-Id": _new_request_id(),
"X-HearthNet-Timestamp": _iso_now(),
"Content-Type": "application/json",
}
if self._signing_key is not None:
try:
from hearthnet.identity.keys import sign_payload
signed = sign_payload(payload, self._signing_key)
headers["X-HearthNet-Signature"] = signed.get("signature", "")
except Exception:
pass
return headers
async def close(self) -> None:
if self._client is not None:
await self._client.aclose()
self._client = None
|