"""Vector store service wrapping SupabaseVectorStore and embeddings. Centralizes initialization to keep routes/services clean and consistent. """ from __future__ import annotations import json import warnings from functools import lru_cache from typing import Any, Dict, List, Optional, Tuple import os from langchain_core.documents import Document from langchain_mistralai import MistralAIEmbeddings from langchain_community.vectorstores import SupabaseVectorStore from supabase import create_client, Client from config.settings import settings _DEBUG_LOG = r"c:\Users\cd\Documents\CAPL\ROUTEUR\dev\routeur_ia_api\.cursor\debug.log" class VectorStoreServiceError(Exception): pass class PatchedSupabaseVectorStore(SupabaseVectorStore): """Fixes postgrest 2.28+ incompatibility where .params moved to .request.params.""" def similarity_search_by_vector_with_relevance_scores( self, query: List[float], k: int, filter: Optional[Dict[str, Any]] = None, postgrest_filter: Optional[str] = None, score_threshold: Optional[float] = None, ) -> List[Tuple[Document, float]]: if filter: for key, value in filter.items(): if isinstance(value, dict) and "$in" in value: in_values = value["$in"] values_str = ",".join(f"'{str(v)}'" for v in in_values) new_filter = f"metadata->>{key} IN ({values_str})" if postgrest_filter: postgrest_filter = f"({postgrest_filter}) and ({new_filter})" else: postgrest_filter = new_filter match_documents_params = self.match_args(query, filter) query_builder = self._client.rpc(self.query_name, match_documents_params) # #region agent log import time as _t with open(_DEBUG_LOG, "a", encoding="utf-8") as _f: _f.write(json.dumps({"id": "log_patch_1", "timestamp": int(_t.time()*1000), "location": "vectorstore_service.py:PatchedSupabaseVectorStore", "message": "Using patched similarity_search", "data": {"k": k, "has_postgrest_filter": bool(postgrest_filter), "query_builder_type": type(query_builder).__name__, "has_request_attr": hasattr(query_builder, "request")}, "hypothesisId": "A"}) + "\n") # #endregion if postgrest_filter: query_builder.request.params = query_builder.request.params.set( "and", f"({postgrest_filter})" ) query_builder.request.params = query_builder.request.params.set("limit", k) res = query_builder.execute() # #region agent log with open(_DEBUG_LOG, "a", encoding="utf-8") as _f: _f.write(json.dumps({"id": "log_patch_2", "timestamp": int(_t.time()*1000), "location": "vectorstore_service.py:PatchedSupabaseVectorStore", "message": "RPC execute success", "data": {"result_count": len(res.data) if res.data else 0}, "hypothesisId": "A"}) + "\n") # #endregion match_result = [ ( Document( metadata=search.get("metadata", {}), page_content=search.get("content", ""), ), search.get("similarity", 0.0), ) for search in res.data if search.get("content") ] if score_threshold is not None: match_result = [ (doc, similarity) for doc, similarity in match_result if similarity >= score_threshold ] if len(match_result) == 0: warnings.warn( "No relevant docs were retrieved using the relevance score" f" threshold {score_threshold}" ) return match_result @lru_cache(maxsize=1) def _get_supabase_client() -> Client: url = settings.supabase_url or os.getenv("SUPABASE_URL") key = settings.supabase_key or ( os.getenv("SUPABASE_KEY") or os.getenv("SUPABASE_SERVICE_ROLE_KEY") or os.getenv("SUPABASE_ANON_KEY") or os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY") ) if not url or not key: raise VectorStoreServiceError("SUPABASE_URL and a SUPABASE_*KEY env var are required.") return create_client(url, key) @lru_cache(maxsize=1) def _get_embeddings() -> MistralAIEmbeddings: return MistralAIEmbeddings(model="mistral-embed", api_key=settings.mistralai_api_key) def _resolve_table_and_query( index_name: Optional[str], table_name: Optional[str], query_name: Optional[str], ) -> Tuple[str, str]: # Explicit beats implicit if table_name and query_name: return table_name, query_name # Named logical index if index_name: idx = settings.vector_indexes.get(index_name) if not idx: raise VectorStoreServiceError(f"Unknown vector index '{index_name}'. Configure settings.vector_indexes.") return idx["table"], idx["query_name"] # Backward-compatible default return settings.supabase_table, settings.supabase_match_fn @lru_cache(maxsize=16) def get_vector_store( index_name: Optional[str] = None, *, table_name: Optional[str] = None, query_name: Optional[str] = None, ) -> PatchedSupabaseVectorStore: client = _get_supabase_client() emb = _get_embeddings() table, query = _resolve_table_and_query(index_name, table_name, query_name) return PatchedSupabaseVectorStore( embedding=emb, client=client, table_name=table, query_name=query, ) def add_documents( documents: List[Document], index_name: Optional[str] = None, *, table_name: Optional[str] = None, query_name: Optional[str] = None, ) -> List[str]: if not documents: return [] vs = get_vector_store(index_name, table_name=table_name, query_name=query_name) return vs.add_documents(documents)