"""Ingestion service: Mistral OCR -> chunk -> embed -> Supabase insert.

This service is invoked by a background task. It reports progress through the
shared TaskRegistry.
"""

from __future__ import annotations

import io
from typing import List, Literal

from langchain_core.documents import Document

from config.settings import settings
from services.mistral_service import mistral_service
from services.vectorstore_service import add_documents
from services.task_service import task_registry


def _chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split ``text`` into fixed-size character windows with overlap.

    Simple, dependency-free splitter that cuts on character-count boundaries;
    for production, replace with a token-aware splitter if needed.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk. A value ``<= 0``
            disables splitting and yields the whole text as a single chunk.
        chunk_overlap: Number of characters shared between consecutive chunks.
            Negative values are treated as ``0``.

    Returns:
        The chunks in order. Empty input always yields an empty list, so no
        empty chunk is ever produced.
    """
    # Check for empty input first: otherwise a non-positive chunk_size would
    # return [""] and an empty chunk would be sent on to indexing.
    if not text:
        return []
    if chunk_size <= 0:
        return [text]

    # The window always advances by at least one character, even when the
    # overlap is >= chunk_size, so the loop is guaranteed to terminate.
    step = max(1, chunk_size - max(0, chunk_overlap))

    chunks: List[str] = []
    n = len(text)
    start = 0
    while start < n:
        end = min(n, start + chunk_size)
        chunks.append(text[start:end])
        if end == n:
            # The window reached the end of the text; stopping here avoids
            # emitting extra trailing chunks made only of overlap.
            break
        start += step
    return chunks


DocumentFileType = Literal["pdf", "txt", "docx"]


def _extract_text_from_docx(content_bytes: bytes) -> str:
    """Extract textual content from a .docx (Word OpenXML) byte stream.

    Emits every non-empty body paragraph first, then every table row (non-empty
    cells joined with ``" | "``), each on its own line. Note that tables are
    therefore appended after all paragraphs rather than interleaved at their
    original position. Headers/footers are intentionally skipped to keep chunks
    aligned with the visible body content.

    Args:
        content_bytes: Raw bytes of the .docx file.

    Returns:
        The extracted text, with parts separated by ``"\\n"``.

    Raises:
        RuntimeError: If the ``python-docx`` package is not installed.
    """
    # Imported lazily so the module stays importable without python-docx
    # when only PDF / text ingestion is used.
    try:
        from docx import Document as _DocxDocument  # type: ignore[import-not-found]
    except ImportError as exc:
        raise RuntimeError(
            "python-docx is required to ingest .docx files. "
            "Install it with `pip install python-docx`."
        ) from exc

    document = _DocxDocument(io.BytesIO(content_bytes))
    parts: List[str] = []

    for paragraph in document.paragraphs:
        text = (paragraph.text or "").strip()
        if text:
            parts.append(text)

    for table in document.tables:
        for row in table.rows:
            row_cells = [(cell.text or "").strip() for cell in row.cells]
            row_text = " | ".join(cell for cell in row_cells if cell)
            if row_text:
                parts.append(row_text)

    return "\n".join(parts)


def _build_chunk_document(
    chunk: str,
    *,
    filename: str,
    page_number: int,
    doc_type: str,
    project_id: str,
    uploaded_by: str | None,
    chunk_index: int,
    document_id: str | None,
) -> Document:
    """Wrap one chunk in a ``Document`` carrying the metadata shared by all pipelines."""
    metadata = {
        "source": filename,
        "page_number": page_number,
        "type": doc_type,
        "project_id": project_id,
        "uploaded_by": uploaded_by,
        "chunk_index": chunk_index,
        "document_id": document_id,
    }
    return Document(page_content=chunk, metadata=metadata)


def _chunk_and_index_text(
    job_id: str,
    job,
    text: str,
    *,
    filename: str,
    project_id: str,
    uploaded_by: str | None,
    index_name: str | None,
    document_id: str | None,
    type_suffix: str,
    chunk_label: str,
) -> None:
    """Run the shared chunk -> index tail of the text and docx pipelines.

    Args:
        job_id: Identifier of the job in the task registry.
        job: The job object returned by ``task_registry.get``; used for logging.
        text: Already-extracted plain text of the document.
        filename: Original file name, stored as the ``source`` metadata field.
        project_id: Project the document belongs to.
        uploaded_by: Identifier of the uploader, if known.
        index_name: Target index forwarded to ``add_documents``.
        document_id: Identifier of the parent document, if known.
        type_suffix: Suffix appended to ``settings.doc_default_type`` to form
            the ``type`` metadata field (e.g. ``"txt"`` or ``"docx"``).
        chunk_label: Word used in the indexing log message (e.g. ``"text"``).
    """
    # Chunk
    task_registry.set_stage(job_id, "chunk")
    chunks = _chunk_text(
        text,
        chunk_size=settings.chunk_size,
        chunk_overlap=settings.chunk_overlap,
    )
    task_registry.set_progress(job_id, chunks_total=len(chunks), chunks_done=0)

    doc_type = f"{settings.doc_default_type}_{type_suffix}"
    documents: List[Document] = []
    for i, chunk in enumerate(chunks):
        documents.append(
            _build_chunk_document(
                chunk,
                filename=filename,
                # These formats have no page structure here, so every chunk
                # is attributed to page 1.
                page_number=1,
                doc_type=doc_type,
                project_id=project_id,
                uploaded_by=uploaded_by,
                chunk_index=i,
                document_id=document_id,
            )
        )
        task_registry.set_progress(job_id, chunks_done=i + 1)

    # Embed + index
    task_registry.set_stage(job_id, "index")
    job.log(f"Indexing {len(documents)} {chunk_label} chunks to Supabase")
    ids = add_documents(documents, index_name=index_name)
    task_registry.set_done(job_id, inserted_count=len(ids))


def _run_pdf_pipeline(
    job_id: str,
    project_id: str,
    filename: str,
    content_bytes: bytes,
    uploaded_by: str | None,
    index_name: str | None,
    document_id: str | None,
) -> None:
    """Execute the existing PDF ingestion pipeline (Mistral OCR -> chunk -> index).

    Returns silently when ``job_id`` is not known to the task registry.
    Exceptions from the upload, OCR or indexing calls propagate to the caller.
    """
    job = task_registry.get(job_id)
    if not job:
        return

    task_registry.set_running(job_id)

    # 1) Upload
    task_registry.set_stage(job_id, "upload")
    job.log(f"Uploading '{filename}' to Mistral")
    signed_url = mistral_service.upload_pdf(content_bytes, filename)

    # 2) OCR
    task_registry.set_stage(job_id, "ocr")
    job.log("Starting OCR")
    doc_source = mistral_service.build_document_url(signed_url)
    ocr_result = mistral_service.process_ocr(
        document_source=doc_source,
        include_image_base64=False,
    )

    pages = ocr_result.get("pages", [])
    task_registry.set_progress(job_id, pages_total=len(pages), pages_done=0)

    # 3) Chunk per page
    task_registry.set_stage(job_id, "chunk")
    chunk_size = settings.chunk_size
    chunk_overlap = settings.chunk_overlap

    documents: List[Document] = []
    # Count finished pages locally instead of reading job.pages_done back, so
    # progress does not depend on the registry mutating this job object.
    for pages_done, page in enumerate(pages, start=1):
        page_index = int(page.get("index", 0))
        # Swap the LaTeX checkmark emitted by OCR for a plain dash.
        page_markdown = (page.get("markdown") or "").replace("$\\checkmark$", "-")
        chunks = _chunk_text(
            page_markdown,
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
        )
        for i, chunk in enumerate(chunks):
            documents.append(
                _build_chunk_document(
                    chunk,
                    filename=filename,
                    # The page "index" is treated as 0-based; store it 1-based.
                    page_number=page_index + 1,
                    doc_type=settings.doc_default_type,
                    project_id=project_id,
                    uploaded_by=uploaded_by,
                    chunk_index=i,
                    document_id=document_id,
                )
            )
        # page-level progress
        task_registry.set_progress(job_id, pages_done=pages_done)

    # 4) Embed + index
    task_registry.set_stage(job_id, "index")
    job.log(f"Indexing {len(documents)} chunks to Supabase")
    ids = add_documents(documents, index_name=index_name)

    # Done
    task_registry.set_done(job_id, inserted_count=len(ids))


def _run_text_pipeline(
    job_id: str,
    project_id: str,
    filename: str,
    content_bytes: bytes,
    uploaded_by: str | None,
    index_name: str | None,
    document_id: str | None,
) -> None:
    """Execute the text ingestion pipeline (plain text -> chunk -> index).

    Returns silently when ``job_id`` is not known to the task registry.

    Raises:
        RuntimeError: If the content cannot be decoded.
    """
    job = task_registry.get(job_id)
    if not job:
        return

    task_registry.set_running(job_id)

    # 1) Parse / decode
    task_registry.set_stage(job_id, "parse")
    job.log(f"Decoding text content for '{filename}'")
    try:
        # "utf-8-sig" decodes like UTF-8 but drops a leading byte-order mark,
        # so it does not end up as U+FEFF at the start of the first chunk.
        # Undecodable bytes are replaced rather than raising.
        text = content_bytes.decode("utf-8-sig", errors="replace")
    except Exception as exc:
        raise RuntimeError(f"Failed to decode text file '{filename}': {exc}") from exc

    # 2) Chunk, 3) Embed + index
    _chunk_and_index_text(
        job_id,
        job,
        text,
        filename=filename,
        project_id=project_id,
        uploaded_by=uploaded_by,
        index_name=index_name,
        document_id=document_id,
        type_suffix="txt",
        chunk_label="text",
    )


def _run_docx_pipeline(
    job_id: str,
    project_id: str,
    filename: str,
    content_bytes: bytes,
    uploaded_by: str | None,
    index_name: str | None,
    document_id: str | None,
) -> None:
    """Execute the .docx ingestion pipeline (Word -> text -> chunk -> index).

    Returns silently when ``job_id`` is not known to the task registry.

    Raises:
        RuntimeError: If text cannot be extracted from the document.
    """
    job = task_registry.get(job_id)
    if not job:
        return

    task_registry.set_running(job_id)

    # 1) Parse / extract text from the Word document
    task_registry.set_stage(job_id, "parse")
    job.log(f"Extracting text from Word document '{filename}'")
    try:
        text = _extract_text_from_docx(content_bytes)
    except Exception as exc:
        raise RuntimeError(f"Failed to read docx file '{filename}': {exc}") from exc

    # 2) Chunk, 3) Embed + index
    _chunk_and_index_text(
        job_id,
        job,
        text,
        filename=filename,
        project_id=project_id,
        uploaded_by=uploaded_by,
        index_name=index_name,
        document_id=document_id,
        type_suffix="docx",
        chunk_label="docx",
    )


def run_job(
    job_id: str,
    project_id: str,
    filename: str,
    content_bytes: bytes,
    uploaded_by: str | None,
    index_name: str | None = None,
    file_type: DocumentFileType = "pdf",
    document_id: str | None = None,
) -> None:
    """Execute the ingestion pipeline and update job progress along the way.

    Dispatches on ``file_type``: ``"txt"`` and ``"docx"`` use their dedicated
    pipelines; any other value is handled by the PDF pipeline.

    Args:
        job_id: Identifier of the job in the task registry.
        project_id: Project the document belongs to.
        filename: Original file name of the upload.
        content_bytes: Raw file content.
        uploaded_by: Identifier of the uploader, if known.
        index_name: Target index forwarded to ``add_documents``.
        file_type: Kind of document being ingested.
        document_id: Identifier of the parent document, if known.

    This function does not raise for pipeline errors: any ``Exception`` is
    recorded on the job through ``task_registry.set_failed``.
    """
    try:
        if file_type == "txt":
            _run_text_pipeline(
                job_id, project_id, filename, content_bytes,
                uploaded_by, index_name, document_id,
            )
        elif file_type == "docx":
            _run_docx_pipeline(
                job_id, project_id, filename, content_bytes,
                uploaded_by, index_name, document_id,
            )
        else:
            _run_pdf_pipeline(
                job_id, project_id, filename, content_bytes,
                uploaded_by, index_name, document_id,
            )
    except Exception as exc:
        # Top-level boundary of the background task: report the failure on the
        # job instead of raising. Some exceptions have an empty message, so
        # fall back to the class name to keep the recorded error meaningful.
        task_registry.set_failed(job_id, error=str(exc) or type(exc).__name__)