Spaces:
Running on Zero
A newer version of the Gradio SDK is available: 6.19.0
X01 β Transport
Spec version: v1.0 Depends on: M01 (identity), X04 (config), X03 (observability), FastAPI, uvicorn, httpx Depended on by: M03 (bus), M02 (discovery, indirectly), X02 (sync endpoints)
1. Responsibility
Moves bytes between nodes over HTTP/1.1+TLS. Provides:
- An HTTP server hosting the bus, pubsub, sync, and metrics endpoints
- An HTTP client with TLS pinning and signature signing
- Server-Sent Events (SSE) for streaming
- Backpressure on streams (window-based)
- Rate limiting per (peer, capability)
- Self-signed TLS cert generation and pinning
This module knows nothing about what it transports. It dispatches calls to the bus, which routes to services.
2. File layout
hearthnet/transport/
βββ __init__.py
βββ server.py # FastAPI app factory + lifecycle
βββ client.py # HttpClient: signed requests, TLS pinning
βββ streams.py # SSE writer/reader, frame parsing
βββ backpressure.py # Window-based flow control
βββ tls.py # Cert load/generate, peer pinning store
3. HTTP server
3.1 Public API
# hearthnet/transport/server.py
from fastapi import FastAPI
from typing import Awaitable, Callable
class HttpServer:
def __init__(
self,
config: TransportConfig,
kp: KeyPair,
bus: 'CapabilityBus',
event_sync: 'SyncServer',
community_manifest_provider: Callable[[], CommunityManifest],
):
...
def app(self) -> FastAPI:
"""The configured FastAPI app. Used by tests."""
async def run(self) -> None:
"""Block, serving until cancelled."""
async def shutdown(self) -> None: ...
3.2 Endpoints (mounted)
| Route | Method | Purpose | Handler |
|---|---|---|---|
/bus/v1/call |
POST | Capability call (sync or stream) | bus dispatch |
/manifest |
GET | Current node manifest JSON | identity |
/community/manifest |
GET | Current community manifest | identity |
/sync/v1/heads |
GET | Sync heads | X02 |
/sync/v1/events |
POST | Sync delta | X02 |
/pubsub/v1/subscribe |
GET (long-poll) | Topic subscription | bus pubsub |
/health |
GET | Liveness | observability |
/ready |
GET | Readiness (β₯1 capability + β₯1 peer) | observability |
/metrics |
GET | Prometheus | observability |
/trace/recent |
GET | Last N traces (JSON) | observability |
/mobile/* for the mobile web client is mounted by M08 outside transport's concern.
3.3 Request lifecycle
HTTP request arrives
β
extract X-HearthNet-* headers
β
verify signature (M01) using X-HearthNet-From
β
check author is a community member (community manifest)
β
attach trace (X03) from X-HearthNet-Request-Id
β
rate-limit check (this module)
β
dispatch to bus.handle_call(capability, version, body, caller)
β
return response (or stream)
Failures at each stage emit a typed error matching CONTRACT Β§9.
4. TLS
4.1 Cert generation
On first run, generate a self-signed X.509 certificate with:
- Subject CN =
<short_node_id>.hearthnet.local - SAN =
IP:<config.transport.host>if not 0.0.0.0, else all interfaces - Public key derived from the device Ed25519 key (Ed25519 is a TLS 1.3 signature algorithm)
- Valid for 10 years (covers normal device life)
Cert + key persisted at <DATA>/tls/server.crt and <DATA>/tls/server.key.
4.2 Pinning store
# hearthnet/transport/tls.py
class PinnedCerts:
"""Stores the first-seen TLS cert fingerprint per NodeID.
Mismatches on subsequent connections raise a warning and refuse the connection."""
def __init__(self, db_path: Path): ...
def record(self, node_id: str, fingerprint: bytes) -> None: ...
def expected(self, node_id: str) -> bytes | None: ...
def verify(self, node_id: str, presented: bytes) -> bool: ...
4.3 Hostname verification
Disabled. We pin to NodeID, not DNS. Peers are referenced by IP+port from manifests.
5. HTTP client
5.1 Public API
# hearthnet/transport/client.py
class HttpClient:
def __init__(
self,
kp: KeyPair,
node_id: str,
community_id: str,
pinned_certs: PinnedCerts,
timeout_default_seconds: float = RPC_DEFAULT_TIMEOUT_SECONDS,
):
...
async def call(
self,
peer: Endpoint,
capability: str,
version: str,
body: dict,
*,
trace_id: str | None = None,
timeout_seconds: float | None = None,
) -> dict:
"""Sync RPC. Signs request, opens TLS connection (or reuses pinned),
sends, awaits response, verifies response signature if present,
returns body. Raises CallError on transport / protocol failure."""
async def stream(
self,
peer: Endpoint,
capability: str,
version: str,
body: dict,
*,
trace_id: str | None = None,
cancel: asyncio.Event | None = None,
) -> AsyncIterator[Frame]:
"""Open SSE stream. Yields Frame objects (event_name + data dict).
Honours backpressure: sends ACK frames automatically.
On cancel: closes connection, server aborts within 200ms."""
async def close(self) -> None: ...
class CallError(Exception):
code: ErrorCode
message: str
retry_after_ms: int | None
alt_capabilities: list[str]
alt_nodes: list[str]
5.2 Connection management
- One
httpx.AsyncClientper peer, reused across calls - Idle timeout
CONNECTION_IDLE_SECONDS(60s), then close - On disconnect, lazy reconnect on next call
- Reconnect backoff: exponential, cap
RECONNECT_BACKOFF_CAP_SECONDS(30s)
5.3 Signing
For every outbound request, the client constructs:
envelope = {
"capability": capability,
"version": version,
"request_id": trace_id or new_ulid(),
"from": node_id_full,
"community": community_id,
"timestamp": rfc3339_now(),
"body": body,
}
sig = kp.sign_bytes(canonical_json(envelope))
Headers set: X-HearthNet-Capability, -Version, -Request-Id, -From, -Community, -Timestamp, -Signature.
6. Streaming
6.1 SSE writer (server)
# hearthnet/transport/streams.py
class SseWriter:
def __init__(self, response: StreamingResponse): ...
async def emit(self, event: str, data: dict) -> None: ...
async def emit_token(self, token: dict) -> None: ... # convenience
async def emit_progress(self, current: int, total: int, stage: str) -> None: ...
async def emit_error(self, code: ErrorCode, **kwargs) -> None: ...
async def emit_done(self, **meta) -> None: ...
async def emit_ack(self, upto: int) -> None: ...
@property
def cancelled(self) -> bool: ...
6.2 SSE reader (client)
class SseReader:
async def __aiter__(self) -> AsyncIterator[Frame]: ...
async def cancel(self) -> None: ...
@dataclass(frozen=True)
class Frame:
event: str # "token", "chunk", "progress", "ack", "done", "error", "manifest", "ready", ...
data: dict
seq: int # local sequence number for backpressure
6.3 Backpressure
# hearthnet/transport/backpressure.py
class FlowControl:
"""Window-based flow control for one stream."""
def __init__(self, window: int = STREAM_WINDOW_FRAMES, ack_interval: int = STREAM_ACK_INTERVAL_FRAMES):
...
@property
def window_used(self) -> int: ...
def send(self) -> None: ... # call before emitting a frame; blocks (await) when full
def ack(self, upto: int) -> None: ...
@property
def needs_ack(self) -> bool: ... # reader checks this; emits ack frame
6.4 Cancellation
- Client closes the HTTP connection
- Server's request task is cancelled
- Service handler's generator receives
GeneratorExit(or async equivalent) - Service emits final telemetry and exits within 200ms
- A finally block guarantees resources are freed
7. Rate limiting
# hearthnet/transport/__init__.py (or rate_limit.py)
class RateLimiter:
"""Token-bucket per (peer, capability) and per peer total."""
def __init__(self, config: TransportConfig): ...
def check(self, peer_node_id: str, capability: str) -> RateCheck:
"""Returns ok, soft-limited, or hard-limited.
Soft β return 429 with retry_after_ms
Hard β drop without response (logged + counter)."""
@dataclass(frozen=True)
class RateCheck:
allowed: bool
soft_exceeded: bool
retry_after_ms: int
Limits from constants:
RATE_LIMIT_SOFT_RPS_PER_CAP = 10RATE_LIMIT_HARD_RPS_PER_CAP = 100RATE_LIMIT_SOFT_RPS_TOTAL = 100RATE_LIMIT_HARD_RPS_TOTAL = 1000
8. Pub-sub (long-poll)
# hearthnet/transport/server.py (sub-component)
class PubSubServer:
"""In-memory topic registry; long-poll subscribers."""
async def publish(self, topic: str, payload: dict) -> None: ...
async def subscribe(self, topic: str, *, last_seq: int = 0, timeout_seconds: float = 30) -> dict:
"""Long-poll: returns next message or {timeout: true} after timeout_seconds."""
/pubsub/v1/subscribe?topic=marketplace.post.created&last_seq=0&timeout=30
WebSocket variant deferred to Phase 2.
9. Errors
Server returns errors in the format of CONTRACT Β§5.4. Client raises CallError carrying the same code.
10. Configuration
From X04:
config.transport.host
config.transport.port
config.transport.tls_cert # optional override
config.transport.tls_key # optional override
Constants: STREAM_WINDOW_FRAMES, STREAM_ACK_INTERVAL_FRAMES, STREAM_ACK_TIMEOUT_SECONDS, RPC_DEFAULT_TIMEOUT_SECONDS, CONNECTION_IDLE_SECONDS, RECONNECT_BACKOFF_CAP_SECONDS, RATE_LIMIT_*.
11. Tests
Unit
test_request_signing_roundtripβ sign on client, verify on servertest_envelope_canonicalisationβ same input β same signaturetest_sse_frame_formatβ parses both waystest_flow_control_blocks_when_fullβsend()awaits until ack arrivestest_rate_limit_soft_then_hardtest_tls_pinning_first_seen_then_mismatch
Integration
test_two_node_call_roundtriptest_stream_with_cancellationtest_concurrent_streams_share_connectiontest_chaos_packet_loss_30pct(usingtc)
12. Cross-references
| What | Where |
|---|---|
| Wire format | CONTRACT Β§5 |
| Signing rules | CONTRACT Β§10, M01 Β§3.1 |
| Bus dispatch | M03 |
| Sync endpoints | X02 Β§6 |
| Pub-sub topics | CONTRACT Β§8 |
| Mobile UI mount point | M08 |