"""Prestations catalogue retrieval against Supabase documents_v2 (V2).""" from __future__ import annotations from functools import lru_cache from typing import List, Protocol, runtime_checkable from langchain_core.documents import Document from supabase import Client from config.settings import settings from services.retrieval.document_mapper import row_to_document from services.vectorstore_service import _get_embeddings, _get_supabase_client @runtime_checkable class Embedder(Protocol): """Minimal embedding interface (DIP).""" def embed_query(self, text: str) -> List[float]: ... class PrestationsV2Retriever: """Semantic search over public.documents_v2 via match_documents_v2_full.""" def __init__( self, embedder: Embedder, client: Client, query_name: str, mapper=row_to_document, ) -> None: self._embedder = embedder self._client = client self._query_name = query_name self._mapper = mapper def search( self, query: str, *, k: int = 10, score_threshold: float = 0.5, offset: int = 0, ) -> List[Document]: qv = self._embedder.embed_query(query or "") resp = self._client.rpc( self._query_name, { "query_embedding": qv, "match_threshold": float(score_threshold), "match_count": int(k), "match_offset": int(offset), }, ).execute() rows = getattr(resp, "data", None) or [] return [self._mapper(row) for row in rows if row] @lru_cache(maxsize=1) def get_prestations_v2_retriever() -> PrestationsV2Retriever: idx = settings.vector_indexes.get("prestations_v2") if not idx: raise ValueError("vector_indexes['prestations_v2'] is not configured in settings.") query_name = idx["query_name"] return PrestationsV2Retriever( embedder=_get_embeddings(), client=_get_supabase_client(), query_name=query_name, )