| """Vector store service wrapping SupabaseVectorStore and embeddings. |
| |
| Centralizes initialization to keep routes/services clean and consistent. |
| """ |
from __future__ import annotations

import json
import logging
import os
import warnings
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple

from langchain_community.vectorstores import SupabaseVectorStore
from langchain_core.documents import Document
from langchain_mistralai import MistralAIEmbeddings
from supabase import Client, create_client

from config.settings import settings
|
|
# Target file for ad-hoc JSON-lines debug traces. The default is an absolute
# path on one developer's Windows machine and does not exist elsewhere, so
# deployments can redirect it through the VECTORSTORE_DEBUG_LOG env var.
_DEBUG_LOG = os.getenv(
    "VECTORSTORE_DEBUG_LOG",
    r"c:\Users\cd\Documents\CAPL\ROUTEUR\dev\routeur_ia_api\.cursor\debug.log",
)
|
|
|
|
class VectorStoreServiceError(Exception):
    """Raised by this module for vector-store configuration problems.

    Examples: missing Supabase URL/key, or an index name absent from
    ``settings.vector_indexes``.
    """
|
|
|
|
class PatchedSupabaseVectorStore(SupabaseVectorStore):
    """SupabaseVectorStore whose similarity search works across postgrest-py versions.

    postgrest 2.28+ moved the RPC builder's query parameters from
    ``builder.params`` to ``builder.request.params``. This subclass
    re-implements ``similarity_search_by_vector_with_relevance_scores`` and
    writes the extra query parameters (``and`` filter, ``limit``) to
    whichever location the builder actually exposes.
    """

    @staticmethod
    def _set_query_param(query_builder: Any, name: str, value: Any) -> None:
        """Set query-string parameter ``name`` on an RPC builder.

        Uses ``query_builder.request.params`` when the builder has a
        ``request`` carrying ``params`` (postgrest 2.28+). Otherwise it falls
        back to ``query_builder.params`` (older releases). ``params.set``
        returns a new params object, so the result is assigned back.
        """
        request = getattr(query_builder, "request", None)
        target = (
            request
            if request is not None and hasattr(request, "params")
            else query_builder
        )
        target.params = target.params.set(name, value)

    def similarity_search_by_vector_with_relevance_scores(
        self,
        query: List[float],
        k: int,
        filter: Optional[Dict[str, Any]] = None,
        postgrest_filter: Optional[str] = None,
        score_threshold: Optional[float] = None,
    ) -> List[Tuple[Document, float]]:
        """Return documents matching ``query`` together with their similarity.

        Args:
            query: Query embedding vector passed to ``match_args``.
            k: Row limit sent to the match RPC as the ``limit`` parameter.
            filter: Metadata filter forwarded to ``match_args``. Entries of
                the form ``{"key": {"$in": [...]}}`` are additionally
                translated into a ``postgrest_filter`` clause.
            postgrest_filter: Raw filter expression sent as the ``and``
                query parameter (wrapped in parentheses).
            score_threshold: If given, results with similarity below it are
                dropped, and a warning is emitted when none remain.

        Returns:
            ``(Document, similarity)`` pairs for every returned row whose
            ``content`` is non-empty. Similarity defaults to ``0.0`` when the
            row has none.
        """
        if filter:
            for key, value in filter.items():
                if not (isinstance(value, dict) and "$in" in value):
                    continue
                # NOTE(review): values are interpolated verbatim; a value
                # containing a quote, comma or parenthesis will corrupt this
                # clause. Confirm the syntax is accepted by PostgREST.
                values_str = ",".join(f"'{str(v)}'" for v in value["$in"])
                new_filter = f"metadata->>{key} IN ({values_str})"
                postgrest_filter = (
                    f"({postgrest_filter}) and ({new_filter})"
                    if postgrest_filter
                    else new_filter
                )

        match_documents_params = self.match_args(query, filter)
        query_builder = self._client.rpc(self.query_name, match_documents_params)

        if postgrest_filter:
            self._set_query_param(query_builder, "and", f"({postgrest_filter})")
        self._set_query_param(query_builder, "limit", k)

        res = query_builder.execute()
        # The response payload may be None (e.g. no rows); treat that as empty
        # instead of failing while iterating.
        rows = res.data or []
        logging.getLogger(__name__).debug(
            "match RPC %s returned %d row(s) (k=%s, postgrest_filter=%s)",
            self.query_name,
            len(rows),
            k,
            bool(postgrest_filter),
        )

        match_result = [
            (
                Document(
                    metadata=row.get("metadata", {}),
                    page_content=row.get("content", ""),
                ),
                row.get("similarity", 0.0),
            )
            for row in rows
            if row.get("content")
        ]

        if score_threshold is not None:
            match_result = [
                (doc, similarity)
                for doc, similarity in match_result
                if similarity >= score_threshold
            ]
            if not match_result:
                warnings.warn(
                    "No relevant docs were retrieved using the relevance score"
                    f" threshold {score_threshold}"
                )

        return match_result
