Spaces:
Running on Zero
Running on Zero
GitHub Actions
feat: 15 targeted improvements — RAG persistence, bus failover, agent hardening, deps sync
146edc4 | from __future__ import annotations | |
| import logging | |
| from pathlib import Path | |
| from typing import Any | |
| from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest | |
| from hearthnet.services.rag.store import CorpusStore, list_corpora | |
| _log = logging.getLogger(__name__) | |
class RagService:
    """RAG service bound to one corpus, exposed through bus capabilities.

    The service registers three capabilities (see :meth:`capabilities`):
    ``rag.query``, ``rag.ingest`` and ``rag.list_corpora``. Embeddings are
    requested over the bus (``embed.text``) when a bus is supplied; otherwise
    a local ``SimpleHashBackend`` is used.
    """

    name = "rag"
    version = "1.0"

    # Number of chunks returned by ``rag.query`` when the request omits ``k``.
    _DEFAULT_TOP_K = 5
    # Width of the zero-vector placeholder used when the bus reply carries no
    # embeddings at all.
    _FALLBACK_EMBED_DIM = 16

    def __init__(
        self,
        corpus: str = "default",
        corpora_dir: Path | None = None,
        bus: Any = None,
        event_log: Any = None,
        blob_store: Any = None,
    ) -> None:
        """Create the service and open the backing corpus store.

        Args:
            corpus: Name of the corpus this instance serves.
            corpora_dir: Directory holding all corpora. Defaults to
                ``~/.hearthnet/corpora`` (never writes to cwd).
            bus: Optional CapabilityBus used to call ``embed.text``
                (preferred over the local embedding backend).
            event_log: Optional EventLog; when set, ``rag.document.ingested``
                is appended after each non-duplicate ingest.
            blob_store: Optional BlobStore; when set, the raw text of each
                non-duplicate document is persisted as a content blob.
        """
        self._corpus = corpus
        self._corpora_dir = corpora_dir or (Path.home() / ".hearthnet" / "corpora")
        self._bus = bus
        self._event_log = event_log
        self._blob_store = blob_store
        self._store = CorpusStore(self._corpora_dir, corpus)
        self._pipeline = None  # initialized lazily on first ingest

    def _get_embed_fn(self):
        """Return an async ``texts -> embeddings`` callable.

        The returned coroutine function calls ``embed.text`` on the bus when
        one is configured, and otherwise embeds locally with
        ``SimpleHashBackend``.
        """
        fallback_dim = self._FALLBACK_EMBED_DIM

        async def embed_via_bus(texts: list[str]) -> list[list[float]]:
            if self._bus is None:
                from hearthnet.services.embedding.backends import SimpleHashBackend

                return await SimpleHashBackend().embed(texts)

            result = await self._bus.call("embed.text", (1, 0), {"input": {"texts": texts}})
            output = result.get("output") or {}
            embeddings = output.get("embeddings")
            if embeddings is None:
                # Keep the zero-vector placeholder contract, but make the
                # degradation visible and give every text its own list so
                # rows are never aliases of one another.
                _log.warning(
                    "embed.text reply carried no embeddings; using %d-dim zero vectors",
                    fallback_dim,
                )
                return [[0.0] * fallback_dim for _ in texts]
            return embeddings

        return embed_via_bus

    def capabilities(self) -> list[tuple]:
        """Return ``(descriptor, handler, params_matcher)`` triples to register.

        ``rag.query`` and ``rag.ingest`` are parameterised by this instance's
        corpus and use :meth:`_corpus_matches`; ``rag.list_corpora`` has no
        parameters and no matcher.
        """
        return [
            (
                CapabilityDescriptor(
                    name="rag.query",
                    params={"corpus": self._corpus},
                    max_concurrent=4,
                    idempotent=True,
                ),
                self.handle_query,
                self._corpus_matches,
            ),
            (
                CapabilityDescriptor(
                    name="rag.ingest",
                    params={"corpus": self._corpus},
                    trust_required="trusted",
                    idempotent=True,
                ),
                self.handle_ingest,
                self._corpus_matches,
            ),
            (
                CapabilityDescriptor(
                    name="rag.list_corpora",
                    params={},
                    max_concurrent=8,
                    idempotent=True,
                ),
                self.handle_list_corpora,
                None,
            ),
        ]

    def _corpus_matches(self, offered: dict, requested: dict) -> bool:
        """Return True when the request names no corpus or names the offered one."""
        return not requested.get("corpus") or requested.get("corpus") == offered.get("corpus")

    def _query_reply(self, chunks: list) -> dict:
        """Wrap *chunks* in the ``rag.query`` response envelope."""
        return {"output": {"chunks": chunks}, "meta": {"corpus": self._corpus}}

    async def handle_query(self, req: RouteRequest) -> dict:
        """Handle ``rag.query``: embed the query and return the top-``k`` chunks.

        Reads ``query`` and optional ``k`` (default 5) from
        ``req.body["input"]``. An empty query, a missing/null ``input``, or a
        non-positive ``k`` yields an empty chunk list without touching the
        embedder or the store.

        Raises:
            ValueError, TypeError: if ``k`` is present but not convertible to
                ``int``.
        """
        inp = req.body.get("input") or {}
        query = inp.get("query", "")
        if not query:
            return self._query_reply([])

        raw_k = inp.get("k")
        k = self._DEFAULT_TOP_K if raw_k is None else int(raw_k)
        if k < 1:
            # Nothing was asked for; do not hand a zero/negative limit to the store.
            return self._query_reply([])

        embeddings = await self._get_embed_fn()([query])
        try:
            query_vec = embeddings[0]
        except IndexError:
            _log.warning("RAG query got an empty embedding reply; returning no chunks")
            return self._query_reply([])

        results = self._store.query(query_vec, k=k)
        chunks = [
            {
                "rank": rank,
                "score": hit.score,
                "text": hit.chunk.text,
                "metadata": hit.chunk.metadata,
            }
            for rank, hit in enumerate(results, start=1)
        ]
        return self._query_reply(chunks)

    async def handle_ingest(self, req: RouteRequest) -> dict:
        """Handle ``rag.ingest`` for a single document or a batch.

        Single format: ``{"text": ..., "title": ..., "doc_cid": ...}``.
        Batch format: ``{"documents": [{"id": ..., "title": ..., "text": ...}]}``;
        each document is ingested in order and the reply is
        ``{"batch": [<per-document output>...], "count": N}``.
        """
        inp = req.body.get("input") or {}
        documents = inp.get("documents")

        # A present-but-empty ``documents`` with no ``text`` is an empty batch,
        # not a request to ingest one blank "Untitled" document.
        is_batch = bool(documents) or (documents is not None and "text" not in inp)
        if is_batch:
            batch_results = []
            for doc in documents:
                reply = await self._ingest_one(
                    doc.get("text", ""),
                    title=doc.get("title", "Untitled"),
                    doc_cid=doc.get("id") or doc.get("doc_cid"),
                )
                batch_results.append(reply.get("output", {}))
            return {
                "output": {"batch": batch_results, "count": len(batch_results)},
                "meta": {"corpus": self._corpus},
            }

        return await self._ingest_one(
            inp.get("text", ""),
            title=inp.get("title", "Untitled"),
            doc_cid=inp.get("doc_cid"),
        )

    async def _ingest_one(self, text: str, *, title: str, doc_cid: Any) -> dict:
        """Index one document, then persist its blob and announce it (best effort)."""
        if self._pipeline is None:
            from hearthnet.services.rag.ingest import IngestPipeline

            self._pipeline = IngestPipeline(self._store, self._get_embed_fn())

        result = await self._pipeline.ingest_text(text, title=title, doc_cid=doc_cid)

        # Duplicates were already stored and announced; skip both side effects.
        if not result.was_duplicate:
            blob_cid = self._persist_blob(text, title)
            self._emit_ingested(result, title, blob_cid)

        return {
            "output": {
                "doc_cid": result.doc_cid,
                "chunks_indexed": result.chunks_indexed,
                "was_duplicate": result.was_duplicate,
            },
            "meta": {"corpus": self._corpus, "ms": result.ms},
        }

    def _persist_blob(self, text: str, title: str) -> str | None:
        """Store the raw text in the blob store; return its cid, or None on failure/no store.

        Phase 2: persist raw text as a BLAKE3 content-addressed blob so peers
        can fetch it via TransferManager (M07/BitTorrent).
        """
        if self._blob_store is None:
            return None
        try:
            manifest = self._blob_store.put(text.encode("utf-8"), filename=title)
            return manifest.cid
        except Exception as exc:
            # Best effort: a blob failure must not fail an ingest that already succeeded.
            _log.warning("RAG blob_store.put failed for '%s': %s", title, exc)
            return None

    def _emit_ingested(self, result: Any, title: str, blob_cid: str | None) -> None:
        """Append ``rag.document.ingested`` so peers learn a new doc exists (X02)."""
        if self._event_log is None:
            return
        try:
            # getattr covers both "no bus" and "bus without node_id_full", so a
            # missing attribute no longer suppresses the event.
            author = getattr(self._bus, "node_id_full", "unknown")
            payload: dict = {
                "corpus": self._corpus,
                "doc_cid": result.doc_cid,
                "title": title,
                "chunks_indexed": result.chunks_indexed,
            }
            if blob_cid:
                payload["blob_cid"] = blob_cid
            self._event_log.append_local(
                "rag.document.ingested",
                author,
                payload,
            )
        except Exception as exc:
            # Best effort: event emission must not fail the ingest.
            _log.warning("RAG event_log.append_local failed for '%s': %s", title, exc)

    async def handle_list_corpora(self, req: RouteRequest) -> dict:
        """Handle ``rag.list_corpora``: list corpus names found under the corpora dir."""
        names = list_corpora(self._corpora_dir)
        return {"output": {"corpora": names}, "meta": {}}