| """Supabase retriever utilities shared across agents/nodes.""" |
| from functools import lru_cache |
| from typing import List, Optional |
| import os |
|
|
| from langchain_core.documents import Document |
| from langchain_openai import OpenAIEmbeddings |
| from supabase import create_client, Client |
|
|
| from services.vectorstore_service import get_vector_store, PatchedSupabaseVectorStore |
|
|
| from config.settings import settings |
|
|
|
|
@lru_cache(maxsize=4)
def get_retriever(doc_type: str, k: Optional[int] = None):
    """Build a Supabase-backed retriever restricted to one document type.

    Results are memoised by ``lru_cache`` (at most 4 distinct argument
    combinations), so repeated calls with the same ``doc_type``/``k`` get
    the same retriever object back.

    Args:
        doc_type: Value placed under the ``type`` key of the search filter.
        k: Number of results to request. A falsy value (``None`` or ``0``)
            falls back to ``settings.rag_top_k``.

    Returns:
        Whatever ``as_retriever`` returns for the constructed vector store.

    Raises:
        ValueError: If no Supabase URL or key can be resolved from settings
            or the environment.
    """
    supabase_url = settings.supabase_url or os.getenv("SUPABASE_URL")

    # Settings win; otherwise take the first non-empty candidate env var.
    supabase_key = settings.supabase_key
    if not supabase_key:
        for env_name in (
            "SUPABASE_KEY",
            "SUPABASE_SERVICE_ROLE_KEY",
            "SUPABASE_ANON_KEY",
            "NEXT_PUBLIC_SUPABASE_ANON_KEY",
        ):
            supabase_key = os.getenv(env_name)
            if supabase_key:
                break

    if not (supabase_url and supabase_key):
        raise ValueError("SUPABASE_URL and a SUPABASE_*KEY env var are required.")

    supabase_client: Client = create_client(supabase_url, supabase_key)
    embeddings = OpenAIEmbeddings(api_key=settings.openai_api_key)
    store = PatchedSupabaseVectorStore(
        embedding=embeddings,
        client=supabase_client,
        table_name=settings.supabase_table,
        query_name=settings.supabase_match_fn,
    )

    search_kwargs = {
        "k": int(k or settings.rag_top_k),
        "filter": {"type": doc_type},
    }
    return store.as_retriever(search_kwargs=search_kwargs)
|
|
|
|
def get_retriever_for(
    index_name: str,
    k: Optional[int] = None,
    filter: Optional[dict] = None,
):
    """Build a retriever over the vector store registered as ``index_name``.

    Args:
        index_name: Logical index name forwarded to ``get_vector_store``.
        k: Number of results to request. A falsy value (``None`` or ``0``)
            falls back to ``settings.rag_top_k``.
        filter: Optional filter passed through unchanged in the search
            kwargs; ``None`` is forwarded as ``None``.

    Returns:
        Whatever ``as_retriever`` returns for the resolved vector store.
    """
    store = get_vector_store(index_name=index_name)
    search_kwargs = {
        "k": int(k or settings.rag_top_k),
        "filter": filter,
    }
    return store.as_retriever(search_kwargs=search_kwargs)
|
|
|
|
def format_documents(
    docs: List[Document], doc_type: str, max_chars_per_doc: int = 1200
) -> str:
    """Render documents as numbered ``<document>`` blocks for prompt context.

    Each block holds a header line (1-based index, ``source``, ``page``,
    ``type`` and, when present, ``contact``) followed by the document text
    cut to ``max_chars_per_doc`` characters. Blocks are separated by a
    ``---`` rule surrounded by blank lines.

    Args:
        docs: Documents to render; missing text or metadata is treated as
            empty.
        doc_type: Fallback shown as ``type`` when the metadata has no
            ``type`` entry.
        max_chars_per_doc: Upper bound on characters kept from each
            document's text.

    Returns:
        The joined blocks, or an empty string when ``docs`` is empty.
    """
    rendered: List[str] = []
    for index, document in enumerate(docs, start=1):
        body = (document.page_content or "")[:max_chars_per_doc]
        metadata = document.metadata or {}

        header_parts = [
            f"[{index}]",
            f"source={metadata.get('source', 'N/A')}",
            f"page={metadata.get('page_number', 'N/A')}",
            f"type={metadata.get('type', doc_type)}",
        ]
        contact = metadata.get("contact")
        if contact:
            header_parts.append(f"contact={contact}")

        header_line = " ".join(header_parts)
        rendered.append("<document>\n" + header_line + "\n" + body + "</document>")

    return "\n\n---\n\n".join(rendered)
|
|
|
|
|
|