"""Simple base LangGraph for conversational agent."""

import logging
import os
from functools import lru_cache
from typing import Annotated, List, Optional, Sequence, TypedDict

from langchain_core.documents import Document
from langchain_core.language_models.chat_models import BaseChatModel
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages

# RAG imports (reuse setup from knowledge/ocr.ipynb)
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_openai import OpenAIEmbeddings
from supabase import Client, create_client

from .prompts import SYSTEM_PROMPT_TEMPLATE

logger = logging.getLogger(__name__)


class AgentState(TypedDict, total=False):
    """State for the conversational agent with RAG."""

    # Conversation history; merged through the `add_messages` reducer.
    messages: Annotated[Sequence[BaseMessage], add_messages]
    # Optional explicit retrieval query; when absent or empty, the last human
    # message is used instead.
    query: Optional[str]
    # Raw documents and formatted context written by the `retrieve` node.
    formation_docs: List[Document]
    prestation_docs: List[Document]
    formation_context: str
    prestation_context: str
    # Declared for project-level retrieval; not populated by the graphs
    # defined in this module.
    project_docs: List[Document]
    project_context: str


def _content_to_text(content) -> str:
    """Return the stripped plain text of a message ``content`` value.

    Message content is either a string or a list of parts (plain strings or
    dicts). For the list form, string parts and the ``"text"`` value of dict
    parts are joined with spaces; anything else is ignored.

    Args:
        content: The ``content`` attribute of a message (may be ``None``).

    Returns:
        The extracted text, or ``""`` when there is none.
    """
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        parts = []
        for part in content:
            if isinstance(part, str):
                parts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                parts.append(part["text"])
        return " ".join(parts).strip()
    return ""


def create_simple_graph(llm: BaseChatModel):
    """Create a simple conversational graph with LangGraph.

    This is a basic graph that takes the messages in the state, sends them to
    the LLM, and appends the response. It can be easily replaced with more
    complex graphs.

    Args:
        llm: Language model to use for generation.

    Returns:
        Compiled LangGraph with a single ``agent`` node.
    """

    def call_model(state: AgentState) -> AgentState:
        """Call the LLM with the current messages and return its reply."""
        messages = state["messages"]
        # Debug-level logging instead of print(): conversation content should
        # not be written to stdout on every call.
        logger.debug("Calling model with messages: %s", messages)
        response = llm.invoke(messages)
        # `messages` uses the add_messages reducer, so only the new message is
        # returned; the reducer appends it to the existing history.
        return {"messages": [AIMessage(content=response.content)]}

    workflow = StateGraph(AgentState)
    workflow.add_node("agent", call_model)
    workflow.set_entry_point("agent")
    workflow.add_edge("agent", END)
    return workflow.compile()


