GitHub Actions
feat: 15 targeted improvements — RAG persistence, bus failover, agent hardening, deps sync
146edc4
Raw
History Blame
9.97 kB
from __future__ import annotations
import json
import logging
import sqlite3
import uuid
from dataclasses import dataclass
from pathlib import Path
from hearthnet.services.rag.chunker import Chunk
# Module-level logger; backend selection and fallback warnings go through it.
_log: logging.Logger = logging.getLogger(name=__name__)
@dataclass(frozen=True)
class ScoredChunk:
    """A retrieved chunk paired with its relevance score.

    Instances are immutable (``frozen=True``), compare by value and are
    hashable. ``score`` is "higher = better", but its scale depends on the
    store backend that produced it, so scores from different backends should
    not be compared directly.
    """

    # The retrieved chunk (its text plus metadata).
    chunk: Chunk
    # Relevance score: higher = better.
    score: float
class CorpusStore:
"""Persistent vector store — chromadb if available, SQLite otherwise.
Backend selection (in order of preference):
1. chromadb PersistentClient — if chromadb is installed
2. SQLite (one .db file per corpus) — always available, survives restart
3. in-memory list — last resort if SQLite also fails
The active backend is logged at WARNING level so it is visible in Space logs.
``self._dir.mkdir()`` runs unconditionally so the corpora directory always
exists regardless of which backend wins.
"""
def __init__(self, corpora_dir: Path, corpus_name: str) -> None:
self._dir = corpora_dir
self._corpus = corpus_name
self._use_chroma = False
self._chroma_client = None
self._collection = None
self._db: sqlite3.Connection | None = None
# Pure in-memory fallback (only used when SQLite init also fails)
self._items: list[tuple[Chunk, list[float]]] = []
# Always create the directory — independent of which backend is chosen.
self._dir.mkdir(parents=True, exist_ok=True)
self._try_init_chroma()
if not self._use_chroma:
self._init_sqlite()
if self._use_chroma:
backend = "chroma"
elif self._db is not None:
backend = "sqlite"
else:
backend = "in-memory/ephemeral"
_log.warning("RAG vector store: using %s backend for corpus '%s'", backend, corpus_name)
# ------------------------------------------------------------------
# Backend initialisation
# ------------------------------------------------------------------
def _try_init_chroma(self) -> None:
try:
import chromadb # type: ignore[import-untyped]
self._chroma_client = chromadb.PersistentClient(path=str(self._dir / self._corpus))
self._collection = self._chroma_client.get_or_create_collection(self._corpus)
self._use_chroma = True
except ImportError:
pass
def _init_sqlite(self) -> None:
db_path = self._dir / f"{self._corpus}.db"
try:
self._db = sqlite3.connect(str(db_path), check_same_thread=False)
self._db.execute("""
CREATE TABLE IF NOT EXISTS chunks (
id TEXT PRIMARY KEY,
doc_cid TEXT,
chunk_text TEXT NOT NULL,
metadata_json TEXT NOT NULL DEFAULT '{}',
embedding_json TEXT NOT NULL
)
""")
self._db.execute("CREATE INDEX IF NOT EXISTS idx_doc_cid ON chunks(doc_cid)")
self._db.commit()
except Exception as exc:
_log.warning("RAG SQLite init failed, using in-memory fallback: %s", exc)
self._db = None
# ------------------------------------------------------------------
# Write path
# ------------------------------------------------------------------
def add(self, chunks: list[Chunk], embeddings: list[list[float]]) -> None:
"""Add chunks with their embeddings."""
if self._use_chroma and self._collection is not None:
ids = [str(uuid.uuid4()) for _ in chunks]
documents = [c.text for c in chunks]
metadatas = [dict(c.metadata) for c in chunks]
self._collection.add(
ids=ids,
embeddings=embeddings,
documents=documents,
metadatas=metadatas,
)
elif self._db is not None:
rows = [
(
str(uuid.uuid4()),
chunk.metadata.get("doc_cid"),
chunk.text,
json.dumps(dict(chunk.metadata)),
json.dumps(emb),
)
for chunk, emb in zip(chunks, embeddings, strict=False)
]
self._db.executemany(
"INSERT INTO chunks(id, doc_cid, chunk_text, metadata_json, embedding_json)"
" VALUES (?,?,?,?,?)",
rows,
)
self._db.commit()
else:
for chunk, emb in zip(chunks, embeddings, strict=False):
self._items.append((chunk, emb))
# ------------------------------------------------------------------
# Read path
# ------------------------------------------------------------------
def query(self, embedding: list[float], k: int = 5) -> list[ScoredChunk]:
"""Return top-k chunks by cosine similarity."""
if self._use_chroma and self._collection is not None:
n = min(k, self._collection.count())
if n == 0:
return []
results = self._collection.query(
query_embeddings=[embedding],
n_results=n,
include=["documents", "metadatas", "distances"],
)
scored: list[ScoredChunk] = []
docs = results.get("documents", [[]])[0]
metas = results.get("metadatas", [[]])[0]
# chromadb distances are L2; convert to similarity score
distances = results.get("distances", [[]])[0]
for doc, meta, dist in zip(docs, metas, distances, strict=False):
score = 1.0 / (1.0 + dist)
scored.append(ScoredChunk(chunk=Chunk(text=doc, metadata=meta), score=score))
return scored
# SQLite: load all rows, cosine-rank in Python
if self._db is not None:
rows = self._db.execute(
"SELECT chunk_text, metadata_json, embedding_json FROM chunks"
).fetchall()
if not rows:
return []
scored_items: list[tuple[Chunk, float]] = []
for chunk_text, meta_json, emb_json in rows:
try:
meta = json.loads(meta_json)
emb = json.loads(emb_json)
score = self._cosine_similarity(embedding, emb)
scored_items.append((Chunk(text=chunk_text, metadata=meta), score))
except Exception:
continue
scored_items.sort(key=lambda x: x[1], reverse=True)
return [ScoredChunk(chunk=c, score=s) for c, s in scored_items[:k]]
# Pure in-memory fallback
if not self._items:
return []
mem_scored = [
(chunk, self._cosine_similarity(embedding, emb)) for chunk, emb in self._items
]
mem_scored.sort(key=lambda x: x[1], reverse=True)
return [ScoredChunk(chunk=chunk, score=score) for chunk, score in mem_scored[:k]]
def has_doc(self, doc_cid: str) -> bool:
"""True if any chunk with this doc_cid exists."""
if self._use_chroma and self._collection is not None:
results = self._collection.get(where={"doc_cid": doc_cid}, limit=1, include=[])
return len(results.get("ids", [])) > 0
if self._db is not None:
row = self._db.execute(
"SELECT 1 FROM chunks WHERE doc_cid = ? LIMIT 1", (doc_cid,)
).fetchone()
return row is not None
return any(c.metadata.get("doc_cid") == doc_cid for c, _ in self._items)
def count(self) -> int:
if self._use_chroma and self._collection is not None:
return self._collection.count()
if self._db is not None:
row = self._db.execute("SELECT COUNT(*) FROM chunks").fetchone()
return row[0] if row else 0
return len(self._items)
def clear(self) -> None:
if self._use_chroma and self._collection is not None and self._chroma_client is not None:
self._chroma_client.delete_collection(self._corpus)
self._collection = self._chroma_client.get_or_create_collection(self._corpus)
elif self._db is not None:
self._db.execute("DELETE FROM chunks")
self._db.commit()
else:
self._items.clear()
def corpus_info(self) -> dict:
"""Return backend metadata — exposed via Settings tab and node manifest."""
if self._use_chroma:
backend = "chroma"
persistent = True
elif self._db is not None:
backend = "sqlite"
persistent = True
else:
backend = "in-memory"
persistent = False
return {
"backend": backend,
"persistent": persistent,
"chunks": self.count(),
"corpus": self._corpus,
}
def _cosine_similarity(self, a: list[float], b: list[float]) -> float:
dot = sum(x * y for x, y in zip(a, b, strict=False))
na = sum(x**2 for x in a) ** 0.5
nb = sum(x**2 for x in b) ** 0.5
return dot / (na * nb) if na and nb else 0.0
def list_corpora(corpora_dir: Path) -> list[str]:
    """List corpus names found under ``corpora_dir``.

    A corpus is recognised from either on-disk layout used by ``CorpusStore``:
    a ``<name>/`` directory (chroma backend) or a ``<name>.db`` file (SQLite
    backend). The ``.db`` suffix is stripped so each returned name can be passed
    straight back to ``CorpusStore`` / ``corpus_info``, which re-append it.
    A corpus present in both layouts is listed once.

    Returns:
        Sorted, de-duplicated corpus names; ``[]`` if ``corpora_dir`` is not a
        directory.
    """
    if not corpora_dir.is_dir():
        return []
    names: set[str] = set()
    for entry in corpora_dir.iterdir():
        if entry.is_dir():
            names.add(entry.name)
        elif entry.suffix == ".db":
            # "<name>.db" -> "<name>"; returning "<name>.db" would make callers
            # open a fresh, empty "<name>.db.db".
            names.add(entry.stem)
    return sorted(names)
