Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import logging | |
| import uuid | |
| from datetime import datetime, timezone as _tz | |
| from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest | |
| UTC = _tz.utc | |
| from hearthnet.services.chat.delivery import DeliveryManager | |
| _log = logging.getLogger(__name__) | |
| from hearthnet.services.chat.views import ChatView | |
| class ChatService: | |
| name = "chat" | |
| version = "1.0" | |
| def __init__(self, node_id: str, event_log=None, bus=None) -> None: | |
| self._node_id = node_id | |
| self._event_log = event_log | |
| self._bus = bus | |
| self._view = ChatView(node_id) | |
| self._delivery = DeliveryManager(bus=bus, our_node_id=node_id) | |
| # Backward compat: in-memory messages list | |
| self.messages: list[dict] = [] | |
| def capabilities(self) -> list[tuple]: | |
| return [ | |
| ( | |
| CapabilityDescriptor(name="chat.send", max_concurrent=8, idempotent=True), | |
| self.send, | |
| None, | |
| ), | |
| ( | |
| CapabilityDescriptor(name="chat.history", max_concurrent=8, idempotent=True), | |
| self.history, | |
| None, | |
| ), | |
| ( | |
| CapabilityDescriptor(name="chat.deliver", max_concurrent=8, idempotent=True), | |
| self.deliver, | |
| None, | |
| ), | |
| ] | |
| async def send(self, req: RouteRequest) -> dict: | |
| payload = dict(req.body.get("input", {})) | |
| if not payload.get("recipient") and not payload.get("to"): | |
| return {"error": "bad_request", "message": "recipient required"} | |
| recipient = payload.get("recipient") or payload.get("to", "") | |
| if recipient == self._node_id: | |
| # Self-message: store locally and return direct status | |
| event_id = payload.get("event_id") or f"msg:{uuid.uuid4().hex}" | |
| now = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| msg = { | |
| "event_id": event_id, | |
| "from": self._node_id, | |
| "to": self._node_id, | |
| "body": payload.get("body", ""), | |
| "attachments": payload.get("attachments", []), | |
| "sent_at": now, | |
| "client_id": payload.get("client_id", event_id), | |
| } | |
| self.messages.append(msg) | |
| if self._view is not None: | |
| try: | |
| self._view._messages.append(msg) | |
| except Exception: | |
| pass | |
| return {"output": {"event_id": event_id, "lamport": 0, "delivered": "direct"}, "meta": {}} | |
| event_id = payload.get("event_id") or f"msg:{uuid.uuid4().hex}" | |
| client_id = payload.get("client_id", event_id) | |
| now = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| msg_payload = { | |
| "to": recipient, | |
| "body": payload.get("body", ""), | |
| "attachments": payload.get("attachments", []), | |
| "sent_at": now, | |
| "client_id": client_id, | |
| } | |
| if self._event_log is not None: | |
| try: | |
| event = self._event_log.append_local( | |
| event_type="chat.message.sent", | |
| author=req.caller or self._node_id, | |
| payload=msg_payload, | |
| ) | |
| self._view.apply(event) | |
| message = { | |
| "event_id": event.event_id, | |
| "from": req.caller or self._node_id, | |
| "to": recipient, | |
| "body": payload.get("body", ""), | |
| "attachments": payload.get("attachments", []), | |
| "sent_at": now, | |
| "client_id": client_id, | |
| } | |
| delivered = await self._deliver_remote(message) | |
| return { | |
| "output": { | |
| "event_id": event.event_id, | |
| "lamport": event.lamport, | |
| "delivered": delivered, | |
| }, | |
| "meta": {}, | |
| } | |
| except Exception as exc: | |
| _log.warning("ChatService.send event_log path failed, falling back to in-memory: %s", exc) | |
| # Demo / backward-compat mode | |
| message = { | |
| "event_id": event_id, | |
| "from": req.caller or self._node_id, | |
| "to": recipient, | |
| "body": payload.get("body", ""), | |
| "attachments": payload.get("attachments", []), | |
| "sent_at": now, | |
| "client_id": client_id, | |
| } | |
| self.messages.append(message) | |
| delivered = await self._deliver_remote(message) | |
| return { | |
| "output": { | |
| "event_id": event_id, | |
| "lamport": len(self.messages), | |
| "delivered": delivered, | |
| }, | |
| "meta": {}, | |
| } | |
| async def _deliver_remote(self, message: dict) -> str: | |
| """Push *message* to the recipient node over the transport. | |
| Returns ``"delivered"`` when the recipient node acknowledges receipt, | |
| else ``"queued"`` (store-and-forward — recipient offline/unreachable). | |
| """ | |
| recipient = message.get("to", "") | |
| if not recipient or recipient == self._node_id: | |
| return "direct" | |
| bus = self._bus | |
| if bus is None or getattr(bus, "transport", None) is None: | |
| return "queued" | |
| try: | |
| inbound = RouteRequest( | |
| capability="chat.deliver", | |
| version_req=(1, 0), | |
| body={"input": dict(message)}, | |
| caller=self._node_id, | |
| trace_id=uuid.uuid4().hex, | |
| ) | |
| result = await bus.transport.call(recipient, inbound) | |
| if result.get("output", {}).get("status") == "received": | |
| return "delivered" | |
| return "queued" | |
| except Exception: | |
| return "queued" | |
| async def deliver(self, req: RouteRequest) -> dict: | |
| """Inbound delivery from a peer — materialise into our local chat log. | |
| Stores into both the backward-compat ``messages`` list and the | |
| event-sourced :class:`ChatView` so :meth:`history` returns the message | |
| regardless of which mode this node runs in. Idempotent on ``event_id``. | |
| """ | |
| payload = dict(req.body.get("input", {})) | |
| event_id = payload.get("event_id") or f"msg:{uuid.uuid4().hex}" | |
| from_node = payload.get("from") or req.caller or "" | |
| to_node = payload.get("to") or self._node_id | |
| if any(m.get("event_id") == event_id for m in self.messages): | |
| return {"output": {"status": "received", "event_id": event_id}, "meta": {}} | |
| message = { | |
| "event_id": event_id, | |
| "from": from_node, | |
| "to": to_node, | |
| "body": payload.get("body", ""), | |
| "attachments": payload.get("attachments", []), | |
| } | |
| self.messages.append(message) | |
| self._view.apply( | |
| { | |
| "event_type": "chat.message.sent", | |
| "event_id": event_id, | |
| "author": from_node, | |
| "payload": { | |
| "to": to_node, | |
| "body": payload.get("body", ""), | |
| "attachments": payload.get("attachments", []), | |
| "sent_at": payload.get("sent_at", ""), | |
| "client_id": payload.get("client_id", event_id), | |
| }, | |
| } | |
| ) | |
| return {"output": {"status": "received", "event_id": event_id}, "meta": {}} | |
| async def history(self, req: RouteRequest) -> dict: | |
| peer = req.body.get("input", {}).get("peer") | |
| if self._event_log is not None: | |
| if peer: | |
| msgs = [m.as_dict() for m in self._view.messages_with(peer)] | |
| else: | |
| msgs = [m.as_dict() for m in self._view.all_messages()] | |
| else: | |
| msgs = [ | |
| m | |
| for m in self.messages | |
| if peer is None or m.get("from") == peer or m.get("to") == peer | |
| ] | |
| return {"output": {"messages": msgs}, "meta": {}} | |