"""Document ingestion routes: upload, track status, and delete documents."""

from fastapi import APIRouter, UploadFile, File, HTTPException, status, Depends, BackgroundTasks, Form
from fastapi import Query
from typing import Optional, Literal
import uuid

from core.security import get_current_user
from domain.models import UploadJobResponse, JobStatusResponse, ErrorResponse, DeleteDocumentResponse
from services.task_service import task_registry
from services import ingestion_service
from services import vectorstore_service
from config.settings import settings

router = APIRouter(prefix="/projects/{id_project}/documents", tags=["Documents"])

SupportedDocumentType = Literal["pdf", "txt", "docx"]

# Media types accepted as Word OpenXML (.docx). "application/msword" is
# deliberately NOT listed: it is the media type of the legacy binary .doc
# format, which is not a .docx document. A real .docx sent with that content
# type is still accepted through its ".docx" extension.
_DOCX_CONTENT_TYPES = frozenset({
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
})


def _media_type(content_type: Optional[str]) -> str:
    """Return the bare, lower-cased media type of a Content-Type value.

    Parameters are dropped so that e.g. ``"text/plain; charset=utf-8"``
    compares equal to ``"text/plain"``. ``None`` yields ``""``.
    """
    return (content_type or "").split(";", 1)[0].strip().lower()


def _detect_document_type(file: UploadFile) -> SupportedDocumentType:
    """
    Validate the uploaded document and return its logical type.

    A type matches when either the filename extension or the part's media
    type (Content-Type without parameters) matches; types are tried in the
    order PDF, DOCX, TXT.

    Currently supports:
    - PDF: .pdf extension or application/pdf
    - TXT: .txt extension or text/plain
    - DOCX: .docx extension or Word OpenXML content type

    Raises:
        HTTPException: 400 when the file matches none of the supported types.
    """
    filename = (file.filename or "").lower()
    media_type = _media_type(file.content_type)

    if filename.endswith(".pdf") or media_type == "application/pdf":
        return "pdf"

    if filename.endswith(".docx") or media_type in _DOCX_CONTENT_TYPES:
        return "docx"

    if filename.endswith(".txt") or media_type == "text/plain":
        return "txt"

    raise HTTPException(
        status_code=status.HTTP_400_BAD_REQUEST,
        detail="Only PDF, Word (.docx) and plain text (.txt) files are supported at the moment.",
    )


def _parse_document_uuid(id_document: str) -> uuid.UUID:
    """Parse ``id_document`` as a UUID.

    Raises:
        HTTPException: 400 if ``id_document`` is not a valid UUID string.
    """
    try:
        return uuid.UUID(id_document)
    except ValueError:
        # The 400 detail fully describes the problem to the client; the
        # internal ValueError context adds nothing.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="id_document must be a valid UUID.",
        ) from None