def corpus_info(corpora_dir: Path, corpus: str) -> dict:
    """Return ``{corpus, exists, count_chunks, backend, persistent}`` for ``corpus``.

    Both branches return those keys. For an existing corpus the dict also keeps
    ``chunks`` (same value as ``count_chunks``) from ``CorpusStore.corpus_info()``
    for backward compatibility. A missing corpus is reported without creating
    anything on disk.

    NOTE(review): an existing corpus is opened through ``CorpusStore``, which
    prefers chroma whenever it is available. For a corpus that only has a
    ``.db`` file, this presumably creates an empty chroma directory and reports
    0 chunks — confirm whether that is intended.
    """
    corpus_dir = corpora_dir / corpus
    db_path = corpora_dir / f"{corpus}.db"
    if not (corpus_dir.exists() or db_path.exists()):
        return {"corpus": corpus, "exists": False, "count_chunks": 0, "backend": "none", "persistent": False}

    store = CorpusStore(corpora_dir, corpus)
    try:
        info = store.corpus_info()
    finally:
        # The throwaway store opened a SQLite connection nothing else would close.
        if store._db is not None:
            store._db.close()
    # Normalise to the documented schema so callers see the same keys either way.
    info["exists"] = True
    info["count_chunks"] = info.get("chunks", 0)
    return info