routeur_ia_api / services /vectorstore_service.py
Cyril Dupland
feat voice: init
48d6c63
raw
history blame
5.98 kB
"""Vector store service wrapping SupabaseVectorStore and embeddings.
Centralizes initialization to keep routes/services clean and consistent.
"""
from __future__ import annotations

import json
import logging
import os
import warnings
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple

from langchain_community.vectorstores import SupabaseVectorStore
from langchain_core.documents import Document
from langchain_mistralai import MistralAIEmbeddings
from supabase import create_client, Client

from config.settings import settings
# Absolute path of a debug log file.
# NOTE(review): this is a hard-coded, machine-specific Windows path (it looks
# like a developer workstation); it is unlikely to exist on other hosts, so
# any code that opens it unconditionally will fail there.
_DEBUG_LOG = r"c:\Users\cd\Documents\CAPL\ROUTEUR\dev\routeur_ia_api\.cursor\debug.log"
class VectorStoreServiceError(Exception):
    """Raised by this module when the vector store cannot be set up.

    Used for missing Supabase credentials and for unknown vector index names.
    """
class PatchedSupabaseVectorStore(SupabaseVectorStore):
    """Fixes postgrest 2.28+ incompatibility where .params moved to .request.params."""

    def similarity_search_by_vector_with_relevance_scores(
        self,
        query: List[float],
        k: int,
        filter: Optional[Dict[str, Any]] = None,
        postgrest_filter: Optional[str] = None,
        score_threshold: Optional[float] = None,
    ) -> List[Tuple[Document, float]]:
        """Call the match RPC and return ``(document, similarity)`` pairs.

        Args:
            query: Query vector handed to ``match_args``.
            k: Value sent as the ``limit`` request parameter.
            filter: Metadata filter handed to ``match_args``. Entries shaped
                like ``{key: {"$in": [...]}}`` are additionally rendered as an
                ``IN (...)`` clause and and-ed into ``postgrest_filter``.
            postgrest_filter: Extra filter expression sent as the ``and``
                request parameter.
            score_threshold: When not ``None``, pairs whose similarity is
                below this value are dropped.

        Returns:
            One pair per returned row that has non-empty ``content``; an
            empty list when the RPC returns no rows.

        Warns:
            UserWarning: if ``score_threshold`` is set and no pair reaches it.
        """
        logger = logging.getLogger(__name__)

        if filter:
            for key, value in filter.items():
                if isinstance(value, dict) and "$in" in value:
                    # NOTE(review): values are interpolated without escaping;
                    # a value containing a single quote will corrupt the
                    # clause. Confirm the inputs are trusted.
                    values_str = ",".join(f"'{str(v)}'" for v in value["$in"])
                    new_filter = f"metadata->>{key} IN ({values_str})"
                    if postgrest_filter:
                        postgrest_filter = f"({postgrest_filter}) and ({new_filter})"
                    else:
                        postgrest_filter = new_filter

        match_documents_params = self.match_args(query, filter)
        query_builder = self._client.rpc(self.query_name, match_documents_params)

        # Diagnostics go through the logging framework: writing to a fixed
        # file path made every search fail on hosts where that path is absent.
        logger.debug(
            "Using patched similarity_search: k=%s has_postgrest_filter=%s",
            k,
            bool(postgrest_filter),
        )

        # The patch itself: parameters live on ``request.params`` here.
        if postgrest_filter:
            query_builder.request.params = query_builder.request.params.set(
                "and", f"({postgrest_filter})"
            )
        query_builder.request.params = query_builder.request.params.set("limit", k)

        res = query_builder.execute()
        # ``res.data`` may be empty/None; treat that as "no rows" instead of
        # failing while iterating.
        rows = res.data or []
        logger.debug("RPC execute success: result_count=%d", len(rows))

        match_result = [
            (
                Document(
                    metadata=search.get("metadata", {}),
                    page_content=search.get("content", ""),
                ),
                search.get("similarity", 0.0),
            )
            for search in rows
            if search.get("content")
        ]

        if score_threshold is not None:
            match_result = [
                (doc, similarity)
                for doc, similarity in match_result
                if similarity >= score_threshold
            ]
            if not match_result:
                warnings.warn(
                    "No relevant docs were retrieved using the relevance score"
                    f" threshold {score_threshold}"
                )

        return match_result
@lru_cache(maxsize=1)
def _get_supabase_client() -> Client:
    """Create the Supabase client; the result is cached after the first call.

    The URL and key come from ``settings`` when set there, otherwise from
    environment variables.

    Raises:
        VectorStoreServiceError: If either the URL or the key is missing.
    """
    url = settings.supabase_url
    if not url:
        url = os.getenv("SUPABASE_URL")

    key = settings.supabase_key
    if not key:
        # Try each variable in priority order and keep the first non-empty one.
        for env_name in (
            "SUPABASE_KEY",
            "SUPABASE_SERVICE_ROLE_KEY",
            "SUPABASE_ANON_KEY",
            "NEXT_PUBLIC_SUPABASE_ANON_KEY",
        ):
            key = os.getenv(env_name)
            if key:
                break

    if not (url and key):
        raise VectorStoreServiceError("SUPABASE_URL and a SUPABASE_*KEY env var are required.")
    return create_client(url, key)
@lru_cache(maxsize=1)
def _get_embeddings() -> MistralAIEmbeddings:
    """Build the ``mistral-embed`` embeddings object; cached after the first call."""
    embeddings = MistralAIEmbeddings(
        api_key=settings.mistralai_api_key,
        model="mistral-embed",
    )
    return embeddings
def _resolve_table_and_query(
index_name: Optional[str],
table_name: Optional[str],
query_name: Optional[str],
) -> Tuple[str, str]:
# Explicit beats implicit
if table_name and query_name:
return table_name, query_name
# Named logical index
if index_name:
idx = settings.vector_indexes.get(index_name)
if not idx:
raise VectorStoreServiceError(f"Unknown vector index '{index_name}'. Configure settings.vector_indexes.")
return idx["table"], idx["query_name"]
# Backward-compatible default
return settings.supabase_table, settings.supabase_match_fn
@lru_cache(maxsize=16)
def get_vector_store(
    index_name: Optional[str] = None,
    *,
    table_name: Optional[str] = None,
    query_name: Optional[str] = None,
) -> PatchedSupabaseVectorStore:
    """Return a vector store for the requested index or table/query pair.

    Instances are cached per argument combination (up to 16 entries).

    Args:
        index_name: Name of a logical index to resolve.
        table_name: Explicit table name (used together with ``query_name``).
        query_name: Explicit match function name (used together with
            ``table_name``).
    """
    supabase_client = _get_supabase_client()
    embeddings = _get_embeddings()
    resolved_table, resolved_query = _resolve_table_and_query(
        index_name, table_name, query_name
    )
    store = PatchedSupabaseVectorStore(
        embedding=embeddings,
        client=supabase_client,
        table_name=resolved_table,
        query_name=resolved_query,
    )
    return store
def add_documents(
    documents: List[Document],
    index_name: Optional[str] = None,
    *,
    table_name: Optional[str] = None,
    query_name: Optional[str] = None,
) -> List[str]:
    """Add ``documents`` to the selected vector store.

    Returns whatever the store's ``add_documents`` returns, or an empty list
    (without touching the store) when ``documents`` is empty.
    """
    if documents:
        store = get_vector_store(
            index_name,
            table_name=table_name,
            query_name=query_name,
        )
        return store.add_documents(documents)
    return []