Spaces:
Running on Zero
Running on Zero
GitHub Actions
feat: 15 targeted improvements — RAG persistence, bus failover, agent hardening, deps sync
146edc4 | from __future__ import annotations | |
| import json | |
| import logging | |
| import sqlite3 | |
| import uuid | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from hearthnet.services.rag.chunker import Chunk | |
_log = logging.getLogger(__name__)


@dataclass
class ScoredChunk:
    """A retrieved chunk paired with its relevance score.

    Built with keyword arguments (``ScoredChunk(chunk=..., score=...)``), which
    requires the ``__init__`` generated by ``@dataclass``; a plain class with
    only annotations accepts no constructor arguments at all.
    """

    chunk: Chunk
    score: float  # higher = better
| class CorpusStore: | |
| """Persistent vector store — chromadb if available, SQLite otherwise. | |
| Backend selection (in order of preference): | |
| 1. chromadb PersistentClient — if chromadb is installed | |
| 2. SQLite (one .db file per corpus) — always available, survives restart | |
| 3. in-memory list — last resort if SQLite also fails | |
| The active backend is logged at WARNING level so it is visible in Space logs. | |
| ``self._dir.mkdir()`` runs unconditionally so the corpora directory always | |
| exists regardless of which backend wins. | |
| """ | |
| def __init__(self, corpora_dir: Path, corpus_name: str) -> None: | |
| self._dir = corpora_dir | |
| self._corpus = corpus_name | |
| self._use_chroma = False | |
| self._chroma_client = None | |
| self._collection = None | |
| self._db: sqlite3.Connection | None = None | |
| # Pure in-memory fallback (only used when SQLite init also fails) | |
| self._items: list[tuple[Chunk, list[float]]] = [] | |
| # Always create the directory — independent of which backend is chosen. | |
| self._dir.mkdir(parents=True, exist_ok=True) | |
| self._try_init_chroma() | |
| if not self._use_chroma: | |
| self._init_sqlite() | |
| if self._use_chroma: | |
| backend = "chroma" | |
| elif self._db is not None: | |
| backend = "sqlite" | |
| else: | |
| backend = "in-memory/ephemeral" | |
| _log.warning("RAG vector store: using %s backend for corpus '%s'", backend, corpus_name) | |
| # ------------------------------------------------------------------ | |
| # Backend initialisation | |
| # ------------------------------------------------------------------ | |
| def _try_init_chroma(self) -> None: | |
| try: | |
| import chromadb # type: ignore[import-untyped] | |
| self._chroma_client = chromadb.PersistentClient(path=str(self._dir / self._corpus)) | |
| self._collection = self._chroma_client.get_or_create_collection(self._corpus) | |
| self._use_chroma = True | |
| except ImportError: | |
| pass | |
| def _init_sqlite(self) -> None: | |
| db_path = self._dir / f"{self._corpus}.db" | |
| try: | |
| self._db = sqlite3.connect(str(db_path), check_same_thread=False) | |
| self._db.execute(""" | |
| CREATE TABLE IF NOT EXISTS chunks ( | |
| id TEXT PRIMARY KEY, | |
| doc_cid TEXT, | |
| chunk_text TEXT NOT NULL, | |
| metadata_json TEXT NOT NULL DEFAULT '{}', | |
| embedding_json TEXT NOT NULL | |
| ) | |
| """) | |
| self._db.execute("CREATE INDEX IF NOT EXISTS idx_doc_cid ON chunks(doc_cid)") | |
| self._db.commit() | |
| except Exception as exc: | |
| _log.warning("RAG SQLite init failed, using in-memory fallback: %s", exc) | |
| self._db = None | |
| # ------------------------------------------------------------------ | |
| # Write path | |
| # ------------------------------------------------------------------ | |
| def add(self, chunks: list[Chunk], embeddings: list[list[float]]) -> None: | |
| """Add chunks with their embeddings.""" | |
| if self._use_chroma and self._collection is not None: | |
| ids = [str(uuid.uuid4()) for _ in chunks] | |
| documents = [c.text for c in chunks] | |
| metadatas = [dict(c.metadata) for c in chunks] | |
| self._collection.add( | |
| ids=ids, | |
| embeddings=embeddings, | |
| documents=documents, | |
| metadatas=metadatas, | |
| ) | |
| elif self._db is not None: | |
| rows = [ | |
| ( | |
| str(uuid.uuid4()), | |
| chunk.metadata.get("doc_cid"), | |
| chunk.text, | |
| json.dumps(dict(chunk.metadata)), | |
| json.dumps(emb), | |
| ) | |
| for chunk, emb in zip(chunks, embeddings, strict=False) | |
| ] | |
| self._db.executemany( | |
| "INSERT INTO chunks(id, doc_cid, chunk_text, metadata_json, embedding_json)" | |
| " VALUES (?,?,?,?,?)", | |
| rows, | |
| ) | |
| self._db.commit() | |
| else: | |
| for chunk, emb in zip(chunks, embeddings, strict=False): | |
| self._items.append((chunk, emb)) | |
| # ------------------------------------------------------------------ | |
| # Read path | |
| # ------------------------------------------------------------------ | |
| def query(self, embedding: list[float], k: int = 5) -> list[ScoredChunk]: | |
| """Return top-k chunks by cosine similarity.""" | |
| if self._use_chroma and self._collection is not None: | |
| n = min(k, self._collection.count()) | |
| if n == 0: | |
| return [] | |
| results = self._collection.query( | |
| query_embeddings=[embedding], | |
| n_results=n, | |
| include=["documents", "metadatas", "distances"], | |
| ) | |
| scored: list[ScoredChunk] = [] | |
| docs = results.get("documents", [[]])[0] | |
| metas = results.get("metadatas", [[]])[0] | |
| # chromadb distances are L2; convert to similarity score | |
| distances = results.get("distances", [[]])[0] | |
| for doc, meta, dist in zip(docs, metas, distances, strict=False): | |
| score = 1.0 / (1.0 + dist) | |
| scored.append(ScoredChunk(chunk=Chunk(text=doc, metadata=meta), score=score)) | |
| return scored | |
| # SQLite: load all rows, cosine-rank in Python | |
| if self._db is not None: | |
| rows = self._db.execute( | |
| "SELECT chunk_text, metadata_json, embedding_json FROM chunks" | |
| ).fetchall() | |
| if not rows: | |
| return [] | |
| scored_items: list[tuple[Chunk, float]] = [] | |
| for chunk_text, meta_json, emb_json in rows: | |
| try: | |
| meta = json.loads(meta_json) | |
| emb = json.loads(emb_json) | |
| score = self._cosine_similarity(embedding, emb) | |
| scored_items.append((Chunk(text=chunk_text, metadata=meta), score)) | |
| except Exception: | |
| continue | |
| scored_items.sort(key=lambda x: x[1], reverse=True) | |
| return [ScoredChunk(chunk=c, score=s) for c, s in scored_items[:k]] | |
| # Pure in-memory fallback | |
| if not self._items: | |
| return [] | |
| mem_scored = [ | |
| (chunk, self._cosine_similarity(embedding, emb)) for chunk, emb in self._items | |
| ] | |
| mem_scored.sort(key=lambda x: x[1], reverse=True) | |
| return [ScoredChunk(chunk=chunk, score=score) for chunk, score in mem_scored[:k]] | |
| def has_doc(self, doc_cid: str) -> bool: | |
| """True if any chunk with this doc_cid exists.""" | |
| if self._use_chroma and self._collection is not None: | |
| results = self._collection.get(where={"doc_cid": doc_cid}, limit=1, include=[]) | |
| return len(results.get("ids", [])) > 0 | |
| if self._db is not None: | |
| row = self._db.execute( | |
| "SELECT 1 FROM chunks WHERE doc_cid = ? LIMIT 1", (doc_cid,) | |
| ).fetchone() | |
| return row is not None | |
| return any(c.metadata.get("doc_cid") == doc_cid for c, _ in self._items) | |
| def count(self) -> int: | |
| if self._use_chroma and self._collection is not None: | |
| return self._collection.count() | |
| if self._db is not None: | |
| row = self._db.execute("SELECT COUNT(*) FROM chunks").fetchone() | |
| return row[0] if row else 0 | |
| return len(self._items) | |
| def clear(self) -> None: | |
| if self._use_chroma and self._collection is not None and self._chroma_client is not None: | |
| self._chroma_client.delete_collection(self._corpus) | |
| self._collection = self._chroma_client.get_or_create_collection(self._corpus) | |
| elif self._db is not None: | |
| self._db.execute("DELETE FROM chunks") | |
| self._db.commit() | |
| else: | |
| self._items.clear() | |
| def corpus_info(self) -> dict: | |
| """Return backend metadata — exposed via Settings tab and node manifest.""" | |
| if self._use_chroma: | |
| backend = "chroma" | |
| persistent = True | |
| elif self._db is not None: | |
| backend = "sqlite" | |
| persistent = True | |
| else: | |
| backend = "in-memory" | |
| persistent = False | |
| return { | |
| "backend": backend, | |
| "persistent": persistent, | |
| "chunks": self.count(), | |
| "corpus": self._corpus, | |
| } | |
| def _cosine_similarity(self, a: list[float], b: list[float]) -> float: | |
| dot = sum(x * y for x, y in zip(a, b, strict=False)) | |
| na = sum(x**2 for x in a) ** 0.5 | |
| nb = sum(x**2 for x in b) ** 0.5 | |
| return dot / (na * nb) if na and nb else 0.0 | |
def list_corpora(corpora_dir: Path) -> list[str]:
    """List corpus names found under ``corpora_dir``.

    A corpus appears either as a directory ``<corpora_dir>/<name>/`` (chroma
    backend) or as a file ``<corpora_dir>/<name>.db`` (SQLite backend). Both
    map to the bare ``<name>``, with the ``.db`` suffix stripped, so each
    returned name can be passed straight back to ``CorpusStore`` /
    ``corpus_info``. Names are de-duplicated and returned sorted; a missing
    directory yields ``[]``.
    """
    if not corpora_dir.exists():
        return []
    names = {
        entry.name if entry.is_dir() else entry.stem
        for entry in corpora_dir.iterdir()
        if entry.is_dir() or entry.suffix == ".db"
    }
    return sorted(names)
def corpus_info(corpora_dir: Path, corpus: str) -> dict:
    """Return {corpus, exists, count_chunks, backend, persistent}.

    Both branches return those same five keys. For an existing corpus the
    store's own ``CorpusStore.corpus_info()`` keys (e.g. ``chunks``) are kept
    as well, and ``count_chunks`` mirrors ``chunks``. A corpus with neither a
    ``<corpus>/`` directory nor a ``<corpus>.db`` file is reported without
    opening a store, so it is not created.
    """
    corpus_dir = corpora_dir / corpus
    db_path = corpora_dir / f"{corpus}.db"
    if not (corpus_dir.exists() or db_path.exists()):
        return {
            "corpus": corpus,
            "exists": False,
            "count_chunks": 0,
            "backend": "none",
            "persistent": False,
        }
    info = CorpusStore(corpora_dir, corpus).corpus_info()
    return {
        **info,
        "exists": True,
        "count_chunks": info.get("chunks", 0),
    }