@router.post(
    "",
    response_model=UploadJobResponse,
    responses={
        400: {"model": ErrorResponse},
        401: {"model": ErrorResponse},
        409: {"model": ErrorResponse},
        500: {"model": ErrorResponse},
        # NOTE(review): the decorator sets no status_code, so the route
        # actually answers with FastAPI's default success code (200); this
        # entry only documents the intent. Confirm with API consumers before
        # switching to status_code=status.HTTP_202_ACCEPTED.
        202: {"description": "Accepted: returns job id while processing continues in background"},
    },
)
async def upload_project_document(
    id_project: str,
    background_tasks: BackgroundTasks,
    current_user: dict = Depends(get_current_user),
    id_document: str = Form(..., description="UUID du document (obligatoire)"),
    file: UploadFile = File(..., description="Project document to ingest (PDF, .docx or plain text)"),
    index_name: str = Query("projects", description="Logical index (settings.vector_indexes key)"),
) -> UploadJobResponse:
    """
    Start background ingestion for a project document.

    Body fields (multipart/form-data):
    - id_document (UUID, required): stable identifier of the logical document
    - file (PDF, .docx or .txt): binary content

    Supported formats:
    - PDF: processed via Mistral OCR -> chunk -> embed -> index
    - DOCX (Word): text extraction -> chunk -> embed -> index (no OCR)
    - Plain text (.txt): direct text -> chunk -> embed -> index (no OCR)

    Errors:
    - 400: invalid id_document, unsupported file type, or file too large
    - 409: a document with this id_document already exists in the project/index

    Returns a `job_id` for status polling.
    """
    document_id = str(_parse_document_uuid(id_document))
    file_type = _detect_document_type(file)

    # Size validation: seek to the end of the underlying file object to get
    # its size, then rewind so the later read starts from the beginning.
    file.file.seek(0, 2)
    file_size = file.file.tell()
    file.file.seek(0)
    max_bytes = settings.max_upload_mb_pdf * 1024 * 1024
    if file_size > max_bytes:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File too large. Max {settings.max_upload_mb_pdf} MB.",
        )

    # Ensure no duplicate document for this project.
    # NOTE(review): document_exists is called without await inside an async
    # route; if it performs blocking network I/O it stalls the event loop —
    # consider offloading it to a thread pool. The check-then-enqueue
    # sequence is also not atomic across concurrent uploads.
    if vectorstore_service.document_exists(
        project_id=id_project,
        document_id=document_id,
        index_name=index_name,
    ):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="A document with this id_document already exists for this project.",
        )

    # Buffer the upload only once every cheap validation has passed, so
    # rejected requests never load the whole file into memory.
    content = await file.read()

    # Uploader identity: first truthy value among the known claim keys.
    uploaded_by: Optional[str] = (
        current_user.get("sub")
        or current_user.get("user_id")
        or current_user.get("email")
    )

    # Create job and enqueue background task
    job = task_registry.create_job(project_id=id_project, filename=file.filename, uploaded_by=uploaded_by)
    background_tasks.add_task(
        ingestion_service.run_job,
        job.job_id,
        id_project,
        file.filename,
        content,
        uploaded_by,
        index_name,
        file_type,
        document_id,
    )

    return UploadJobResponse(job_id=job.job_id, status=job.status, created_at=job.created_at)


@router.delete(
    "/{id_document}",
    response_model=DeleteDocumentResponse,
    responses={
        400: {"model": ErrorResponse},
        404: {"model": ErrorResponse},
        401: {"model": ErrorResponse},
    },
)
async def delete_project_document(
    id_project: str,
    id_document: str,
    current_user: dict = Depends(get_current_user),
    index_name: str = Query("projects", description="Logical index (settings.vector_indexes key)"),
) -> DeleteDocumentResponse:
    """
    Delete all vector chunks for a given logical document in a project.

    Path params:
    - id_project: project identifier
    - id_document: UUID of the logical document

    `current_user` is not read here; the dependency only enforces authentication.

    Errors:
    - 400: id_document is not a valid UUID
    - 404: no chunks were deleted for this document in this project/index
    """
    document_id = str(_parse_document_uuid(id_document))

    deleted_count = vectorstore_service.delete_documents_by_document_id(
        project_id=id_project,
        document_id=document_id,
        index_name=index_name,
    )

    # Zero deleted chunks is reported as "not found".
    if deleted_count == 0:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Document not found for this project.",
        )

    return DeleteDocumentResponse(
        project_id=id_project,
        document_id=document_id,
        deleted_count=deleted_count,
    )


@router.get(
    "/jobs/{job_id}",
    response_model=JobStatusResponse,
    responses={
        404: {"model": ErrorResponse},
        401: {"model": ErrorResponse},
    },
)
async def get_job_status(
    id_project: str,
    job_id: str,
    current_user: dict = Depends(get_current_user),
) -> JobStatusResponse:
    """
    Get the current status/progress of a background ingestion job.

    `current_user` is not read here; the dependency only enforces authentication.

    Raises:
        HTTPException: 404 if the job is unknown or belongs to another project.
    """
    snapshot = task_registry.status_snapshot(job_id)
    if not snapshot:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")

    # Enforced scoping: a job id from another project is reported as 404 so
    # that job ids cannot be probed across projects.
    job = task_registry.get(job_id)
    if job and job.project_id != id_project:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found for project")

    return JobStatusResponse(**snapshot)