GitHub Actions
feat: 15 targeted improvements — RAG persistence, bus failover, agent hardening, deps sync
146edc4
Raw
History Blame
7.17 kB
from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest
from hearthnet.services.rag.store import CorpusStore, list_corpora
# Module-wide logger. RagService reports best-effort side-effect failures
# during ingest (blob persistence, event emission) through it instead of
# failing the request.
_log = logging.getLogger(name=__name__)
class RagService:
    """Bus service exposing query / ingest / listing over one named RAG corpus.

    Chunks are stored in a ``CorpusStore`` rooted at ``corpora_dir``.
    Embeddings come from the ``embed.text`` capability when a bus is
    supplied; otherwise they come from the local ``SimpleHashBackend``.
    """

    name = "rag"
    version = "1.0"

    # Width of the placeholder zero vectors returned when the bus answers an
    # ``embed.text`` call without any embeddings.
    _FALLBACK_EMBED_DIM = 16

    def __init__(
        self,
        corpus: str = "default",
        corpora_dir: Path | None = None,
        bus: Any = None,
        event_log: Any = None,
        blob_store: Any = None,
    ) -> None:
        """Create the service and its ``CorpusStore``.

        Args:
            corpus: Name of the corpus this instance serves.
            corpora_dir: Root directory for corpora. Defaults to
                ``~/.hearthnet/corpora`` (never writes to cwd).
            bus: Optional CapabilityBus. When set, ``embed.text`` is called
                over it (preferred over the local backend).
            event_log: Optional EventLog. ``rag.document.ingested`` is
                appended to it after each non-duplicate ingest.
            blob_store: Optional BlobStore. The raw text of each non-duplicate
                document is persisted there as a BLAKE3 content blob.
        """
        self._corpus = corpus
        self._corpora_dir = corpora_dir or (Path.home() / ".hearthnet" / "corpora")
        self._bus = bus
        self._event_log = event_log
        self._blob_store = blob_store
        self._store = CorpusStore(self._corpora_dir, corpus)
        # Built lazily on the first single-document ingest.
        self._pipeline = None

    def _get_embed_fn(self):
        """Return an async ``texts -> vectors`` callable.

        With a bus, it calls ``embed.text`` v(1, 0). If that reply carries no
        embeddings, it returns zero vectors of width ``_FALLBACK_EMBED_DIM``,
        one per text. Without a bus, it embeds locally with
        ``SimpleHashBackend``.
        """

        async def embed_via_bus(texts: list[str]) -> list[list[float]]:
            if self._bus is not None:
                result = await self._bus.call("embed.text", (1, 0), {"input": {"texts": texts}})
                # Tolerate a None reply, a None "output" and a None
                # "embeddings" instead of raising on the chained lookups.
                output = (result or {}).get("output") or {}
                embeddings = output.get("embeddings")
                if embeddings is None:
                    # One independent list per text; ``[[0.0] * d] * n``
                    # would alias a single inner list n times.
                    return [[0.0] * self._FALLBACK_EMBED_DIM for _ in texts]
                return embeddings
            from hearthnet.services.embedding.backends import SimpleHashBackend

            backend = SimpleHashBackend()
            return await backend.embed(texts)

        return embed_via_bus

    def capabilities(self) -> list[tuple]:
        """Return ``(descriptor, handler, matcher)`` triples to register.

        ``rag.query`` and ``rag.ingest`` carry this corpus as a param and use
        ``_corpus_matches`` as matcher. ``rag.ingest`` declares
        ``trust_required="trusted"``. ``rag.list_corpora`` has no params and
        no matcher.
        """
        return [
            (
                CapabilityDescriptor(
                    name="rag.query",
                    params={"corpus": self._corpus},
                    max_concurrent=4,
                    idempotent=True,
                ),
                self.handle_query,
                self._corpus_matches,
            ),
            (
                CapabilityDescriptor(
                    name="rag.ingest",
                    params={"corpus": self._corpus},
                    trust_required="trusted",
                    idempotent=True,
                ),
                self.handle_ingest,
                self._corpus_matches,
            ),
            (
                CapabilityDescriptor(
                    name="rag.list_corpora",
                    params={},
                    max_concurrent=8,
                    idempotent=True,
                ),
                self.handle_list_corpora,
                None,
            ),
        ]

    def _corpus_matches(self, offered: dict, requested: dict) -> bool:
        """True when the request names no corpus or names the offered one."""
        return not requested.get("corpus") or requested.get("corpus") == offered.get("corpus")

    async def handle_query(self, req: RouteRequest) -> dict:
        """Return the ``k`` stored chunks ranked best for ``query``.

        Body: ``{"input": {"query": str, "k": int = 5}}``. An empty query, or
        an embedder that returns no vectors, yields an empty chunk list.
        """
        inp = req.body.get("input") or {}
        query = inp.get("query", "")
        k = int(inp.get("k", 5))
        if not query:
            return {"output": {"chunks": []}, "meta": {"corpus": self._corpus}}
        embeddings = await self._get_embed_fn()([query])
        # len() rather than truthiness so array-like results also work.
        if embeddings is None or len(embeddings) == 0:
            return {"output": {"chunks": []}, "meta": {"corpus": self._corpus}}
        results = self._store.query(embeddings[0], k=k)
        chunks = [
            {
                "rank": i + 1,
                "score": r.score,
                "text": r.chunk.text,
                "metadata": r.chunk.metadata,
            }
            for i, r in enumerate(results)
        ]
        return {"output": {"chunks": chunks}, "meta": {"corpus": self._corpus}}

    async def handle_ingest(self, req: RouteRequest) -> dict:
        """Ingest one document, or a batch of documents, into this corpus.

        Single: ``{"input": {"text": str, "title": str = "Untitled",
        "doc_cid": optional}}``.
        Batch: ``{"input": {"documents": [{"id" or "doc_cid", "title",
        "text"}, ...]}}``. Each document is dispatched as its own single
        ingest, and the per-document outputs are returned under
        ``output.batch``.

        For a non-duplicate document, two best-effort side effects follow.
        The raw text goes to the blob store, and ``rag.document.ingested``
        is appended to the event log, when each is configured. Failures
        there are logged and do not fail the ingest.
        """
        inp = req.body.get("input") or {}
        documents = inp.get("documents")
        # Checked by type, not truthiness: an empty batch must return an
        # empty summary rather than fall through to ingesting empty text.
        if isinstance(documents, (list, tuple)):
            return await self._ingest_batch(req, documents)

        text = inp.get("text", "")
        title = inp.get("title", "Untitled")
        doc_cid = inp.get("doc_cid")
        if self._pipeline is None:
            from hearthnet.services.rag.ingest import IngestPipeline

            self._pipeline = IngestPipeline(self._store, self._get_embed_fn())
        result = await self._pipeline.ingest_text(text, title=title, doc_cid=doc_cid)
        if not result.was_duplicate:
            blob_cid = self._persist_blob(text, title)
            self._emit_ingested(result, title, blob_cid)
        return {
            "output": {
                "doc_cid": result.doc_cid,
                "chunks_indexed": result.chunks_indexed,
                "was_duplicate": result.was_duplicate,
            },
            "meta": {"corpus": self._corpus, "ms": result.ms},
        }

    async def _ingest_batch(self, req: RouteRequest, documents) -> dict:
        """Ingest each document as a separate single request; summarize."""
        batch_results = []
        for doc in documents:
            single_req = RouteRequest(
                capability=req.capability,
                version_req=req.version_req,
                body={
                    "input": {
                        "text": doc.get("text", ""),
                        "title": doc.get("title", "Untitled"),
                        "doc_cid": doc.get("id") or doc.get("doc_cid"),
                    }
                },
                caller=req.caller,
                trace_id=req.trace_id,
            )
            result = await self.handle_ingest(single_req)
            batch_results.append(result.get("output", {}))
        return {
            "output": {"batch": batch_results, "count": len(batch_results)},
            "meta": {"corpus": self._corpus},
        }

    def _persist_blob(self, text: str, title: str) -> str | None:
        """Store ``text`` as a blob so peers can fetch it; return its CID.

        Returns None when no blob store is configured or the put fails.
        The put is best-effort because the ingest has already succeeded.
        """
        if self._blob_store is None:
            return None
        try:
            manifest = self._blob_store.put(text.encode("utf-8"), filename=title)
            return manifest.cid
        except Exception as exc:
            _log.warning("RAG blob_store.put failed for '%s': %s", title, exc)
            return None

    def _emit_ingested(self, result: Any, title: str, blob_cid: str | None) -> None:
        """Append ``rag.document.ingested`` so peers learn of the new doc."""
        if self._event_log is None:
            return
        # getattr covers both "no bus" and "bus without node_id_full", so a
        # missing attribute no longer silently drops the event.
        author = getattr(self._bus, "node_id_full", "unknown")
        payload: dict = {
            "corpus": self._corpus,
            "doc_cid": result.doc_cid,
            "title": title,
            "chunks_indexed": result.chunks_indexed,
        }
        if blob_cid:
            payload["blob_cid"] = blob_cid
        try:
            self._event_log.append_local("rag.document.ingested", author, payload)
        except Exception as exc:
            _log.warning("RAG event_log.append_local failed for '%s': %s", title, exc)

    async def handle_list_corpora(self, req: RouteRequest) -> dict:
        """List the corpus names found under ``corpora_dir``; ``req`` is unused."""
        names = list_corpora(self._corpora_dir)
        return {"output": {"corpora": names}, "meta": {}}