from __future__ import annotations import logging import uuid from datetime import datetime, timezone as _tz from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest UTC = _tz.utc from hearthnet.services.chat.delivery import DeliveryManager _log = logging.getLogger(__name__) from hearthnet.services.chat.views import ChatView class ChatService: name = "chat" version = "1.0" def __init__(self, node_id: str, event_log=None, bus=None) -> None: self._node_id = node_id self._event_log = event_log self._bus = bus self._view = ChatView(node_id) self._delivery = DeliveryManager(bus=bus, our_node_id=node_id) # Backward compat: in-memory messages list self.messages: list[dict] = [] def capabilities(self) -> list[tuple]: return [ ( CapabilityDescriptor(name="chat.send", max_concurrent=8, idempotent=True), self.send, None, ), ( CapabilityDescriptor(name="chat.history", max_concurrent=8, idempotent=True), self.history, None, ), ( CapabilityDescriptor(name="chat.deliver", max_concurrent=8, idempotent=True), self.deliver, None, ), ] async def send(self, req: RouteRequest) -> dict: payload = dict(req.body.get("input", {})) if not payload.get("recipient") and not payload.get("to"): return {"error": "bad_request", "message": "recipient required"} recipient = payload.get("recipient") or payload.get("to", "") if recipient == self._node_id: # Self-message: store locally and return direct status event_id = payload.get("event_id") or f"msg:{uuid.uuid4().hex}" now = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") msg = { "event_id": event_id, "from": self._node_id, "to": self._node_id, "body": payload.get("body", ""), "attachments": payload.get("attachments", []), "sent_at": now, "client_id": payload.get("client_id", event_id), } self.messages.append(msg) if self._view is not None: try: self._view._messages.append(msg) except Exception: pass return {"output": {"event_id": event_id, "lamport": 0, "delivered": "direct"}, "meta": {}} event_id = payload.get("event_id") or f"msg:{uuid.uuid4().hex}" client_id = payload.get("client_id", event_id) now = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ") msg_payload = { "to": recipient, "body": payload.get("body", ""), "attachments": payload.get("attachments", []), "sent_at": now, "client_id": client_id, } if self._event_log is not None: try: event = self._event_log.append_local( event_type="chat.message.sent", author=req.caller or self._node_id, payload=msg_payload, ) self._view.apply(event) message = { "event_id": event.event_id, "from": req.caller or self._node_id, "to": recipient, "body": payload.get("body", ""), "attachments": payload.get("attachments", []), "sent_at": now, "client_id": client_id, } delivered = await self._deliver_remote(message) return { "output": { "event_id": event.event_id, "lamport": event.lamport, "delivered": delivered, }, "meta": {}, } except Exception as exc: _log.warning("ChatService.send event_log path failed, falling back to in-memory: %s", exc) # Demo / backward-compat mode message = { "event_id": event_id, "from": req.caller or self._node_id, "to": recipient, "body": payload.get("body", ""), "attachments": payload.get("attachments", []), "sent_at": now, "client_id": client_id, } self.messages.append(message) delivered = await self._deliver_remote(message) return { "output": { "event_id": event_id, "lamport": len(self.messages), "delivered": delivered, }, "meta": {}, } async def _deliver_remote(self, message: dict) -> str: """Push *message* to the recipient node over the transport. Returns ``"delivered"`` when the recipient node acknowledges receipt, else ``"queued"`` (store-and-forward — recipient offline/unreachable). """ recipient = message.get("to", "") if not recipient or recipient == self._node_id: return "direct" bus = self._bus if bus is None or getattr(bus, "transport", None) is None: return "queued" try: inbound = RouteRequest( capability="chat.deliver", version_req=(1, 0), body={"input": dict(message)}, caller=self._node_id, trace_id=uuid.uuid4().hex, ) result = await bus.transport.call(recipient, inbound) if result.get("output", {}).get("status") == "received": return "delivered" return "queued" except Exception: return "queued" async def deliver(self, req: RouteRequest) -> dict: """Inbound delivery from a peer — materialise into our local chat log. Stores into both the backward-compat ``messages`` list and the event-sourced :class:`ChatView` so :meth:`history` returns the message regardless of which mode this node runs in. Idempotent on ``event_id``. """ payload = dict(req.body.get("input", {})) event_id = payload.get("event_id") or f"msg:{uuid.uuid4().hex}" from_node = payload.get("from") or req.caller or "" to_node = payload.get("to") or self._node_id if any(m.get("event_id") == event_id for m in self.messages): return {"output": {"status": "received", "event_id": event_id}, "meta": {}} message = { "event_id": event_id, "from": from_node, "to": to_node, "body": payload.get("body", ""), "attachments": payload.get("attachments", []), } self.messages.append(message) self._view.apply( { "event_type": "chat.message.sent", "event_id": event_id, "author": from_node, "payload": { "to": to_node, "body": payload.get("body", ""), "attachments": payload.get("attachments", []), "sent_at": payload.get("sent_at", ""), "client_id": payload.get("client_id", event_id), }, } ) return {"output": {"status": "received", "event_id": event_id}, "meta": {}} async def history(self, req: RouteRequest) -> dict: peer = req.body.get("input", {}).get("peer") if self._event_log is not None: if peer: msgs = [m.as_dict() for m in self._view.messages_with(peer)] else: msgs = [m.as_dict() for m in self._view.all_messages()] else: msgs = [ m for m in self.messages if peer is None or m.get("from") == peer or m.get("to") == peer ] return {"output": {"messages": msgs}, "meta": {}}