|
|
|
|
@lru_cache(maxsize=1)
def _get_supabase_client() -> Client:
    """Create the Supabase client on first call and return the cached instance.

    The URL is ``settings.supabase_url``, falling back to ``SUPABASE_URL``.
    The key is ``settings.supabase_key``, falling back to the first non-empty
    of ``SUPABASE_KEY``, ``SUPABASE_SERVICE_ROLE_KEY``, ``SUPABASE_ANON_KEY``
    and ``NEXT_PUBLIC_SUPABASE_ANON_KEY``, checked in that order.

    Raises:
        VectorStoreServiceError: if no URL or no key could be found.
    """
    key_env_names = (
        "SUPABASE_KEY",
        "SUPABASE_SERVICE_ROLE_KEY",
        "SUPABASE_ANON_KEY",
        "NEXT_PUBLIC_SUPABASE_ANON_KEY",
    )
    supabase_url = settings.supabase_url or os.getenv("SUPABASE_URL")
    # The generator is lazy, so env vars past the first non-empty one are not read.
    supabase_key = settings.supabase_key or next(
        filter(None, (os.getenv(env_name) for env_name in key_env_names)), None
    )
    if supabase_url and supabase_key:
        return create_client(supabase_url, supabase_key)
    raise VectorStoreServiceError("SUPABASE_URL and a SUPABASE_*KEY env var are required.")
|
|
|
|
@lru_cache(maxsize=1)
def _get_embeddings() -> MistralAIEmbeddings:
    """Build the ``mistral-embed`` embeddings client once and reuse it.

    The API key is read from ``settings.mistralai_api_key``.
    """
    embedding_kwargs = {
        "model": "mistral-embed",
        "api_key": settings.mistralai_api_key,
    }
    return MistralAIEmbeddings(**embedding_kwargs)
|
|
|
|
def _resolve_table_and_query(
    index_name: Optional[str],
    table_name: Optional[str],
    query_name: Optional[str],
) -> Tuple[str, str]:
    """Resolve the Supabase table and match-RPC name to use.

    Explicit ``table_name`` / ``query_name`` always take precedence. When both
    are given, the index configuration is not consulted at all. Otherwise any
    missing value comes from one of two places:

    * ``settings.vector_indexes[index_name]`` (its ``"table"`` and
      ``"query_name"`` entries), when ``index_name`` is set;
    * ``settings.supabase_table`` / ``settings.supabase_match_fn`` otherwise.

    Returns:
        ``(table, query_name)``.

    Raises:
        VectorStoreServiceError: if ``index_name`` must be consulted but is
            not present in ``settings.vector_indexes``.
    """
    if table_name and query_name:
        return table_name, query_name

    if index_name:
        idx = settings.vector_indexes.get(index_name)
        if not idx:
            raise VectorStoreServiceError(f"Unknown vector index '{index_name}'. Configure settings.vector_indexes.")
        default_table, default_query = idx["table"], idx["query_name"]
    else:
        default_table, default_query = settings.supabase_table, settings.supabase_match_fn

    # Honour a single explicit override instead of silently discarding it;
    # only the missing half is taken from the index / global defaults.
    return table_name or default_table, query_name or default_query
|
|
|
|
@lru_cache(maxsize=16)
def get_vector_store(
    index_name: Optional[str] = None,
    *,
    table_name: Optional[str] = None,
    query_name: Optional[str] = None,
) -> PatchedSupabaseVectorStore:
    """Return a PatchedSupabaseVectorStore for the resolved table / match RPC.

    Results are memoized (up to 16 argument combinations), so repeated calls
    with identical arguments reuse the same store instance while it is cached.
    The shared Supabase client and embeddings are obtained first, then the
    table / query pair is resolved from the arguments and settings.

    Raises:
        VectorStoreServiceError: propagated from client creation or from
            table/query resolution.
    """
    supabase_client = _get_supabase_client()
    embeddings = _get_embeddings()
    resolved_table, resolved_query = _resolve_table_and_query(
        index_name, table_name, query_name
    )
    return PatchedSupabaseVectorStore(
        client=supabase_client,
        embedding=embeddings,
        query_name=resolved_query,
        table_name=resolved_table,
    )
|
|
|
|
def add_documents(
    documents: List[Document],
    index_name: Optional[str] = None,
    *,
    table_name: Optional[str] = None,
    query_name: Optional[str] = None,
) -> List[str]:
    """Add ``documents`` to the resolved vector store.

    An empty (or otherwise falsy) ``documents`` returns ``[]`` immediately,
    without creating or touching a vector store. Otherwise the result of the
    store's ``add_documents`` call is returned as-is.
    """
    if documents:
        store = get_vector_store(index_name, table_name=table_name, query_name=query_name)
        return store.add_documents(documents)
    return []
|
|
|
|
|
|
|
|