"""Completion routes for AI agent interactions.""" import json from fastapi import APIRouter, HTTPException, status, Depends from fastapi.responses import StreamingResponse from typing import AsyncIterator from core.security import get_current_user from domain.models import CompletionRequest, CompletionResponse, StreamChunk, ErrorResponse from services.agent_service import agent_service from services.llm_service import llm_service router = APIRouter(prefix="/completion", tags=["Completion"]) @router.post( "", response_model=CompletionResponse, responses={ 200: { "description": "Non-streaming: JSON response | Streaming: Server-Sent Events (SSE)", # We keep an explicit SSE example for the streaming mode. "content": { "text/event-stream": { "example": "data: {\"content\": \"Hello\", \"done\": false}\n\n" } }, }, 400: { "description": "Bad Request - Invalid model configuration or streaming not supported", "model": ErrorResponse, "examples": { "streaming_not_supported": { "summary": "Model does not support streaming", "value": { "error": "Model 'gpt-5' does not support streaming. Please set stream=false or use a streaming-capable model.", "detail": "Model 'gpt-5' does not support streaming. Please set stream=false or use a streaming-capable model.", "timestamp": "2024-01-01T00:00:00Z", }, } }, }, 500: {"model": ErrorResponse}, }, ) async def complete( request: CompletionRequest, current_user: dict = Depends(get_current_user) ): """ Generate AI completion for a user message. This endpoint supports both streaming and non-streaming responses based on the `stream` parameter in the request body. **Non-streaming mode (stream=false):** - Returns a complete JSON response with the full answer - Response model: `CompletionResponse` **Streaming mode (stream=true):** - Returns Server-Sent Events (SSE) with incremental chunks - Each event is a JSON object with `content`, `done`, and `metadata` - Content-Type: `text/event-stream` Args: request: Completion request with message, model, agent and streaming flag current_user: Authenticated user (JWT required) Returns: CompletionResponse (non-streaming) or StreamingResponse (streaming) Raises: HTTPException: If agent type is not available or execution fails """ try: # Validate streaming capability for the requested model if request.stream and not llm_service.supports_streaming(request.model): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Model '{request.model.value}' does not support streaming. " f"Please set stream=false or use a streaming-capable model." ) # Check if streaming is requested if request.stream: return await _stream_completion(request) else: return await _complete(request) except ValueError as e: # Agent not available or validation error raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=str(e) ) except Exception as e: # Unexpected error raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Completion failed: {str(e)}" ) async def _complete(request: CompletionRequest) -> CompletionResponse: """ Handle non-streaming completion. Args: request: Completion request Returns: Complete response with full text """ result = await agent_service.invoke( message=request.message, model_name=request.model, agent=request.agent, temperature=request.temperature, max_tokens=request.max_tokens, conversation_history=request.conversation_history, project_id=request.project_id, sources=request.sources, conversation_id=request.conversation_id, ) return CompletionResponse(**result, conversation_id=request.conversation_id) async def _stream_completion(request: CompletionRequest) -> StreamingResponse: """ Handle streaming completion with Server-Sent Events. Args: request: Completion request Returns: StreamingResponse with SSE """ async def event_generator() -> AsyncIterator[str]: """Generate Server-Sent Events for streaming.""" try: async for chunk in agent_service.stream( message=request.message, model_name=request.model, agent=request.agent, temperature=request.temperature, max_tokens=request.max_tokens, conversation_history=request.conversation_history, project_id=request.project_id, sources=request.sources, conversation_id=request.conversation_id, ): # Always include conversation_id at top level so the client has it from the first chunk chunk["conversation_id"] = request.conversation_id # Format as SSE: "data: {json}\n\n" chunk_json = json.dumps(chunk, ensure_ascii=False) yield f"data: {chunk_json}\n\n" except Exception as e: # Send error as final event error_chunk = { "content": "", "done": True, "error": str(e) } yield f"data: {json.dumps(error_chunk)}\n\n" return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" # Disable buffering in nginx } )