Spaces:
Running on Zero
Running on Zero
GitHub Actions
feat: impl_ref gaps closed — Nemotron/OpenBMB backends, spec refs, extended UI, HOWTO, multi-node E2E
8514223 | """X01 - FastAPI HTTP Transport Server. | |
| Spec: docs/X01-transport.md §3 | |
| Impl-ref: impl_ref.md §4 | |
| Endpoints: | |
| POST /bus/v1/call - signed capability RPC | |
| GET /manifest - node manifest | |
| GET /community/manifest - community manifest | |
| GET /sync/v1/heads - event log heads | |
| POST /sync/v1/events - receive events from peers | |
| GET /pubsub/v1/subscribe - SSE pub-sub stream | |
| GET /ws/pubsub/v1/{topic} - WebSocket pub-sub | |
| GET /health - liveness | |
| GET /ready - readiness | |
| GET /metrics - Prometheus metrics | |
| GET /trace/recent - recent bus traces | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| from datetime import datetime, timezone | |
| from typing import Any, Callable | |
| try: | |
| import uvicorn | |
| from fastapi import FastAPI, HTTPException, Request, Response | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| HAS_FASTAPI = True | |
| except ImportError: | |
| HAS_FASTAPI = False | |
| def _iso_now() -> str: | |
| return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| def _parse_version(version_str: str) -> tuple[int, int]: | |
| parts = version_str.split(".") | |
| if len(parts) < 2: | |
| parts.append("0") | |
| return (int(parts[0]), int(parts[1])) | |
class HttpServer:
    """HTTP transport server built on FastAPI and run by uvicorn.

    Every collaborator (bus, manifest callables, sync server, trace ring,
    blob store) is optional; each handler answers with an empty or error
    payload when the collaborator it needs is ``None``.
    """

    def __init__(
        self,
        bus=None,
        node_manifest_fn: Callable[[], dict] | None = None,
        community_manifest_fn: Callable[[], dict] | None = None,
        sync_server=None,
        trace_ring=None,
        blob_store=None,
        host: str = "0.0.0.0",  # nosec B104 — binding to all interfaces is intentional for a LAN-serving node
        port: int = 7080,
    ):
        """Store collaborators and bind address; nothing is started here.

        Args:
            bus: Object exposing ``list_capabilities()`` and ``call(...)``.
            node_manifest_fn: Zero-argument callable returning the node manifest.
            community_manifest_fn: Zero-argument callable returning the
                community manifest.
            sync_server: Object exposing ``heads()`` and ``serve_events(body)``.
            trace_ring: Object exposing ``recent(n)``.
            blob_store: Object exposing ``get_chunk(chunk_cid)``.
            host: Interface uvicorn binds to.
            port: TCP port uvicorn binds to.
        """
        self._bus = bus
        self._node_manifest_fn = node_manifest_fn
        self._community_manifest_fn = community_manifest_fn
        self._sync_server = sync_server
        self._trace_ring = trace_ring
        self._blob_store = blob_store
        self._host = host
        self._port = port
        self._server_task: asyncio.Task | None = None
        self._uvicorn_server = None
        self._app = None
        self._ws_pubsub: Any = None  # WebsocketPubSub, lazy-initialised

    def build_app(self) -> Any:
        """Build and return the FastAPI application.

        The built app is also cached on ``self._app``.

        Raises:
            ImportError: If fastapi/uvicorn could not be imported.
        """
        if not HAS_FASTAPI:
            raise ImportError(
                "fastapi and uvicorn are required for HttpServer. "
                "Install them with: pip install fastapi uvicorn"
            )
        app = FastAPI(title="HearthNet")

        # NOTE(review): no route registration (decorators or
        # app.add_api_route / add_api_websocket_route calls) is visible for
        # the handlers below in the reviewed text — confirm that each handler
        # is actually wired to its path.

        async def health():
            """Liveness: always answers ok with the current UTC timestamp."""
            return JSONResponse({"status": "ok", "ts": _iso_now()})

        async def ready():
            """Readiness: ready only when the bus reports at least one capability."""
            if self._bus is not None:
                try:
                    caps = self._bus.list_capabilities()
                    if caps:
                        return JSONResponse({"status": "ready"})
                except Exception as exc:
                    # Best-effort probe: a failing bus means "not ready", but
                    # leave a trace instead of swallowing the error silently.
                    import logging as _logging  # noqa: PLC0415
                    _logging.getLogger(__name__).debug(
                        "HttpServer.ready probe failed: %s", exc
                    )
            raise HTTPException(status_code=503, detail="not_ready")

        async def manifest():
            """Return the node manifest, or an error payload."""
            if self._node_manifest_fn is not None:
                try:
                    return JSONResponse(self._node_manifest_fn())
                except Exception as exc:
                    return JSONResponse({"error": "manifest_error", "message": str(exc)}, status_code=500)
            return JSONResponse({"error": "no_manifest"})

        async def community_manifest():
            """Return the community manifest, or an error payload."""
            if self._community_manifest_fn is not None:
                try:
                    return JSONResponse(self._community_manifest_fn())
                except Exception as exc:
                    return JSONResponse({"error": "manifest_error", "message": str(exc)}, status_code=500)
            return JSONResponse({"error": "no_manifest"})

        async def list_capabilities():
            """Return the bus capabilities as a JSON list (empty without a bus)."""
            if self._bus is None:
                return JSONResponse([])
            try:
                caps = self._bus.list_capabilities()
                return JSONResponse(caps if isinstance(caps, list) else list(caps))
            except Exception as exc:
                return JSONResponse({"error": "bus_error", "message": str(exc)}, status_code=500)

        async def bus_call(request: Request):
            """Invoke a bus capability described by the JSON request body.

            Reads ``capability``, ``version`` (default ``"1.0"``), ``params``,
            ``input`` and ``stream`` from the body. With ``stream`` truthy the
            result is delivered as a ``text/event-stream`` response.
            """
            if self._bus is None:
                return JSONResponse({"error": "no_bus", "message": "bus not configured"}, status_code=503)
            try:
                body = await request.json()
            except Exception as exc:
                raise HTTPException(status_code=400, detail="invalid_json") from exc
            # Valid JSON may still be a list/scalar; ``.get`` below needs an object.
            if not isinstance(body, dict):
                return JSONResponse({"error": "invalid_body", "message": "JSON object required"}, status_code=400)
            capability = body.get("capability")
            version_str = body.get("version", "1.0")
            params = body.get("params", {})
            input_data = body.get("input", {})
            stream = body.get("stream", False)
            if not capability:
                return JSONResponse({"error": "missing_capability", "message": "capability field required"}, status_code=400)
            try:
                version_tuple = _parse_version(version_str)
            except (ValueError, TypeError) as exc:
                return JSONResponse({"error": "invalid_version", "message": str(exc)}, status_code=400)
            call_body = {"params": params, "input": input_data}
            if stream:
                from hearthnet.transport.streams import encode_sse_frame

                async def _stream_gen():
                    try:
                        result = self._bus.call(capability, version_tuple, call_body)
                        if hasattr(result, "__aiter__"):
                            # NOTE(review): unlike the two arms below, this arm
                            # emits no trailing "done" frame — presumably the
                            # async iterator signals completion itself; confirm.
                            async for chunk in result:
                                yield encode_sse_frame(chunk)
                        elif asyncio.iscoroutine(result):
                            data = await result
                            yield encode_sse_frame(data)
                            yield encode_sse_frame({"done": True}, event="done")
                        else:
                            yield encode_sse_frame(result)
                            yield encode_sse_frame({"done": True}, event="done")
                    except Exception as exc:
                        # Headers are already sent once streaming starts, so
                        # failures are reported in-band as an "error" frame.
                        yield encode_sse_frame({"error": "call_error", "message": str(exc)}, event="error")

                return StreamingResponse(_stream_gen(), media_type="text/event-stream")
            try:
                result = self._bus.call(capability, version_tuple, call_body)
                if asyncio.iscoroutine(result):
                    result = await result
                return JSONResponse(result)
            except Exception as exc:
                return JSONResponse({"error": "call_error", "message": str(exc)}, status_code=500)

        async def trace_recent(n: int = 20):
            """Return the ``n`` most recent traces from the trace ring."""
            if self._trace_ring is None:
                return JSONResponse([])
            try:
                traces = self._trace_ring.recent(n)
                return JSONResponse(traces if isinstance(traces, list) else list(traces))
            except Exception as exc:
                return JSONResponse({"error": "trace_error", "message": str(exc)}, status_code=500)

        async def metrics():
            """Return Prometheus text; falls back to a comment line on failure."""
            try:
                from hearthnet.observability.metrics import get_prometheus_text
                text = get_prometheus_text()
                return Response(content=text, media_type="text/plain; version=0.0.4")
            except ImportError:
                return Response(content="# metrics not available\n", media_type="text/plain")
            except Exception as exc:
                return Response(content=f"# error: {exc}\n", media_type="text/plain")

        async def sync_heads():
            """Return the sync server's heads (sync or async ``heads()``)."""
            if self._sync_server is None:
                return JSONResponse({"error": "no_sync"})
            try:
                heads = self._sync_server.heads()
                if asyncio.iscoroutine(heads):
                    heads = await heads
                return JSONResponse(heads)
            except Exception as exc:
                return JSONResponse({"error": "sync_error", "message": str(exc)}, status_code=500)

        async def sync_events(request: Request):
            """Pass the JSON body to ``sync_server.serve_events`` and return its result."""
            if self._sync_server is None:
                return JSONResponse({"error": "no_sync"}, status_code=503)
            try:
                body = await request.json()
                result = self._sync_server.serve_events(body)
                if asyncio.iscoroutine(result):
                    result = await result
                return JSONResponse(result if result is not None else {"ok": True})
            except Exception as exc:
                return JSONResponse({"error": "sync_error", "message": str(exc)}, status_code=500)

        async def serve_chunk(chunk_cid: str):
            """Return the raw bytes of a blob chunk, 404 when the store returns None."""
            if self._blob_store is None:
                raise HTTPException(status_code=503, detail="no_blob_store")
            try:
                chunk_bytes = self._blob_store.get_chunk(chunk_cid)
                if asyncio.iscoroutine(chunk_bytes):
                    chunk_bytes = await chunk_bytes
                if chunk_bytes is None:
                    raise HTTPException(status_code=404, detail="chunk_not_found")
                return Response(content=chunk_bytes, media_type="application/octet-stream")
            except HTTPException:
                # Let the deliberate 404 through untouched.
                raise
            except Exception as exc:
                raise HTTPException(status_code=500, detail=str(exc)) from exc

        # ── WebSocket pubsub endpoint (X06) ──────────────────────────────────
        # Lazy import keeps websocket.py optional — server still works without it.
        try:
            from hearthnet.transport.websocket import (  # noqa: PLC0415
                WebSocketSession,
                WebsocketPubSub,
            )
            from fastapi import WebSocket as _WS  # noqa: PLC0415
            from starlette.websockets import WebSocketDisconnect as _WSDisc  # noqa: PLC0415

            if self._ws_pubsub is None:
                self._ws_pubsub = WebsocketPubSub()
            _pubsub = self._ws_pubsub

            async def ws_pubsub(websocket: _WS, topic: str):
                """Subscribe the socket to *topic* and service frames until it closes."""
                await websocket.accept()
                session = WebSocketSession(websocket)
                _pubsub.subscribe(topic, session)
                try:
                    while True:
                        frame = await session.receive_frame()
                        if frame is None:
                            break
                        # Acknowledge ACK frames; ignore others silently
                        if frame.type == "ack":
                            up_to = frame.data.get("upto", 0)
                            await session.send_ack(up_to)
                except _WSDisc:
                    pass
                except Exception as exc:
                    # Any other failure ends the session; record why.
                    import logging as _logging  # noqa: PLC0415
                    _logging.getLogger(__name__).debug(
                        "HttpServer.ws_pubsub session error: %s", exc
                    )
                finally:
                    _pubsub.unsubscribe(topic, session)
                    await session.close()
        except ImportError:
            pass  # websockets / starlette WS not available; endpoint not registered

        self._app = app
        return app

    async def publish_event(self, topic: str, event: str, data: dict) -> int:
        """
        Fan-out *event*/*data* to all WebSocket sessions subscribed to *topic*.
        Returns the number of sessions that received the message.
        Returns 0 if the WebSocket pubsub is not initialised.
        """
        if self._ws_pubsub is None:
            return 0
        try:
            return await self._ws_pubsub.publish(topic, event, data)
        except Exception as exc:
            # Publishing is best-effort: log and report zero deliveries.
            import logging as _logging  # noqa: PLC0415
            _logging.getLogger(__name__).warning("HttpServer.publish_event error: %s", exc)
            return 0

    async def start(self) -> None:
        """Start uvicorn in a background task.

        Calling ``start()`` while a previous server task is still running is
        a no-op, so a second call cannot spawn a competing server and orphan
        the first task.

        Raises:
            ImportError: If fastapi/uvicorn could not be imported.
        """
        if self._server_task is not None and not self._server_task.done():
            return
        if not HAS_FASTAPI:
            raise ImportError(
                "fastapi and uvicorn are required for HttpServer. "
                "Install them with: pip install fastapi uvicorn"
            )
        if self._app is None:
            self.build_app()
        config = uvicorn.Config(
            app=self._app,
            host=self._host,
            port=self._port,
            log_level="warning",
        )
        self._uvicorn_server = uvicorn.Server(config)
        self._server_task = asyncio.create_task(self._uvicorn_server.serve())

    async def shutdown(self) -> None:
        """Stop uvicorn.

        Sets ``should_exit`` on the server, waits up to five seconds for the
        serve task to finish, cancels it on timeout/cancellation, and always
        clears the stored task and server references.
        """
        if self._uvicorn_server is not None:
            self._uvicorn_server.should_exit = True
        if self._server_task is not None:
            try:
                await asyncio.wait_for(self._server_task, timeout=5.0)
            except (asyncio.TimeoutError, asyncio.CancelledError):
                self._server_task.cancel()
            finally:
                self._server_task = None
                self._uvicorn_server = None