Spaces:
Running on Zero
A newer version of the Gradio SDK is available: 6.19.0
M10 β Chat Service
Spec version: v1.0 Depends on: M01 (identity, for signing), M03 (bus), X02 (events), X04 (config), X03 (observability), M07 (attachments via blobs) Depended on by: M08 (UI chat tab)
1. Responsibility
Provide chat.send and chat.history capabilities. Handle direct-message delivery: directly if recipient is online; via store-and-forward through an anchor if offline. Maintain a per-peer chat view materialised from chat.message.* events.
E2E encryption between users is Phase 2 (CONTRACT Β§12 open question 1). MVP relies on TLS-in-transit + signed-at-rest within a trusted community.
2. File layout
hearthnet/services/chat/
βββ __init__.py
βββ service.py # ChatService
βββ delivery.py # DeliveryManager: direct vs store-and-forward
βββ views.py # ChatView: MaterialisedView
3. Public API
3.1 views.py
# hearthnet/services/chat/views.py
@dataclass(frozen=True)
class ChatMessage:
event_id: str
lamport: int
sender: str # NodeID full form
recipient: str
body: str
attachments: list[dict] # [{cid, name}]
created_at: str
delivered_at: str | None
read_at: str | None
class ChatView:
"""MaterialisedView from chat.message.sent / .delivered / .read events."""
def __init__(self, our_node_id_full: str):
...
# MaterialisedView protocol:
def reset(self) -> None: ...
def apply(self, event: Event) -> None: ...
def snapshot_state(self) -> dict: ...
def restore_state(self, state: dict) -> None: ...
# queries:
def history_with(
self,
peer: str | None = None,
*,
since_lamport: int = 0,
limit: int = 200,
) -> list[ChatMessage]: ...
def peers(self) -> list[str]:
"""All NodeIDs we have exchanged messages with."""
def unread_count(self, peer: str) -> int: ...
3.2 delivery.py
# hearthnet/services/chat/delivery.py
class DeliveryManager:
"""Decides direct vs store-and-forward; performs delivery attempts."""
def __init__(
self,
bus: CapabilityBus,
event_log: EventLog,
author_kp: KeyPair,
peer_registry: PeerRegistry,
config: ChatConfig,
):
...
async def deliver(self, message_event: Event) -> str:
"""Attempt delivery.
Returns: 'direct' | 'forwarded' | 'queued'.
Strategy:
1. Look up recipient in peer_registry.
2. If online and reachable: push via pubsub topic chat.message.<recipient_short>.
3. Else: pick 2 anchors with chat.store_and_forward capability (Phase 2),
call them with the encrypted-blob carrying message.
Fall back to leaving it in our log for eventual sync.
4. Mark with method."""
async def on_local_message_arrived(self, message_event: Event) -> None:
"""When we receive a chat.message.sent event addressed to us:
- emit pubsub chat.message.<our_short_id> for UI
- append chat.message.delivered event"""
async def on_pubsub_message(self, payload: dict) -> None:
"""When the pubsub topic delivers a message to us, process it
(which may include appending the event to our log if not already there)."""
3.3 service.py
# hearthnet/services/chat/service.py
class ChatService:
name = "chat"
version = "1.0"
def __init__(
self,
config: ChatConfig,
bus: CapabilityBus,
event_log: EventLog,
replay_engine: ReplayEngine,
peer_registry: PeerRegistry,
author_kp: KeyPair,
our_node_id_full: str,
):
self.view = ChatView(our_node_id_full=our_node_id_full)
replay_engine.register(
"chat",
self.view,
event_types=["chat.message.sent", "chat.message.delivered", "chat.message.read"],
)
self.delivery = DeliveryManager(bus, event_log, author_kp, peer_registry, config)
def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable, ParamsPredicate]]:
"""Registers: chat.send, chat.history."""
async def start(self) -> None:
"""Replay; subscribe to pubsub topic chat.message.<our_short_id>."""
async def stop(self) -> None: ...
def health(self) -> dict: ...
# --- handlers ---
async def handle_send(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.15.
1. Idempotency by (author, client_id).
2. Append chat.message.sent event.
3. DeliveryManager.deliver(event) β returns delivery method.
4. Return {event_id, lamport, delivered}."""
async def handle_history(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.16. Self-only: refuses calls where caller != our node_id."""
3.4 Capability descriptors
descriptor_send = CapabilityDescriptor(
name="chat.send", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=8,
trust_required="member", timeout_seconds=15, idempotent=True,
)
descriptor_history = CapabilityDescriptor(
name="chat.history", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=8,
trust_required="self", # the bus enforces caller == our_node_id
timeout_seconds=5, idempotent=True,
)
The trust_required="self" is a new level introduced by chat. The bus interprets it as: only the local UI calling through localhost may invoke this capability. Remote callers receive unauthorized.
4. Behaviour
4.1 Send β delivery sequence
UI β bus.call("chat.send", (1,0), {input: {client_id, recipient, body, attachments}})
β ChatService.handle_send
β idempotency check
β event_log.append_local("chat.message.sent", data, author_kp)
β³ this fans out to ReplayEngine which calls ChatView.apply()
β DeliveryManager.deliver(event)
β³ if recipient online:
β publish to pubsub topic chat.message.<recipient_short>
β³ else (Phase 2):
β bus.call("chat.forward", ...) on two anchors with the capability
β³ else:
β noop, will sync via X02 eventually
β return {event_id, lamport, delivered: "direct"|"forwarded"|"queued"}
4.2 Receive sequence
pubsub topic chat.message.<our_short_id> fires with event payload
β DeliveryManager.on_pubsub_message
β event_log.append_received(event) (deduplicated by event_id)
β³ ChatView.apply()
β event_log.append_local("chat.message.delivered", {target_event_id}, our_kp)
β³ propagated back to sender via gossip
β emit local notification (UI hook)
4.3 Read receipts (optional)
When the UI scrolls past a message, it appends chat.message.read. If config.chat.read_receipts_enabled = false, the UI doesn't emit it.
4.4 Store-and-forward (Phase 2 stub)
MVP path: if recipient offline, message stays in our log; recipient gets it when they sync. This is fine for community members on the same LAN where everyone gossips.
Phase 2: a separate chat.forward.put@1.0 capability registered by anchors. Sender ships the event to 2 anchors. When recipient reappears, they probe chat.forward.fetch@1.0 against anchors. After successful delivery, anchors drop the cached event.
4.5 Attachments
attachments is a list of {cid, name}. The actual blob is sent separately via M07. The chat event only references the CID. Receivers fetch on demand.
4.6 No self-chat
recipient == our_node_id is rejected with bad_request.
4.7 Group chat (Phase 2)
Reserved chat.thread.* namespace. Out of scope here.
5. Errors
| Condition | Wire code |
|---|---|
| recipient not a community member | not_found |
| caller calling history but not localhost | unauthorized |
| empty body and no attachments | bad_request |
| attachment CID not known to recipient | (silent; recipient fetches via M07 on read) |
6. Configuration
From X04 Β§3:
config.chat.enabled
config.chat.store_and_forward # Phase 2 flag
Phase 2: config.chat.read_receipts_enabled.
7. Tests
Unit
test_send_appends_event_and_returns_idtest_send_idempotent_on_client_idtest_history_rejects_remote_callertest_view_apply_updates_statetest_self_chat_rejected
Integration
test_two_node_direct_deliverytest_recipient_offline_then_online_sees_message_after_synctest_delivered_event_propagates_back_to_sender
8. Cross-references
| What | Where |
|---|---|
chat.* wire |
CONTRACT Β§4.15β4.16 |
| Event types | CONTRACT Β§7.2 |
| Event log | X02 |
| Attachments | M07 |
| Pubsub topics | CONTRACT Β§8, X01 Β§8 |
| UI chat tab | M08 Β§5.3 |
9. Open questions
- E2E encryption β Phase 2. Will use X25519 + ChaCha20-Poly1305. Body encrypted; envelope (sender, recipient, lamport) stays cleartext. Signature still over ciphertext.
- Group chat β Phase 2.
- Voice notes β Phase 2, would use
stt.*for transcript, blob for audio. - Phone notifications β Phase 2, requires the relay tier.