Spaces:
Running on Zero
Running on Zero
File size: 7,169 Bytes
31c93b1 146edc4 31c93b1 146edc4 31c93b1 6c38d43 31c93b1 6c38d43 f08047d 6c38d43 31c93b1 f08047d 31c93b1 6c38d43 31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 f08047d 31c93b1 6c38d43 146edc4 6c38d43 3f78ea8 6c38d43 146edc4 6c38d43 31c93b1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 | from __future__ import annotations
import logging
from pathlib import Path
from typing import Any
from hearthnet.bus.capability import CapabilityDescriptor, RouteRequest
from hearthnet.services.rag.store import CorpusStore, list_corpora
# Module logger; the ingest path uses it to warn (rather than fail) when the
# best-effort blob-store / event-log side effects raise.
_log = logging.getLogger(__name__)
class RagService:
    """Capability service exposing retrieval-augmented-generation operations.

    Registers three capabilities (see ``capabilities``):

    * ``rag.query`` -- embed a query and return the top-``k`` chunks from this
      instance's corpus store.
    * ``rag.ingest`` -- ingest one document, or a ``documents`` batch, into the
      corpus; optionally persist the raw text to a blob store and emit a
      ``rag.document.ingested`` event.
    * ``rag.list_corpora`` -- list the corpus names found under ``corpora_dir``.
    """

    name = "rag"
    version = "1.0"

    def __init__(
        self,
        corpus: str = "default",
        corpora_dir: Path | None = None,
        bus: Any = None,
        event_log: Any = None,
        blob_store: Any = None,
    ) -> None:
        """Create a service bound to a single corpus.

        Args:
            corpus: Name of the corpus served by this instance; also advertised
                as the ``corpus`` parameter of ``rag.query`` / ``rag.ingest``.
            corpora_dir: Root directory for corpus stores. Defaults to
                ``~/.hearthnet/corpora`` (never writes to the cwd).
            bus: Optional CapabilityBus. When set, embeddings are requested via
                the ``embed.text`` capability (preferred over local hashing),
                and its ``node_id_full`` (if present) is used as event author.
            event_log: Optional EventLog that receives ``rag.document.ingested``
                for every non-duplicate ingest.
            blob_store: Optional BlobStore in which non-duplicate raw text is
                persisted as a BLAKE3 content blob.
        """
        self._corpus = corpus
        self._corpora_dir = corpora_dir or (Path.home() / ".hearthnet" / "corpora")
        self._bus = bus
        self._event_log = event_log
        self._blob_store = blob_store
        self._store = CorpusStore(self._corpora_dir, corpus)
        self._pipeline = None  # IngestPipeline, created on first ingest

    def _get_embed_fn(self):
        """Return an async ``texts -> vectors`` function.

        The returned function checks ``self._bus`` at call time. If a bus is
        configured it calls ``embed.text`` (version ``(1, 0)``); otherwise it
        embeds locally with ``SimpleHashBackend``.
        """

        async def embed_via_bus(texts: list[str]) -> list[list[float]]:
            if self._bus is not None:
                result = await self._bus.call("embed.text", (1, 0), {"input": {"texts": texts}})
                output = result.get("output") or {}
                embeddings = output.get("embeddings")
                if embeddings is None:
                    # The bus answered without vectors: fall back to 16-dim
                    # zero vectors. Build one distinct list per text -- list
                    # repetition would alias a single list across every entry.
                    return [[0.0] * 16 for _ in texts]
                return embeddings
            # Imported only on the no-bus path.
            from hearthnet.services.embedding.backends import SimpleHashBackend

            backend = SimpleHashBackend()
            return await backend.embed(texts)

        return embed_via_bus

    def capabilities(self) -> list[tuple]:
        """Return ``(descriptor, handler, matcher)`` triples for registration.

        ``rag.query`` and ``rag.ingest`` are scoped to this instance's corpus
        and use ``_corpus_matches``; ``rag.list_corpora`` has no matcher.
        """
        return [
            (
                CapabilityDescriptor(
                    name="rag.query",
                    params={"corpus": self._corpus},
                    max_concurrent=4,
                    idempotent=True,
                ),
                self.handle_query,
                self._corpus_matches,
            ),
            (
                CapabilityDescriptor(
                    name="rag.ingest",
                    params={"corpus": self._corpus},
                    trust_required="trusted",
                    idempotent=True,
                ),
                self.handle_ingest,
                self._corpus_matches,
            ),
            (
                CapabilityDescriptor(
                    name="rag.list_corpora",
                    params={},
                    max_concurrent=8,
                    idempotent=True,
                ),
                self.handle_list_corpora,
                None,
            ),
        ]

    def _corpus_matches(self, offered: dict, requested: dict) -> bool:
        """Return True if the request names no corpus or names the offered one."""
        return not requested.get("corpus") or requested.get("corpus") == offered.get("corpus")

    async def handle_query(self, req: RouteRequest) -> dict:
        """Handle ``rag.query``: return the top-``k`` chunks for ``input.query``.

        Request body: ``{"input": {"query": str, "k": int = 5}}``. ``k`` is
        passed through ``int()``. An empty query returns an empty chunk list
        without embedding. Each chunk carries its 1-based ``rank``, ``score``,
        ``text`` and ``metadata``.
        """
        inp = req.body.get("input", {})
        query = inp.get("query", "")
        k = int(inp.get("k", 5))
        if not query:
            return {"output": {"chunks": []}, "meta": {"corpus": self._corpus}}
        embeddings = await self._get_embed_fn()([query])
        results = self._store.query(embeddings[0], k=k)
        chunks = [
            {
                "rank": rank,
                "score": r.score,
                "text": r.chunk.text,
                "metadata": r.chunk.metadata,
            }
            for rank, r in enumerate(results, start=1)
        ]
        return {"output": {"chunks": chunks}, "meta": {"corpus": self._corpus}}

    async def handle_ingest(self, req: RouteRequest) -> dict:
        """Handle ``rag.ingest`` for either a single document or a batch.

        Single: ``{"input": {"text": str, "title": str = "Untitled",
        "doc_cid": str | None}}``. Returns ``{"output": {"doc_cid",
        "chunks_indexed", "was_duplicate"}, "meta": {"corpus", "ms"}}``.

        Batch: ``{"input": {"documents": [{"id" | "doc_cid", "title",
        "text"}, ...]}}``. Documents are ingested in order. Returns
        ``{"output": {"batch": [<per-doc output>, ...], "count": n},
        "meta": {"corpus"}}``. An empty ``documents`` list yields an empty
        batch; it does not fall through to ingesting an empty single document.
        """
        inp = req.body.get("input", {})
        documents = inp.get("documents")
        if documents is not None:
            batch_results = []
            for doc in documents:
                single = await self._ingest_one(
                    doc.get("text", ""),
                    doc.get("title", "Untitled"),
                    doc.get("id") or doc.get("doc_cid"),
                )
                batch_results.append(single["output"])
            return {
                "output": {"batch": batch_results, "count": len(batch_results)},
                "meta": {"corpus": self._corpus},
            }
        return await self._ingest_one(
            inp.get("text", ""),
            inp.get("title", "Untitled"),
            inp.get("doc_cid"),
        )

    async def _ingest_one(self, text: str, title: str, doc_cid: str | None) -> dict:
        """Ingest one document and return the single-document response dict.

        For non-duplicates, the raw text is persisted to the blob store and an
        ingest event is emitted. Both side effects are best-effort: failures
        are logged and never fail the ingest.
        """
        if self._pipeline is None:
            from hearthnet.services.rag.ingest import IngestPipeline

            self._pipeline = IngestPipeline(self._store, self._get_embed_fn())
        result = await self._pipeline.ingest_text(text, title=title, doc_cid=doc_cid)
        if not result.was_duplicate:
            blob_cid = self._persist_blob(text, title)
            self._emit_ingested(result, title, blob_cid)
        return {
            "output": {
                "doc_cid": result.doc_cid,
                "chunks_indexed": result.chunks_indexed,
                "was_duplicate": result.was_duplicate,
            },
            "meta": {"corpus": self._corpus, "ms": result.ms},
        }

    def _persist_blob(self, text: str, title: str) -> str | None:
        """Store ``text`` (UTF-8) in the blob store; return its CID, else None.

        Phase 2: raw text is kept as a content-addressed blob so peers can
        fetch it via TransferManager (M07/BitTorrent).
        """
        if self._blob_store is None:
            return None
        try:
            manifest = self._blob_store.put(text.encode("utf-8"), filename=title)
            return manifest.cid
        except Exception as exc:
            _log.warning("RAG blob_store.put failed for '%s': %s", title, exc)
            return None

    def _emit_ingested(self, result: Any, title: str, blob_cid: str | None) -> None:
        """Append ``rag.document.ingested`` so peers learn a new doc exists (X02)."""
        if self._event_log is None:
            return
        try:
            # getattr covers both "no bus" (None has no such attribute) and a
            # bus without node_id_full. Previously the AttributeError was
            # swallowed below, so the event was silently never emitted.
            author = getattr(self._bus, "node_id_full", "unknown")
            payload: dict = {
                "corpus": self._corpus,
                "doc_cid": result.doc_cid,
                "title": title,
                "chunks_indexed": result.chunks_indexed,
            }
            if blob_cid:
                payload["blob_cid"] = blob_cid
            self._event_log.append_local(
                "rag.document.ingested",
                author,
                payload,
            )
        except Exception as exc:
            _log.warning("RAG event_log.append_local failed for '%s': %s", title, exc)

    async def handle_list_corpora(self, req: RouteRequest) -> dict:
        """Handle ``rag.list_corpora``: list corpus names under ``corpora_dir``."""
        names = list_corpora(self._corpora_dir)
        return {"output": {"corpora": names}, "meta": {}}
|