def create_simple_graph_with_history(llm: BaseChatModel):
    """Create a conversational graph with history + RAG retrieval from Supabase.

    Flow: Entry -> retrieve (RAG) -> agent (generate) -> END

    Environment variables read when retrievers are built: ``SUPABASE_URL``, one
    of ``SUPABASE_KEY`` / ``SUPABASE_SERVICE_ROLE_KEY`` / ``SUPABASE_ANON_KEY``
    / ``NEXT_PUBLIC_SUPABASE_ANON_KEY``, ``OPENAI_API_KEY``, and optionally
    ``SUPABASE_TABLE`` and ``SUPABASE_MATCH_FN``. ``RAG_TOP_K`` (default 8) is
    read once, when this function is called.

    Args:
        llm: Language model to use for generation.

    Returns:
        Compiled LangGraph.

    Raises:
        ValueError: If ``RAG_TOP_K`` is set to a non-integer value.
    """
    # Number of documents fetched per document type. The default of 8 matches
    # the value that was previously hard-coded at the call sites.
    top_k = int(os.getenv("RAG_TOP_K", "8"))

    @lru_cache(maxsize=2)
    def _get_retriever(doc_type: str, k: int = top_k):
        """Build (and cache) a retriever for one document type.

        Args:
            doc_type: Value passed as the ``type`` filter ("formation" or
                "prestation").
            k: Number of documents to retrieve.

        Raises:
            ValueError: If the Supabase URL or key is not configured.
        """
        url = os.getenv("SUPABASE_URL")
        key = (
            os.getenv("SUPABASE_KEY")
            or os.getenv("SUPABASE_SERVICE_ROLE_KEY")
            or os.getenv("SUPABASE_ANON_KEY")
            or os.getenv("NEXT_PUBLIC_SUPABASE_ANON_KEY")
        )
        if not url or not key:
            raise ValueError("SUPABASE_URL and a SUPABASE_*KEY env var are required.")

        client: Client = create_client(url, key)
        vector_store = SupabaseVectorStore(
            embedding=OpenAIEmbeddings(api_key=os.getenv("OPENAI_API_KEY")),
            client=client,
            table_name=os.getenv("SUPABASE_TABLE", "documents"),
            query_name=os.getenv("SUPABASE_MATCH_FN", "match_documents"),
        )
        return vector_store.as_retriever(
            search_kwargs={"k": k, "filter": {"type": doc_type}}
        )

    def _format_docs(docs: List[Document], max_chars_per_doc: int = 1200) -> str:
        """Render documents as numbered, separator-delimited text blocks.

        Each block is a header line (index, source, page, type and, when
        present, contact) followed by the page content truncated to
        ``max_chars_per_doc`` characters.
        """
        blocks = []
        for i, doc in enumerate(docs, 1):
            text = (doc.page_content or "")[:max_chars_per_doc]
            meta = doc.metadata or {}
            header = (
                f"[{i}] source={meta.get('source', 'N/A')} "
                f"page={meta.get('page_number', 'N/A')} "
                f"type={meta.get('type', 'N/A')}"
            )
            contact = meta.get("contact")
            if contact:
                header += f" contact={contact}"
            blocks.append(f"{header}\n{text}".strip())
        return "\n\n---\n\n".join(blocks)

    def retrieve(state: AgentState) -> AgentState:
        """Retriever node: build the query, fetch docs for both types, format context."""
        # Prefer an explicit query; otherwise fall back to the most recent
        # human message.
        query = state.get("query")
        if not query:
            query = ""
            for msg in reversed(list(state.get("messages", []))):
                if getattr(msg, "type", "") == "human":
                    query = _content_to_text(msg.content)
                    break

        if not query:
            # Nothing to search for: skip the embedding / vector-store calls
            # rather than injecting arbitrary documents as context.
            return {
                "formation_docs": [],
                "prestation_docs": [],
                "formation_context": "",
                "prestation_context": "",
            }

        formation_docs = _get_retriever("formation").invoke(query)
        prestation_docs = _get_retriever("prestation").invoke(query)

        return {
            "formation_docs": formation_docs,
            "prestation_docs": prestation_docs,
            "formation_context": _format_docs(formation_docs),
            "prestation_context": _format_docs(prestation_docs),
        }

    def call_model_with_history(state: AgentState) -> AgentState:
        """Generation node: SYSTEM + RAG context + conversation."""
        messages = list(state.get("messages", []))
        sys_msgs: List[BaseMessage] = [SystemMessage(content=SYSTEM_PROMPT_TEMPLATE)]

        formation_context = state.get("formation_context", "")
        prestation_context = state.get("prestation_context", "")

        # Add formation context if available
        if formation_context:
            sys_msgs.append(SystemMessage(content=(
                "CONTEXTE FORMATIONS (extraits du catalogue formations; n'utilise rien d'autre):\n\n"
                f"{formation_context}\n\n"
                "Consignes formations: Utilise exclusivement ce contexte pour recommander les formations. "
                "Cite la page et la source pour chaque recommandation. "
                "Une formation = un document."
            )))

        # Add prestation context if available
        if prestation_context:
            sys_msgs.append(SystemMessage(content=(
                "CONTEXTE PRESTATIONS (extraits du catalogue services; n'utilise rien d'autre):\n\n"
                f"{prestation_context}\n\n"
                "Consignes prestations: Utilise exclusivement ce contexte pour recommander les prestations. "
                "Cite la page et la source pour chaque recommandation. "
                "Un document peut contenir plusieurs prestations."
            )))

        response = llm.invoke(sys_msgs + messages)
        # Only the new message is returned; the add_messages reducer appends it
        # to the history already held in the state.
        return {"messages": [AIMessage(content=response.content)]}

    workflow = StateGraph(AgentState)
    workflow.add_node("retrieve", retrieve)
    workflow.add_node("agent", call_model_with_history)
    workflow.set_entry_point("retrieve")
    workflow.add_edge("retrieve", "agent")
    workflow.add_edge("agent", END)
    return workflow.compile()