# M03: Capability Bus
**Spec version:** v1.0

**Depends on:** M01 (identity), X01 (transport, for outbound calls), X03 (observability), X04 (config), M02 (discovery, for peer events)

**Depended on by:** every service (M04, M05, M06, M07, M10, M11), M08 (UI calls capabilities via bus), M09 (registers nothing but reads peer state via bus), M12 (CLI inspects bus state)

This is **the** integration point. Re-read [00-OVERVIEW §2](../00-OVERVIEW.md) before changing this file.

---
## 1. Responsibility
- Maintain a registry of **capabilities**: local (offered by us) and remote (offered by other nodes)
- Validate every call against a JSON schema declared by the capability descriptor
- Route every call to the best provider via a scoring algorithm
- Track per-(node, capability) health and quarantine misbehaving providers
- Enforce per-capability concurrency limits and per-peer rate budgets (delegating to X01)
- Emit structured trace events for every call (via X03)
- Provide sticky routing for multi-turn capabilities (chat)

What the bus does **not** do:

- It does not move bytes (X01 does)
- It does not persist anything (X02 does, for events)
- It does not know any service's internals (services register themselves)
---
## 2. File layout
```
hearthnet/bus/
├── __init__.py     # exports: CapabilityBus, CapabilityDescriptor, CapabilityEntry
├── capability.py   # dataclasses: CapabilityDescriptor, CapabilityEntry, RouteRequest
├── registry.py     # Registry: local + remote capability index
├── router.py       # Router: scoring algorithm
├── health.py       # HealthTracker: rolling-window per-(node, cap) stats
├── schema.py       # SchemaValidator + schema_hash computation
└── trace.py        # TraceHook: emit standardised trace events
```
---
## 3. Public API
### 3.1 `capability.py`
```python
# hearthnet/bus/capability.py
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, AsyncIterator, Awaitable, Callable

# Endpoint is defined by X01 (transport); with postponed annotations it is
# only referenced by name here.

CapabilityName = str        # "llm.chat"
Version = tuple[int, int]   # (1, 0)


# --- Descriptor: the full capability spec, used at registration ---
@dataclass(frozen=True)
class CapabilityDescriptor:
    """The complete contract for one capability offered by one service.
    Registered exactly once per (service, capability)."""
    name: CapabilityName
    version: Version                # (1, 0)
    stability: str                  # "stable" | "beta" | "experimental"
    request_schema: dict            # JSON Schema
    response_schema: dict | None    # JSON Schema; None if pure stream
    stream_schema: dict | None      # JSON Schema for stream frames; None if non-streaming
    params: dict[str, Any]          # capability-instance params (e.g. {"model": "..."})
    max_concurrent: int             # per-node limit
    trust_required: str             # "member" | "trusted" | "anchor" | "self"
    timeout_seconds: int            # default deadline for this capability
    idempotent: bool

    @property
    def version_str(self) -> str:
        """E.g. '1.0'"""
        return f"{self.version[0]}.{self.version[1]}"

    def schema_hash(self) -> str:
        """BLAKE3 over canonical-JSON of {name, version, request_schema, response_schema, stream_schema}.
        Prefixed 'blake3:'. See CONTRACT §11."""


# --- Entry: the bus's record of one capability instance ---
@dataclass
class CapabilityEntry:
    """The bus's view of one (node, capability) pair.
    For local capabilities, node_id is our own node ID and handler is not None."""
    node_id: str                    # full form
    descriptor: CapabilityDescriptor
    is_local: bool
    handler: Callable | None        # only for local entries
    endpoint: Endpoint | None       # only for remote entries
    in_flight: int = 0
    last_seen: float = 0.0          # monotonic seconds
    p50_latency_ms: float = 0.0
    p99_latency_ms: float = 0.0
    success_rate: float = 1.0       # rolling HEALTH_WINDOW_CALLS
    quarantined_until: float = 0.0
    sticky_sessions: set[str] = field(default_factory=set)


# --- Internal request envelope ---
@dataclass(frozen=True)
class RouteRequest:
    """An inbound call after signature/membership check.
    Constructed by the transport layer before being handed to the bus."""
    capability: CapabilityName
    version_req: Version            # the *requested* major + minimum minor
    body: dict                      # the JSON body (params + input)
    caller: str                     # full NodeID
    trace_id: str
    session_id: str | None          # for sticky routing
    deadline_ms: int                # absolute monotonic ms
    stream: bool                    # caller-requested
```
### 3.2 `registry.py`
```python
# hearthnet/bus/registry.py
from __future__ import annotations

from dataclasses import dataclass
from typing import AsyncIterator, Awaitable, Callable

from .capability import CapabilityDescriptor, CapabilityEntry, CapabilityName, RouteRequest, Version

# PeerRecord and NodeManifest come from M02 (discovery); referenced by name only.


class Registry:
    """Holds CapabilityEntry instances keyed by (node_id, name, version).
    Safe for concurrent asyncio tasks via asyncio.Lock (task-level safety, not thread safety)."""

    def __init__(self, our_node_id: str): ...

    # -- local registration --
    def register_local(
        self,
        descriptor: CapabilityDescriptor,
        handler: Callable[[RouteRequest], Awaitable[dict] | AsyncIterator[dict]],
    ) -> None:
        """Register a capability our process offers. Idempotent if descriptor unchanged.
        Raises BusError('schema_invalid') if request/response schemas don't validate against the JSON Schema meta-schema.
        Raises BusError('namespace_violation') if name is outside this service's declared prefix."""

    def deregister_local(self, name: CapabilityName, version: Version) -> None: ...

    # -- remote sync (driven by M02 peer events + the periodic manifest fetch) --
    def update_from_peer_manifest(self, peer: PeerRecord, manifest: NodeManifest) -> Diff:
        """Compare offered capabilities to existing entries; add/remove as needed.
        Returns a Diff describing changes."""

    def remove_peer(self, node_id: str) -> int:
        """Remove all entries for a peer. Returns count removed."""

    # -- queries --
    def find(
        self,
        name: CapabilityName,
        version_req: Version,
        params_filter: Callable[[dict], bool] | None = None,
    ) -> list[CapabilityEntry]:
        """Return all entries matching name and version compatibility.
        See CONTRACT §2.1 for compatibility rules."""

    def entry(self, node_id: str, name: CapabilityName, version: Version) -> CapabilityEntry | None: ...
    def all_local(self) -> list[CapabilityEntry]: ...
    def all(self) -> list[CapabilityEntry]: ...

    # -- subscriptions --
    def subscribe(self) -> AsyncIterator[RegistryEvent]:
        """Yield 'added' / 'removed' / 'updated' events. Used by UI topology viz."""


@dataclass(frozen=True)
class Diff:
    added: list[CapabilityEntry]
    removed: list[CapabilityEntry]
    updated: list[CapabilityEntry]


@dataclass(frozen=True)
class RegistryEvent:
    kind: str  # "added" | "removed" | "updated"
    entry: CapabilityEntry
```
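
A minimal sketch of how `find()` could filter by name and version. It assumes the CONTRACT §2.1 rule matches what `RouteRequest.version_req` implies (same major version, offered minor at least the requested minimum minor); `Entry`, `MiniRegistry` and `version_compatible` are hypothetical names for illustration, not the real `Registry`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Callable

Version = tuple[int, int]


@dataclass
class Entry:
    """Hypothetical stand-in for CapabilityEntry: only the fields find() needs."""
    node_id: str
    name: str
    version: Version
    params: dict = field(default_factory=dict)


def version_compatible(offered: Version, requested: Version) -> bool:
    # Assumed rule: same major, offered minor >= requested minimum minor.
    return offered[0] == requested[0] and offered[1] >= requested[1]


class MiniRegistry:
    """Toy index keyed by (node_id, name, version), mirroring Registry's key."""

    def __init__(self) -> None:
        self._entries: dict[tuple[str, str, Version], Entry] = {}

    def add(self, e: Entry) -> None:
        self._entries[(e.node_id, e.name, e.version)] = e

    def find(self, name: str, version_req: Version,
             params_filter: Callable[[dict], bool] | None = None) -> list[Entry]:
        return [
            e for (_, n, v), e in self._entries.items()
            if n == name
            and version_compatible(v, version_req)
            and (params_filter is None or params_filter(e.params))
        ]
```

With entries at versions 1.0, 1.2 and 2.0, a request for `(1, 1)` matches only the 1.2 provider.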
### 3.3 `health.py`
```python
# hearthnet/bus/health.py
from .capability import CapabilityEntry

# HEALTH_WINDOW_CALLS, HEALTH_QUARANTINE_THRESHOLD and HEALTH_QUARANTINE_SECONDS
# are the shared constants listed in §7.


class HealthTracker:
    """Rolling-window health stats per (node_id, capability_name, version).
    Bounded memory: one fixed-size window per entry, O(nodes × capabilities) in total."""

    def __init__(self, window: int = HEALTH_WINDOW_CALLS): ...

    def record(self, entry: CapabilityEntry, *, success: bool, latency_ms: float) -> None:
        """Append a sample; recompute p50/p99/success_rate; update entry in place.
        Quarantines entry if success_rate drops below HEALTH_QUARANTINE_THRESHOLD."""

    def is_quarantined(self, entry: CapabilityEntry) -> bool: ...

    def reset(self, entry: CapabilityEntry) -> None:
        """Clear stats; used after the quarantine timeout."""
```
Internal: each entry holds a fixed-size ring buffer of `(success, latency_ms)` samples. Old samples drop off as new ones arrive.
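
The ring buffer can be sketched with `collections.deque(maxlen=...)`, which discards the oldest sample automatically. `RollingHealth` is a hypothetical name, the window size is an assumed value, and percentiles use the nearest-rank method, which this spec does not prescribe. An empty window reports 0.0 latency, matching the `p50_latency_ms = 0.0` default that the router treats as "unknown".

```python
import math
from collections import deque


class RollingHealth:
    """Hypothetical per-(node, capability) window of (success, latency_ms) samples."""

    def __init__(self, window: int = 50) -> None:  # 50 is an assumed window size
        self.samples: deque = deque(maxlen=window)

    def record(self, success: bool, latency_ms: float) -> None:
        # Once the deque is full, append() silently drops the oldest sample.
        self.samples.append((success, latency_ms))

    @property
    def success_rate(self) -> float:
        if not self.samples:
            return 1.0  # matches the CapabilityEntry default
        return sum(1 for ok, _ in self.samples if ok) / len(self.samples)

    def percentile(self, p: float) -> float:
        """Nearest-rank percentile of latencies; 0.0 means 'no data yet'."""
        if not self.samples:
            return 0.0
        latencies = sorted(ms for _, ms in self.samples)
        rank = max(1, math.ceil(p / 100 * len(latencies)))
        return latencies[rank - 1]
```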
### 3.4 `schema.py`
```python
# hearthnet/bus/schema.py
from .capability import CapabilityDescriptor


class SchemaValidator:
    """JSON Schema validation, with caching."""

    def __init__(self): ...

    def validate_request(self, descriptor: CapabilityDescriptor, body: dict) -> None:
        """Raises BusError('schema_mismatch') with expected schema_hash if invalid."""

    def validate_response(self, descriptor: CapabilityDescriptor, body: dict) -> None: ...

    def validate_stream_frame(self, descriptor: CapabilityDescriptor, frame: dict) -> None: ...


def compute_schema_hash(descriptor_partial: dict) -> str:
    """BLAKE3 over canonical-JSON. See CONTRACT §11.
    Argument shape:
        {
            'name': ..., 'version': ...,
            'request_schema': {...},
            'response_schema': {...} or None,
            'stream_schema': {...} or None,
        }
    Returns 'blake3:<hex>'."""
```
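
A sketch of `compute_schema_hash` under two assumptions to check against CONTRACT §11: canonical JSON is approximated here with sorted keys, compact separators and UTF-8 (CONTRACT §11 may define a stricter scheme), and BLAKE3 comes from the third-party `blake3` package. So that the sketch runs without that package, it falls back to the standard library's BLAKE2b and labels the result `blake2b:` so it can never be mistaken for a spec-compliant hash.

```python
import hashlib
import json


def canonical_json(obj) -> bytes:
    # Approximation: sorted keys, no insignificant whitespace, UTF-8. Tuples serialise as arrays.
    return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")


def compute_schema_hash(descriptor_partial: dict) -> str:
    # Only the five hashed fields participate; params etc. are deliberately excluded.
    payload = canonical_json({
        "name": descriptor_partial["name"],
        "version": descriptor_partial["version"],
        "request_schema": descriptor_partial["request_schema"],
        "response_schema": descriptor_partial.get("response_schema"),
        "stream_schema": descriptor_partial.get("stream_schema"),
    })
    try:
        from blake3 import blake3  # third-party package; the spec requires BLAKE3
        return "blake3:" + blake3(payload).hexdigest()
    except ImportError:
        # Stand-in only: stdlib BLAKE2b, labelled so it cannot pass as BLAKE3.
        return "blake2b:" + hashlib.blake2b(payload, digest_size=32).hexdigest()
```

Because keys are sorted before hashing, two services that write the same schema with different key order get the same hash, which is what `test_schema_hash_stable_across_runs` relies on.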
### 3.5 `router.py`
```python
# hearthnet/bus/router.py
from __future__ import annotations

from .capability import CapabilityEntry, RouteRequest
from .registry import Registry

# BusConfig comes from X04 (config); referenced by name only.


class Router:
    """Selects the best CapabilityEntry for a request."""

    def __init__(self, registry: Registry, config: BusConfig, our_node_id: str): ...

    def route(self, req: RouteRequest) -> CapabilityEntry | None:
        """Return the chosen entry, or None if no candidate is viable.
        Candidates must:
          - match name and version (CONTRACT §2.1)
          - pass params_compatible() (capability-specific, see §5.5)
          - not be quarantined
          - have in_flight < max_concurrent
          - if remote, have last_seen within the freshness window (60s)
        Scoring: see §5.4."""

    def route_sticky(self, req: RouteRequest) -> CapabilityEntry | None:
        """If req.session_id is bound to an entry, return that entry if still viable.
        Otherwise fall back to route() and bind."""

    def release_session(self, session_id: str) -> None: ...
```
### 3.6 `trace.py`
```python
# hearthnet/bus/trace.py
from __future__ import annotations

from dataclasses import dataclass

from .capability import CapabilityEntry, CapabilityName, RouteRequest


@dataclass(frozen=True)
class CallTraceEvent:
    ts: str
    trace_id: str
    capability: CapabilityName
    version: str
    from_node: str
    to_node: str
    is_local: bool
    result: str              # "ok" | error_code
    ms: float
    tokens_in: int | None    # llm.*-specific
    tokens_out: int | None
    bytes_in: int
    bytes_out: int


class TraceHook:
    """Emits trace events to the ring buffer (X03) and Prometheus metrics."""

    def __init__(self): ...

    def on_call_start(self, req: RouteRequest, entry: CapabilityEntry) -> None: ...

    def on_call_end(
        self,
        req: RouteRequest,
        entry: CapabilityEntry,
        *,
        result: str,
        latency_ms: float,
        bytes_in: int,
        bytes_out: int,
        tokens_in: int | None = None,
        tokens_out: int | None = None,
    ) -> None: ...
```
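
X03's ring buffer is not specified here; as a hedged illustration, a bounded `deque` gives the behaviour `recent_traces(n)` needs: constant memory and cheap access to the newest events. `TraceRecord` (a trimmed subset of `CallTraceEvent`) and `RingTraceHook` are hypothetical, the ISO-8601 UTC timestamp format is an assumption, and the capacity is an assumed value.

```python
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class TraceRecord:
    """Hypothetical subset of CallTraceEvent's fields."""
    ts: str
    trace_id: str
    capability: str
    result: str  # "ok" | error_code
    ms: float


class RingTraceHook:
    def __init__(self, capacity: int = 1000) -> None:  # assumed capacity
        self._events: deque = deque(maxlen=capacity)

    def emit(self, trace_id: str, capability: str, result: str, ms: float) -> TraceRecord:
        ev = TraceRecord(datetime.now(timezone.utc).isoformat(), trace_id, capability, result, ms)
        self._events.append(ev)  # the oldest event is evicted once the buffer is full
        return ev

    def recent(self, n: int = 50) -> list:
        """The newest n events, oldest first."""
        return list(self._events)[-n:] if n > 0 else []
```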
### 3.7 `CapabilityBus` (the facade)
```python
# hearthnet/bus/__init__.py
from __future__ import annotations

from dataclasses import dataclass
from typing import AsyncIterator, Awaitable, Callable

from .capability import CapabilityDescriptor, CapabilityEntry, CapabilityName, RouteRequest, Version
from .health import HealthTracker
from .registry import Registry
from .router import Router
from .schema import SchemaValidator
from .trace import CallTraceEvent, TraceHook

# BusConfig, HttpClient, CommunityManifest, PeerRecord and Frame are defined in
# other hearthnet modules and referenced by name only.


class CapabilityBus:
    """The integration point. Services register with this; transport dispatches to it."""

    def __init__(
        self,
        node_id_full: str,
        community_id: str,
        config: BusConfig,
        transport_client: HttpClient,
        community_manifest_provider: Callable[[], CommunityManifest],
    ):
        self.registry = Registry(our_node_id=node_id_full)
        self.health = HealthTracker()
        self.schema = SchemaValidator()
        self.router = Router(self.registry, config, our_node_id=node_id_full)
        self.trace = TraceHook()
        self._client = transport_client
        ...

    # --- service-side: registration ---
    def register_service(self, service: 'Service') -> None:
        """Calls service.capabilities() and registers each with the local handler."""

    def register_capability(
        self,
        descriptor: CapabilityDescriptor,
        handler: Callable[[RouteRequest], Awaitable[dict] | AsyncIterator[dict]],
    ) -> None:
        """Lower-level alternative when a module has no Service class."""

    # --- transport-side: dispatch ---
    async def handle_call(self, req: RouteRequest) -> dict | AsyncIterator[dict]:
        """Called by the X01 server after auth.
        Decides local vs remote, validates schema, runs the handler or makes a remote call,
        records trace + health, returns the payload (or yields frames)."""

    # --- caller-side: outbound capability invocation ---
    async def call(
        self,
        capability: CapabilityName,
        version_req: Version,
        body: dict,
        *,
        session_id: str | None = None,
        timeout_seconds: float | None = None,
    ) -> dict:
        """Used by services that need to invoke other capabilities (e.g. rag.query calling embed.text).
        Goes through the router. If the chosen entry is local, runs the handler directly.
        If remote, uses the X01 client. Records trace + health."""

    async def stream(
        self,
        capability: CapabilityName,
        version_req: Version,
        body: dict,
        *,
        session_id: str | None = None,
    ) -> AsyncIterator[Frame]:
        """Streaming version of call()."""

    # --- peer integration ---
    def on_peer_added(self, peer: PeerRecord) -> None: ...
    def on_peer_updated(self, peer: PeerRecord) -> None: ...
    def on_peer_removed(self, node_id: str) -> None: ...

    # --- introspection (used by UI / CLI) ---
    def topology_snapshot(self) -> 'TopologySnapshot': ...
    def recent_traces(self, n: int = 50) -> list[CallTraceEvent]: ...
    def stats(self) -> dict: ...


@dataclass(frozen=True)
class TopologySnapshot:
    our_node_id: str
    peers: list[PeerRecord]
    capabilities_local: list[CapabilityEntry]
    capabilities_remote: list[CapabilityEntry]
    in_flight_total: int


class BusError(Exception):
    """code in {schema_invalid, namespace_violation, schema_mismatch, not_found, capacity_exceeded,
    quarantined, partition, timeout, internal_error}"""

    code: str
```
---
## 4. The Service protocol (consumed from `services/base.py`)
```python
# hearthnet/services/base.py
from typing import Callable, Protocol

from hearthnet.bus import CapabilityDescriptor


class Service(Protocol):
    """Implemented by every L4 service module."""

    name: str       # "llm" | "rag" | "marketplace" | ...
    version: str    # service version, separate from capability version

    def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable]]:
        """Return (descriptor, handler) pairs for each capability this service offers.
        Handler signature: (RouteRequest) -> Awaitable[dict] | AsyncIterator[dict]"""

    async def start(self) -> None:
        """Warm up backends, open DBs, etc."""

    async def stop(self) -> None:
        """Release resources."""

    def health(self) -> dict:
        """Implementation-specific health blob (used by /health)."""
```
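
For orientation, a toy service that satisfies this protocol. `EchoService`, its `echo.say` capability and the trimmed `Descriptor`/`Request` stand-ins are hypothetical and exist only so the example is self-contained; a real service returns full `CapabilityDescriptor` instances. The capability name sits inside the service's own `echo.` prefix, presumably the kind of prefix `namespace_violation` guards.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Descriptor:
    """Hypothetical trimmed stand-in for CapabilityDescriptor."""
    name: str
    version: tuple


@dataclass(frozen=True)
class Request:
    """Hypothetical trimmed stand-in for RouteRequest."""
    capability: str
    body: dict


class EchoService:
    name = "echo"
    version = "0.1.0"  # service version, independent of the capability version

    def __init__(self) -> None:
        self._started = False

    def capabilities(self) -> list:
        return [(Descriptor("echo.say", (1, 0)), self._say)]

    async def _say(self, req: Request) -> dict:
        return {"output": req.body.get("input", {})}

    async def start(self) -> None:
        self._started = True

    async def stop(self) -> None:
        self._started = False

    def health(self) -> dict:
        return {"started": self._started}
```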
---
## 5. Behaviour
### 5.1 Bootstrap
```
node.py creates CapabilityBus
    ↓
node.py instantiates services, calls bus.register_service(s) for each
    ↓
each service.start() runs (model loaded, DB opened, etc.)
    ↓
node.py wires bus.on_peer_added <- PeerRegistry.subscribe()
    ↓
node.py wires X01 server handler to bus.handle_call
    ↓
bus is live
```
### 5.2 Peer event handling
When [M02](M02-discovery.md) emits `PeerEvent("added")` with a verified manifest:
```
bus.on_peer_added(peer)
  → registry.update_from_peer_manifest(peer, peer.manifest) → Diff
  → for each added entry: emit RegistryEvent('added')
  → for each removed entry: emit RegistryEvent('removed')
```
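
The diff step inside `update_from_peer_manifest` can be sketched as a comparison of two key sets. This assumes entries are keyed by `(node_id, name, version)` as `Registry` states, and that a changed `schema_hash` under the same key counts as an update, which the spec does not spell out. `diff_manifest`, `ManifestDiff` and `registry_events` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ManifestDiff:
    added: list
    removed: list
    updated: list


def diff_manifest(existing: dict, offered: dict) -> ManifestDiff:
    """Both maps go from (node_id, name, version) to schema_hash.
    Assumption: 'updated' means same key, different hash."""
    added = sorted(k for k in offered if k not in existing)
    removed = sorted(k for k in existing if k not in offered)
    updated = sorted(k for k in offered if k in existing and existing[k] != offered[k])
    return ManifestDiff(added, removed, updated)


def registry_events(d: ManifestDiff) -> list:
    """Flatten a diff into (kind, key) pairs in the order they would be emitted."""
    return ([("added", k) for k in d.added]
            + [("removed", k) for k in d.removed]
            + [("updated", k) for k in d.updated])
```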
### 5.3 Inbound call lifecycle (`handle_call`)
```
1. transport receives, verifies signature, builds RouteRequest
2. bus.handle_call(req):
   a. session_id present?
        yes → entry = router.route_sticky(req)
        no  → entry = router.route(req)
   b. entry is None → raise BusError('not_found')
   c. enforce trust_required (compare against caller's level in community manifest)
   d. validate_request(entry.descriptor, req.body)
   e. entry.in_flight >= max_concurrent → raise BusError('capacity_exceeded') with retry_after
   f. entry.in_flight += 1
   g. trace.on_call_start(req, entry)
   h. if entry.is_local:
          result = await entry.handler(req)
      else:
          result = await self._client.call(entry.endpoint, ...)
   i. validate_response(entry.descriptor, result)   (if non-stream)
   j. trace.on_call_end(...), health.record(...)    (also on failure, with success=False)
   k. entry.in_flight -= 1                           (in a finally: must run even if h to j raise)
   l. return result
```
For streaming: same flow but `result` is an async iterator; validation is per-frame; final telemetry recorded on `done` or `error`.
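
Steps f to k carry the one invariant that is easy to get wrong: `in_flight` must come back down even when the handler raises, which `test_in_flight_decrements_on_handler_exception` in §8 checks. A minimal sketch of the local branch only; `Entry`, `dispatch_local`, the `record` callback and this `BusError` are hypothetical stand-ins for `CapabilityEntry`, `handle_call`, `health.record` and the real error type.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


class BusError(Exception):
    """Hypothetical stand-in carrying a code, like the bus's BusError."""

    def __init__(self, code: str) -> None:
        super().__init__(code)
        self.code = code


@dataclass
class Entry:
    max_concurrent: int
    handler: Callable[[dict], Awaitable[Any]]
    in_flight: int = 0


async def dispatch_local(entry: Entry, body: dict, record: Callable[[bool, float], None]) -> Any:
    if entry.in_flight >= entry.max_concurrent:
        raise BusError("capacity_exceeded")       # step e
    entry.in_flight += 1                          # step f
    loop = asyncio.get_running_loop()
    start = loop.time()
    ok = False
    try:
        result = await entry.handler(body)        # step h, local branch
        ok = True
        return result
    finally:
        entry.in_flight -= 1                      # step k first, so a failing record() cannot leak a slot
        record(ok, (loop.time() - start) * 1000)  # step j, also on failure
```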
### 5.4 Routing algorithm (Router.route)
```python
from time import monotonic

# registry, health, config and params_compatible are the router's collaborators:
# Registry, HealthTracker, BusConfig and the per-capability predicate from §5.5.


def route(req):
    candidates = registry.find(req.capability, req.version_req)
    candidates = [
        e for e in candidates
        if not health.is_quarantined(e)
        and e.in_flight < e.descriptor.max_concurrent
        and (e.is_local or e.last_seen > monotonic() - 60)  # freshness applies to remote entries only
        and params_compatible(e.descriptor.params, req.body.get("params", {}))
    ]
    if not candidates:
        return None
    if config.prefer_local:
        local = [e for e in candidates if e.is_local]
        if local and local[0].in_flight / max(local[0].descriptor.max_concurrent, 1) < config.local_load_threshold:
            return local[0]
    return min(candidates, key=score)


def score(e):
    latency = e.p50_latency_ms if e.p50_latency_ms > 0 else 500.0  # unknown: assume 500 ms
    load = e.in_flight / max(e.descriptor.max_concurrent, 1)
    reliability_penalty = (1 - e.success_rate) * 1000
    locality_bonus = -50 if e.is_local else 0
    return latency * (1 + load) + reliability_penalty + locality_bonus
```
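
A runnable version of the scoring half, with a worked comparison. `Candidate` is a hypothetical flattening of the fields `score()` reads, `pick` is a hypothetical name for the second half of `route()`, and the 0.8 `local_load_threshold` default is an assumed value, not the X04 default. For a local entry with p50 = 200 ms at 3/4 load and perfect reliability, the score is 200 × 1.75 + 0 - 50 = 300; for a remote entry with p50 = 80 ms, no load and 95% success, it is 80 × 1 + 50 + 0 = 130. The scorer alone prefers the remote, but with `prefer_local` the local entry still wins because its load (0.75) is under the threshold.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Candidate:
    """Hypothetical flattening of CapabilityEntry plus descriptor.max_concurrent."""
    node_id: str
    is_local: bool
    p50_latency_ms: float
    in_flight: int
    max_concurrent: int
    success_rate: float


def score(e: Candidate) -> float:
    latency = e.p50_latency_ms if e.p50_latency_ms > 0 else 500.0  # unknown: assume 500 ms
    load = e.in_flight / max(e.max_concurrent, 1)
    reliability_penalty = (1 - e.success_rate) * 1000
    locality_bonus = -50 if e.is_local else 0
    return latency * (1 + load) + reliability_penalty + locality_bonus


def pick(candidates: list, prefer_local: bool = True,
         local_load_threshold: float = 0.8) -> Optional[Candidate]:
    """Second half of route(); assumes candidates are already filtered."""
    if not candidates:
        return None
    if prefer_local:
        local = [e for e in candidates if e.is_local]
        if local and local[0].in_flight / max(local[0].max_concurrent, 1) < local_load_threshold:
            return local[0]
    return min(candidates, key=score)
```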
### 5.5 `params_compatible` (per-capability)
The bus alone cannot know which `params` matter for compatibility (model name for `llm.chat`, corpus for `rag.query`). Services register a `params_compatible` predicate alongside their descriptor:
```python
# In service's capabilities() return value, tuple is actually:
# (descriptor, handler, params_compatible)
# where params_compatible: Callable[[dict, dict], bool]
# args: (offered_params, requested_params) → True if requested can be served
```
Default predicate: `lambda offered, requested: True` (any-matches-any). LLM service overrides to check model/quant/ctx; RAG overrides to check corpus name; etc. Documented per-service in M04 / M05 / etc.
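
A sketch of what an LLM-style predicate might look like next to the default. The parameter keys `model`, `quant` and `ctx` are assumptions for illustration (the real keys are documented in M04), as is the rule that the requested context must fit within the offered context.

```python
def default_params_compatible(offered: dict, requested: dict) -> bool:
    # The bus default: any offered params satisfy any request.
    return True


def llm_params_compatible(offered: dict, requested: dict) -> bool:
    """Hypothetical llm.chat predicate: every requested key must be satisfiable."""
    for key in ("model", "quant"):
        if key in requested and requested[key] != offered.get(key):
            return False  # exact match required for model identity and quantisation
    if "ctx" in requested and requested["ctx"] > offered.get("ctx", 0):
        return False  # the provider's context window must be at least as large
    return True
```

Keys the caller does not mention are unconstrained, so an empty `params` matches every provider.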
### 5.6 Sticky routing
For multi-turn capabilities (`llm.chat` continuations, future `chat.thread`):
- Caller passes `session_id`
- First call: router picks an entry, records `entry.sticky_sessions.add(session_id)`
- Subsequent calls with same `session_id`: router returns same entry if still viable
- TTL: 10 minutes idle (driven by a background sweeper)
- On entry removal (peer left): session unbinds; next call gets a new entry; caller MAY observe context loss (capability handler returns `bad_request` if context required)
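
A hedged sketch of the binding table and the idle-TTL sweeper. It keeps bindings in one table keyed by `session_id` rather than in each entry's `sticky_sessions` set, which is a design choice of the sketch, not the spec; `StickyTable`, `is_viable` and the string entry keys are hypothetical, and the injectable clock exists only so the TTL can be tested.

```python
import time
from typing import Callable, Optional


class StickyTable:
    """Hypothetical session_id -> entry-key binding with an idle TTL."""

    def __init__(self, ttl_seconds: float = 600.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds  # 10 minutes idle, per the MVP rule above
        self._clock = clock
        self._bindings: dict = {}  # session_id -> (entry_key, last_used)

    def route_sticky(self, session_id: str, is_viable: Callable[[str], bool],
                     route: Callable[[], Optional[str]]) -> Optional[str]:
        bound = self._bindings.get(session_id)
        if bound is not None and is_viable(bound[0]):
            key = bound[0]
        else:
            key = route()  # fall back to normal routing, then rebind
            if key is None:
                self._bindings.pop(session_id, None)
                return None
        self._bindings[session_id] = (key, self._clock())  # refresh the idle timer
        return key

    def sweep(self) -> int:
        """Background sweeper: drop bindings idle longer than the TTL."""
        now = self._clock()
        expired = [s for s, (_, t) in self._bindings.items() if now - t > self._ttl]
        for s in expired:
            del self._bindings[s]
        return len(expired)

    def unbind_entry(self, entry_key: str) -> int:
        """Called when an entry disappears (peer left)."""
        gone = [s for s, (k, _) in self._bindings.items() if k == entry_key]
        for s in gone:
            del self._bindings[s]
        return len(gone)
```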
### 5.7 Quarantine
```
health.record(entry, success, latency):
    append to ring buffer
    recompute success_rate over last HEALTH_WINDOW_CALLS samples (or fewer if young)
    if success_rate < HEALTH_QUARANTINE_THRESHOLD:
        entry.quarantined_until = monotonic() + HEALTH_QUARANTINE_SECONDS
        log.warning("quarantined", capability, node_id, success_rate)
        metrics.counter("hearthnet_quarantines_total").inc()

router.route only includes entries whose quarantined_until <= monotonic().
```
After quarantine timeout, the next call is a "probe": if it succeeds, history resets; if it fails, immediate re-quarantine.
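
One way to implement the probe rule, as a sketch: a flag remembers that the entry was quarantined, so the first sample recorded after the timeout is treated as the probe. `Quarantine` is hypothetical, the threshold and duration are values the caller passes in (standing in for `HEALTH_QUARANTINE_THRESHOLD` and `HEALTH_QUARANTINE_SECONDS`), and a successful probe simply clears the window, which is one reading of "history resets".

```python
import time
from collections import deque


class Quarantine:
    def __init__(self, window: int, threshold: float, seconds: float, clock=time.monotonic) -> None:
        self.samples: deque = deque(maxlen=window)
        self.threshold = threshold
        self.seconds = seconds
        self.clock = clock
        self.quarantined_until = 0.0
        self._awaiting_probe = False

    def is_quarantined(self) -> bool:
        return self.quarantined_until > self.clock()

    def _quarantine(self) -> None:
        self.quarantined_until = self.clock() + self.seconds
        self._awaiting_probe = True

    def record(self, success: bool) -> None:
        if self._awaiting_probe and not self.is_quarantined():
            # The first call after the timeout is the probe.
            self._awaiting_probe = False
            if success:
                self.samples.clear()  # probe passed: history resets
            else:
                self._quarantine()    # probe failed: immediate re-quarantine
            return
        self.samples.append(success)
        if sum(self.samples) / len(self.samples) < self.threshold:
            self._quarantine()
```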
### 5.8 Outbound `call` from a service
```
service code: await bus.call("embed.text", (1, 0), {"params": {...}, "input": {...}})
  → router.route(...) → entry
  → if entry.is_local: dispatch to handler directly (no HTTP roundtrip)
  → else: client.call(entry.endpoint, ...)
```
Local short-circuit is the reason a service in the same process can use the bus without paying network cost.

---
## 6. Errors (BusError code mapping)
| BusError code | Wire `ErrorCode` | HTTP status |
|---------------|-----------------|-------------|
| `schema_invalid` | (raised at registration; never on wire) | n/a |
| `namespace_violation` | (raised at registration) | n/a |
| `schema_mismatch` | `schema_mismatch` | 400 |
| `not_found` | `not_found` | 404 |
| `capacity_exceeded` | `capacity_exceeded` | 429 |
| `quarantined` | `partition` | 503 |
| `partition` | `partition` | 503 |
| `timeout` | `timeout` | 408 |
| `internal_error` | `internal_error` | 500 |

Plus auth-level errors raised before the bus sees the request: `unauthorized`, `revoked`, `invalid_signature`, `expired`, `rate_limited`; see [X01 §3.3](../cross-cutting/X01-transport.md).
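
The table is small enough to transcribe directly into a lookup. This sketch (`WIRE_MAPPING`, `REGISTRATION_ONLY` and `to_wire` are hypothetical names) adds two behaviours the table does not state: registration-only codes raise if anything tries to serialise them, and unrecognised codes fall back to `internal_error`.

```python
# BusError code -> (wire ErrorCode, HTTP status), transcribed from the table above.
WIRE_MAPPING: dict = {
    "schema_mismatch": ("schema_mismatch", 400),
    "not_found": ("not_found", 404),
    "capacity_exceeded": ("capacity_exceeded", 429),
    "quarantined": ("partition", 503),
    "partition": ("partition", 503),
    "timeout": ("timeout", 408),
    "internal_error": ("internal_error", 500),
}

# Raised at registration time; these should never be serialised.
REGISTRATION_ONLY = frozenset({"schema_invalid", "namespace_violation"})


def to_wire(code: str) -> tuple:
    if code in REGISTRATION_ONLY:
        raise ValueError(f"{code} is a registration-time error and never goes on the wire")
    # Assumption: unknown codes degrade to internal_error rather than leaking.
    return WIRE_MAPPING.get(code, ("internal_error", 500))
```

Mapping `quarantined` onto the wire's `partition` means a caller sees a quarantined provider the same way it sees an unreachable one.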
---
## 7. Configuration
From [X04](../cross-cutting/X04-config.md):
```python
config.bus.prefer_local
config.bus.local_load_threshold
```
Constants used: `HEALTH_WINDOW_CALLS`, `HEALTH_QUARANTINE_THRESHOLD`, `HEALTH_QUARANTINE_SECONDS`.

---
## 8. Tests
### Unit
- `test_register_local_validates_descriptor`
- `test_register_local_namespace_violation_raises`
- `test_schema_hash_stable_across_runs`
- `test_schema_hash_changes_on_schema_change`
- `test_router_prefers_local_when_underloaded`
- `test_router_prefers_remote_when_local_overloaded`
- `test_router_skips_quarantined`
- `test_router_breaks_ties_by_latency_then_reliability`
- `test_health_quarantines_at_threshold`
- `test_health_resets_after_quarantine_window`
- `test_params_compatible_predicate_invoked`
- `test_sticky_session_binds_then_unbinds_on_peer_removal`
- `test_in_flight_decrements_on_handler_exception`
### Integration
- `test_two_nodes_route_to_each_other`: two in-process buses; one registers `llm.chat`, the other calls it
- `test_three_nodes_load_balance`: three providers, 100 calls, distribution within 30% of even
- `test_quarantine_after_chaos`: fault-inject one provider, observe quarantine then recovery
- `test_streaming_call_records_full_trace`
- `test_sticky_routing_preserves_session_across_calls`
---
## 9. Cross-references
| What | Where |
|------|-------|
| Capability descriptor concept | [CONTRACT §2, §11](../CAPABILITY_CONTRACT.md) |
| Schema hash computation | [CONTRACT §11](../CAPABILITY_CONTRACT.md) |
| Version compatibility | [CONTRACT §2.1](../CAPABILITY_CONTRACT.md) |
| Wire-level error codes | [CONTRACT §9](../CAPABILITY_CONTRACT.md) |
| Transport dispatch | [X01 §3.3](../cross-cutting/X01-transport.md) |
| Peer registry | [M02 §3.1](M02-discovery.md) |
| Trace ring buffer | [X03 §5](../cross-cutting/X03-observability.md) |
| Service protocol consumers | [M04](M04-llm.md), [M05](M05-rag.md), [M06](M06-marketplace.md), [M07](M07-file-blobs.md), [M10](M10-chat.md), [M11](M11-embedding.md) |
| UI consumes topology_snapshot | [M08 §3.2](M08-ui.md) |
---
## 10. Open questions
1. **Per-capability params predicate registration**: currently a third tuple element. Cleaner alternative: store as a method on a `CapabilitySpec` subclass. Decide before M04 lands.
2. **Sticky session TTL**: fixed 10 minutes, or declared per capability? MVP: fixed. Phase 2: declared.
3. **Load balancing fairness**: the current scorer is greedy. Should we add a small random jitter to avoid herding on the fastest provider? MVP: no. If we see herding in tests, add ε-noise.
4. **Schema cache invalidation**: currently keyed by `schema_hash`, so a hash change invalidates implicitly. Should be sufficient.