"""Drop-in adapter for the legacy TAU `knowledge_graph.rag_pipeline.RAGPipeline`. Lets existing call-sites that do: from knowledge_graph.rag_pipeline import get_rag_pipeline, RAGQuery resp = get_rag_pipeline().query(RAGQuery(question="...", ...)) keep working, but route through the new TAU-RAG pipeline. Zero code changes in the legacy consumers — just replace the old module with an import shim: # knowledge_graph/rag_pipeline.py from tau_rag.api.tau_adapter import ( get_rag_pipeline, RAGPipeline, RAGQuery, RAGResponse, RetrievalStrategy, RAGContext, ) """ from __future__ import annotations from dataclasses import dataclass, field from enum import Enum from typing import Any, Dict, List, Optional from ..core.config import Config from ..core.types import Chunk, Document, Query, Strategy from ..pipeline import Pipeline # ----------------------------------------------------------- legacy data types class RetrievalStrategy(Enum): SEMANTIC = "semantic" KEYWORD = "keyword" HYBRID = "hybrid" GRAPH = "graph" @dataclass class RAGQuery: question: str context_hint: Optional[str] = None filters: Dict[str, Any] = field(default_factory=dict) strategy: RetrievalStrategy = RetrievalStrategy.HYBRID max_context_docs: int = 5 max_context_tokens: int = 2000 @dataclass class RAGContext: documents: List[Dict[str, Any]] total_tokens: int retrieval_time_ms: float strategy_used: RetrievalStrategy @dataclass class RAGResponse: answer: str context: RAGContext confidence: float sources: List[str] tau_signals: Dict[str, float] generation_time_ms: float # -------------------------------------------------------------------- adapter def _map_strategy(s: RetrievalStrategy) -> Strategy: return { RetrievalStrategy.SEMANTIC: Strategy.SEMANTIC, RetrievalStrategy.KEYWORD: Strategy.LEXICAL, RetrievalStrategy.HYBRID: Strategy.HYBRID, RetrievalStrategy.GRAPH: Strategy.GRAPH, }.get(s, Strategy.HYBRID) class RAGPipeline: """Legacy API surface over the new Pipeline.""" def __init__(self, config: Optional[Config] = None): self._inner = Pipeline.from_config(config or Config.no_llm()) self.query_cache: Dict[str, RAGResponse] = {} # kept for compatibility # Legacy method def query(self, q: RAGQuery) -> RAGResponse: new_q = Query( text=q.question, lang="he", filters=q.filters or {}, strategy=_map_strategy(q.strategy), k=q.max_context_docs * 2, rerank_k=q.max_context_docs, max_context_tokens=q.max_context_tokens, ) r = self._inner.run(new_q) docs = [{ "id": ret.chunk.doc_id, "text": ret.chunk.text, "score": ret.score, "metadata": ret.chunk.metadata, } for ret in r.retrieved] return RAGResponse( answer=r.answer, context=RAGContext( documents=docs, total_tokens=sum(int(len(d["text"].split()) * 1.3) for d in docs), retrieval_time_ms=r.timing_ms.get("retrieve_ms", 0.0), strategy_used=q.strategy, ), confidence=1.0 - r.signals.xi, sources=r.sources, tau_signals=r.signals.to_dict(), generation_time_ms=r.timing_ms.get("generate_ms", 0.0), ) # Legacy methods used by the old integration def add_to_knowledge_base( self, doc_id: str, text: str, metadata: Optional[Dict[str, Any]] = None, ): self._inner.add_documents([ Document(id=doc_id, text=text, metadata=metadata or {}), ]) def clear_cache(self): self._inner.cache.clear() self.query_cache.clear() def get_stats(self) -> Dict[str, Any]: return { "cache_size": len(self._inner.cache), "retrievers": list(self._inner.retrievers.retrievers.keys()), } _singleton: Optional[RAGPipeline] = None def get_rag_pipeline() -> RAGPipeline: global _singleton if _singleton is None: _singleton = RAGPipeline() return _singleton def reset_rag_pipeline() -> None: global _singleton _singleton = None