Spaces:
Running on Zero
Running on Zero
| """HTTP bus transport — bridges bus.transport.call() to real peers over HTTP. | |
| The default :class:`~hearthnet.bus.InMemoryTransport` only delivers calls to | |
| buses living in the same Python process (used by the in-process multi-node demo | |
| and tests). :class:`HttpBusTransport` is a drop-in superset: it still delivers | |
| in-process when the target node is registered locally, but falls back to a real | |
| ``POST /bus/v1/call`` over HTTP when the target is a remote peer reachable via a | |
| registered endpoint. | |
| This is what makes a local node talk to the HuggingFace Space node (and vice | |
| versa) over the public internet. No mocks: a remote call is a genuine signed-ish | |
| HTTP request to the peer's FastAPI ``/bus/v1/call`` endpoint. | |
| """ | |
| from __future__ import annotations | |
| from typing import Any | |
| from hearthnet.bus import BusError, InMemoryTransport | |
| from hearthnet.bus.capability import RouteRequest | |
| from hearthnet.types import Endpoint | |
| def _endpoint_to_url(ep: Endpoint) -> str: | |
| """Build a base URL from an Endpoint. | |
| transport is one of "https" | "http" | "memory". Port 443 -> https, else the | |
| declared transport scheme (defaulting to http). | |
| """ | |
| scheme = "https" if (ep.transport == "https" or ep.port == 443) else "http" | |
| # Omit the port for the standard 80/443 to keep URLs clean (HF Space uses 443). | |
| if ep.port in (80, 443): | |
| return f"{scheme}://{ep.host}" | |
| return f"{scheme}://{ep.host}:{ep.port}" | |
| class HttpBusTransport(InMemoryTransport): | |
| """In-process delivery first, real HTTP forwarding for remote peers.""" | |
| async def call(self, node_id: str, req: RouteRequest) -> dict[str, Any]: | |
| # 1) In-process target (same machine, shared transport, or tests). | |
| if node_id in self._buses: | |
| return await super().call(node_id, req) | |
| # 2) Remote target — resolve its endpoint from any registered registry. | |
| endpoint = self._resolve_endpoint(node_id) | |
| if endpoint is None or endpoint.transport == "memory": | |
| raise BusError("partition", f"node {node_id} is not reachable") | |
| return await self._http_call(endpoint, req) | |
| def _resolve_endpoint(self, node_id: str) -> Endpoint | None: | |
| for bus in self._buses.values(): | |
| for entry in bus.registry.all_remote(): | |
| if entry.node_id == node_id and entry.endpoint is not None: | |
| return entry.endpoint | |
| return None | |
| async def _http_call(self, endpoint: Endpoint, req: RouteRequest) -> dict[str, Any]: | |
| try: | |
| import httpx | |
| except ImportError as exc: # pragma: no cover - httpx is a core dep | |
| raise BusError("internal_error", "httpx not installed") from exc | |
| url = f"{_endpoint_to_url(endpoint)}/bus/v1/call" | |
| payload = { | |
| "capability": req.capability, | |
| "version": f"{req.version_req[0]}.{req.version_req[1]}", | |
| "params": req.body.get("params", {}), | |
| "input": req.body.get("input", {}), | |
| } | |
| try: | |
| async with httpx.AsyncClient(timeout=30.0) as client: | |
| resp = await client.post(url, json=payload) | |
| resp.raise_for_status() | |
| data = resp.json() | |
| except httpx.HTTPStatusError as exc: | |
| raise BusError("http_error", f"{url} -> {exc}") from exc | |
| except Exception as exc: | |
| raise BusError("partition", f"{url} unreachable: {exc}") from exc | |
| # The remote may signal a typed error in-band. | |
| if isinstance(data, dict) and "error" in data and "output" not in data: | |
| raise BusError(str(data.get("error", "call_error")), str(data.get("message", ""))) | |
| return data | |