"""Ingestion service: Mistral OCR -> chunk -> embed -> Supabase insert.

This service is invoked by a background task. It reports progress through
the shared TaskRegistry.
"""

from __future__ import annotations

from typing import List

from langchain_core.documents import Document

from config.settings import settings
from services.mistral_service import mistral_service
from services.vectorstore_service import add_documents
from services.task_service import task_registry


def _chunk_text(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Simple, dependency-free text splitter with overlap.

    Splits on character count boundaries; for production, replace with
    token-aware splitters if needed.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk. A non-positive
            value disables splitting and returns the text as one chunk.
        chunk_overlap: Number of characters shared between consecutive
            chunks. Negative values are treated as 0; values at or above
            ``chunk_size`` degrade to a one-character step.

    Returns:
        The chunks in order. Empty text always yields an empty list, so no
        empty chunk is ever produced.
    """
    # Check emptiness first: otherwise a non-positive chunk_size would return
    # [""] and an empty chunk would end up being embedded and indexed.
    if not text:
        return []
    if chunk_size <= 0:
        return [text]

    text_length = len(text)
    # Always advance by at least one character so the loop terminates even
    # when the overlap is as large as (or larger than) the chunk size.
    step = max(1, chunk_size - max(0, chunk_overlap))

    chunks: List[str] = []
    for start in range(0, text_length, step):
        end = min(text_length, start + chunk_size)
        chunks.append(text[start:end])
        if end == text_length:
            # This chunk reached the end of the text; a further window
            # would only repeat its tail.
            break
    return chunks


def run_job(
    job_id: str,
    project_id: str,
    filename: str,
    content_bytes: bytes,
    uploaded_by: str | None,
    index_name: str | None = None,
) -> None:
    """Execute the ingestion pipeline and update job progress along the way.

    Stages, in order: upload the file to Mistral, run OCR, split each page's
    markdown into chunks, then hand the chunk documents to ``add_documents``.
    Stage changes, page progress and the final status are all reported
    through ``task_registry``.

    Args:
        job_id: Key of the job in ``task_registry``. If no such job exists
            the function returns without doing anything.
        project_id: Stored in each chunk's metadata.
        filename: Name passed to the upload call and stored as the
            ``source`` metadata of every chunk.
        content_bytes: Raw file content passed to
            ``mistral_service.upload_pdf``.
        uploaded_by: Stored in each chunk's metadata; may be None.
        index_name: Forwarded unchanged to ``add_documents``.

    Returns:
        None. Any exception raised by the pipeline is caught and recorded on
        the job via ``task_registry.set_failed`` instead of propagating.
    """
    job = task_registry.get(job_id)
    if not job:
        return

    try:
        task_registry.set_running(job_id)

        # 1) Upload
        task_registry.set_stage(job_id, "upload")
        job.log(f"Uploading '{filename}' to Mistral")
        signed_url = mistral_service.upload_pdf(content_bytes, filename)

        # 2) OCR
        task_registry.set_stage(job_id, "ocr")
        job.log("Starting OCR")
        doc_source = mistral_service.build_document_url(signed_url)
        ocr_result = mistral_service.process_ocr(
            document_source=doc_source,
            include_image_base64=False,
        )
        # `or []` also covers a response carrying an explicit null "pages".
        pages = ocr_result.get("pages") or []
        task_registry.set_progress(job_id, pages_total=len(pages), pages_done=0)

        # 3) Chunk per page
        task_registry.set_stage(job_id, "chunk")
        chunk_size = settings.chunk_size
        chunk_overlap = settings.chunk_overlap
        documents: List[Document] = []
        for position, page in enumerate(pages):
            # Prefer the index reported by OCR; fall back to the page's
            # position in the list when it is missing or null so pages are
            # not all labelled as page 1.
            raw_index = page.get("index")
            page_index = int(raw_index) if raw_index is not None else position

            page_markdown = (page.get("markdown") or "").replace("$\\checkmark$", "-")
            chunks = _chunk_text(
                page_markdown,
                chunk_size=chunk_size,
                chunk_overlap=chunk_overlap,
            )
            for i, chunk in enumerate(chunks):
                metadata = {
                    "source": filename,
                    # Stored one-based; assumes OCR page indices are
                    # zero-based -- TODO confirm against the OCR response.
                    "page_number": page_index + 1,
                    "type": settings.doc_default_type,
                    "project_id": project_id,
                    "uploaded_by": uploaded_by,
                    "chunk_index": i,
                }
                documents.append(Document(page_content=chunk, metadata=metadata))

            # page-level progress: derived from the loop position rather than
            # read back from the job object, so it does not depend on the
            # registry mutating this same instance.
            task_registry.set_progress(job_id, pages_done=position + 1)

        # 4) Embed + index
        task_registry.set_stage(job_id, "index")
        job.log(f"Indexing {len(documents)} chunks to Supabase")
        ids = add_documents(documents, index_name=index_name)

        # Done
        task_registry.set_done(job_id, inserted_count=len(ids))
    except Exception as exc:
        # Top-level boundary of the background task: record the failure on
        # the job instead of raising. Some exceptions stringify to "", so
        # fall back to the class name to keep the reason non-empty.
        task_registry.set_failed(job_id, error=str(exc) or type(exc).__name__)