"""Agent service for executing LangGraph agents.""" from typing import Optional, AsyncIterator, List, Dict from langchain_core.messages import HumanMessage, AIMessage, BaseMessage from langchain_core.language_models.chat_models import BaseChatModel from domain.enums import ModelName, AgentType from .llm_service import llm_service from .agent_registry import agent_registry class AgentService: """ Service for executing agent graphs with different LLMs. This service is the bridge between the API layer and the LangGraph agents. It handles: - Creating the right LLM based on model selection - Getting the right agent graph from the registry - Executing the graph with or without streaming """ def __init__(self): """Initialize the agent service.""" pass async def invoke( self, message: str, model_name: ModelName, agent_type: AgentType = AgentType.SIMPLE, temperature: float = 0.7, max_tokens: Optional[int] = None, conversation_history: Optional[List[Dict[str, str]]] = None ) -> dict: """ Invoke agent for a single response (non-streaming). Args: message: User message model_name: LLM model to use agent_type: Type of agent graph temperature: Sampling temperature max_tokens: Max tokens to generate conversation_history: Optional conversation history Returns: Response dictionary with content and metadata """ # Create LLM instance llm = llm_service.get_llm( model_name=model_name, temperature=temperature, streaming=False, max_tokens=max_tokens ) # Get agent builder and create graph builder = agent_registry.get_builder(agent_type) graph = builder(llm) # Prepare messages messages = self._prepare_messages(message, conversation_history) # Execute graph result = await graph.ainvoke({"messages": messages}) # Extract response response_message = result["messages"][-1] response_content = response_message.content return { "response": response_content, "model": model_name.value, "agent_type": agent_type.value, "usage": getattr(response_message, "usage_metadata", None), "metadata": { "message_count": len(result["messages"]) } } async def stream( self, message: str, model_name: ModelName, agent_type: AgentType = AgentType.SIMPLE, temperature: float = 0.7, max_tokens: Optional[int] = None, conversation_history: Optional[List[Dict[str, str]]] = None ) -> AsyncIterator[dict]: """ Stream agent response token by token. Args: message: User message model_name: LLM model to use agent_type: Type of agent graph temperature: Sampling temperature max_tokens: Max tokens to generate conversation_history: Optional conversation history Yields: Dictionary chunks with content and metadata """ # Create LLM instance with streaming enabled llm = llm_service.get_llm( model_name=model_name, temperature=temperature, streaming=True, max_tokens=max_tokens ) # Get agent builder and create graph builder = agent_registry.get_builder(agent_type) graph = builder(llm) # Prepare messages messages = self._prepare_messages(message, conversation_history) # Stream graph execution async for event in graph.astream({"messages": messages}): # Extract content from the event if "agent" in event: messages_in_event = event["agent"]["messages"] if messages_in_event: last_message = messages_in_event[-1] if hasattr(last_message, "content"): yield { "content": last_message.content, "done": False, "metadata": { "model": model_name.value, "agent_type": agent_type.value } } # Send final chunk yield { "content": "", "done": True, "metadata": { "model": model_name.value, "agent_type": agent_type.value } } def _prepare_messages( self, message: str, conversation_history: Optional[List[Dict[str, str]]] = None ) -> List[BaseMessage]: """ Prepare messages list from user input and optional history. Args: message: Current user message conversation_history: Optional list of previous messages Returns: List of LangChain messages """ messages = [] # Add conversation history if provided if conversation_history: for msg in conversation_history: role = msg.get("role", "user") content = msg.get("content", "") if role == "user": messages.append(HumanMessage(content=content)) elif role == "assistant": messages.append(AIMessage(content=content)) # Add current message messages.append(HumanMessage(content=message)) return messages # Singleton instance agent_service = AgentService()