HearthNet-Nemotron / hearthnet /services /chat /thread_service.py
GitHub Actions
fix: easter-egg ticker now properly works in Gradio app
d6ca3a2
Raw
History Blame
9.96 kB
from __future__ import annotations
import contextlib
import time
import uuid
from typing import Any
from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest
from hearthnet.services.chat.thread_views import ThreadViewStore
class ThreadService:
"""Group-chat thread service.
Registers:
chat.thread.create@1.0
chat.thread.send@1.0
chat.thread.history@1.0
chat.thread.invite@1.0
chat.thread.leave@1.0
"""
name = "chat.threads"
def __init__(
self,
node_id: str,
event_log: Any = None,
bus: Any = None,
db_path: str | None = None,
) -> None:
self._node_id = node_id
self._event_log = event_log
self._bus = bus
self._store = ThreadViewStore(db_path=db_path)
# ── Registration ──────────────────────────────────────────────────────────
def capabilities(self) -> list[tuple]:
return [
(
CapabilityDescriptor(name="chat.thread.create", max_concurrent=4, idempotent=False),
self.create_thread,
None,
),
(
CapabilityDescriptor(name="chat.thread.send", max_concurrent=8, idempotent=False),
self.send_message,
None,
),
(
CapabilityDescriptor(name="chat.thread.history", max_concurrent=8, idempotent=True),
self.get_history,
None,
),
(
CapabilityDescriptor(name="chat.thread.invite", max_concurrent=4, idempotent=True),
self.invite_member,
None,
),
(
CapabilityDescriptor(name="chat.thread.leave", max_concurrent=4, idempotent=False),
self.leave_thread,
None,
),
]
def register(self, bus: Any) -> None:
self._bus = bus
for cap, handler, predicate in self.capabilities():
bus.register_local(cap, handler, predicate)
# ── Handlers ──────────────────────────────────────────────────────────────
async def create_thread(self, req: RouteRequest) -> dict:
params: dict = req.body.get("input", {})
name: str = params.get("name", "")
members: list[str] = list(params.get("members", []))
e2e_enabled: bool = bool(params.get("e2e_enabled", False))
caller = req.caller or self._node_id
if caller not in members:
members.append(caller)
thread_id = f"thread:{uuid.uuid4().hex}"
created_at = time.time()
event = {
"event_id": f"evt:{uuid.uuid4().hex}",
"event_type": "chat.thread.created",
"author": caller,
"payload": {
"thread_id": thread_id,
"name": name,
"members": members,
"created_at": created_at,
"e2e_enabled": e2e_enabled,
},
}
if self._event_log is not None:
try:
logged = self._event_log.append_local(
event_type="chat.thread.created",
author=caller,
payload=event["payload"],
)
self._store.apply(
{
"event_id": logged.event_id,
"event_type": "chat.thread.created",
"author": caller,
"payload": event["payload"],
}
)
except Exception:
self._store.apply(event)
else:
self._store.apply(event)
return {"output": {"thread_id": thread_id, "created_at": created_at}, "meta": {}}
async def send_message(self, req: RouteRequest) -> dict:
params: dict = req.body.get("input", {})
thread_id: str | None = params.get("thread_id")
content: str = params.get("content", "")
caller = req.caller or self._node_id
if not thread_id:
return {"error": "bad_request", "message": "thread_id required"}
if not content:
return {"error": "bad_request", "message": "content required"}
# Verify thread exists and caller is a member
thread = self._store.get_thread(thread_id)
if thread is None:
return {"error": "not_found", "message": f"thread {thread_id} not found"}
if caller not in thread.members:
return {"error": "forbidden", "message": "not a member of this thread"}
event_id = f"msg:{uuid.uuid4().hex}"
sent_at = time.time()
event = {
"event_id": event_id,
"event_type": "chat.thread.message.sent",
"author": caller,
"payload": {
"thread_id": thread_id,
"sender": caller,
"content": content,
"sent_at": sent_at,
},
}
if self._event_log is not None:
try:
logged = self._event_log.append_local(
event_type="chat.thread.message.sent",
author=caller,
payload=event["payload"],
)
self._store.apply(
{
"event_id": logged.event_id,
"event_type": "chat.thread.message.sent",
"author": caller,
"payload": event["payload"],
}
)
event_id = logged.event_id
except Exception:
self._store.apply(event)
else:
self._store.apply(event)
return {"output": {"event_id": event_id, "sent_at": sent_at}, "meta": {}}
async def get_history(self, req: RouteRequest) -> dict:
params: dict = req.body.get("input", {})
thread_id: str | None = params.get("thread_id")
since: float | None = params.get("since")
limit: int = int(params.get("limit", 50))
caller = req.caller or self._node_id
if not thread_id:
return {"error": "bad_request", "message": "thread_id required"}
thread = self._store.get_thread(thread_id)
if thread is None:
return {"error": "not_found", "message": f"thread {thread_id} not found"}
if caller not in thread.members:
return {"error": "forbidden", "message": "not a member of this thread"}
messages = self._store.get_messages(thread_id, since=since, limit=limit)
return {
"output": {
"messages": [
{
"event_id": m.event_id,
"thread_id": m.thread_id,
"sender": m.sender,
"content": m.content,
"sent_at": m.sent_at,
"delivered_to": list(m.delivered_to),
}
for m in messages
]
},
"meta": {},
}
async def invite_member(self, req: RouteRequest) -> dict:
params: dict = req.body.get("input", {})
thread_id: str | None = params.get("thread_id")
member_id: str | None = params.get("member_id")
caller = req.caller or self._node_id
if not thread_id or not member_id:
return {"error": "bad_request", "message": "thread_id and member_id required"}
thread = self._store.get_thread(thread_id)
if thread is None:
return {"error": "not_found", "message": f"thread {thread_id} not found"}
if caller not in thread.members:
return {"error": "forbidden", "message": "not a member of this thread"}
event = {
"event_id": f"evt:{uuid.uuid4().hex}",
"event_type": "chat.thread.member.added",
"author": caller,
"payload": {"thread_id": thread_id, "member_id": member_id},
}
if self._event_log is not None:
with contextlib.suppress(Exception):
self._event_log.append_local(
event_type="chat.thread.member.added",
author=caller,
payload=event["payload"],
)
self._store.apply(event)
return {"output": {"success": True}, "meta": {}}
async def leave_thread(self, req: RouteRequest) -> dict:
params: dict = req.body.get("input", {})
thread_id: str | None = params.get("thread_id")
caller = req.caller or self._node_id
if not thread_id:
return {"error": "bad_request", "message": "thread_id required"}
thread = self._store.get_thread(thread_id)
if thread is None:
return {"error": "not_found", "message": f"thread {thread_id} not found"}
if caller not in thread.members:
return {"error": "forbidden", "message": "not a member of this thread"}
event = {
"event_id": f"evt:{uuid.uuid4().hex}",
"event_type": "chat.thread.member.removed",
"author": caller,
"payload": {"thread_id": thread_id, "member_id": caller},
}
if self._event_log is not None:
with contextlib.suppress(Exception):
self._event_log.append_local(
event_type="chat.thread.member.removed",
author=caller,
payload=event["payload"],
)
self._store.apply(event)
return {"output": {"success": True}, "meta": {}}