"""Reusable Mistral service: file upload, OCR, and chat completion.

This module provides a clean, SOLID-oriented wrapper around the Mistral Python
SDK, exposing high-level methods suitable for reuse across the API.
"""
from __future__ import annotations

import base64
import json
import logging
import os
import tempfile
from typing import Any, Dict, Optional, Union

import requests
from mistralai import Mistral

from config import settings

logger = logging.getLogger(__name__)


class MistralServiceError(Exception):
    """Domain-specific error for Mistral service failures."""


class MistralService:
    """Service for interacting with Mistral's Files, OCR, and Chat APIs.

    Responsibilities:
    - Manage a single Mistral client instance (dependency inversion via settings)
    - Provide cohesive, high-level operations: PDF upload, OCR, chat completion
    - Hide SDK-specific details behind clear method contracts
    """

    # Keys under which a document source dict may carry its URL. Each key is
    # also the value sent as the source "type" in the OCR request.
    _SOURCE_URL_KEYS = ("document_url", "image_url")

    def __init__(
        self,
        api_key: Optional[str] = None,
        ocr_model: str = "mistral-ocr-2503",
        chat_model: str = "mistral-large-latest",
        *,
        request_timeout: Optional[float] = 120.0,
    ) -> None:
        """Create the service and its underlying SDK client.

        Args:
            api_key: Mistral API key; falls back to ``settings.mistralai_api_key``.
            ocr_model: Model name sent with OCR requests.
            chat_model: Default model name for chat completions.
            request_timeout: Timeout in seconds applied to every REST call made
                through ``requests``. ``None`` disables the timeout (requests
                then wait indefinitely).

        Raises:
            MistralServiceError: If no API key is available.
        """
        self._api_key = api_key or settings.mistralai_api_key
        if not self._api_key:
            raise MistralServiceError("Missing Mistral API key configuration")
        self.client = Mistral(api_key=self._api_key)
        self.ocr_model = ocr_model
        self.chat_model = chat_model
        self._request_timeout = request_timeout
        self._rest_base_url = "https://api.mistral.ai/v1"

    # ---------- File Handling ----------
    def upload_pdf(self, content: bytes, filename: str) -> str:
        """Upload a PDF to Mistral and return a URL usable as a document source.

        The upload goes through the REST API. If any step of the upload fails,
        the failure is logged and the PDF is returned inline as a base64
        ``data:`` URI instead, so callers always get a usable source string.

        Args:
            content: Raw PDF bytes.
            filename: Original filename, sent as the multipart file name.

        Returns:
            A signed URL on success, otherwise a ``data:application/pdf`` URI.

        Raises:
            MistralServiceError: If ``filename`` is empty.
        """
        if not filename:
            raise MistralServiceError("Filename is required for PDF upload")

        try:
            return self._upload_and_sign(content, filename)
        except Exception as exc:
            # Deliberate best-effort boundary: whatever went wrong with the
            # upload, fall back to an inline data URI -- but leave a trace.
            logger.warning(
                "Mistral file upload failed (%s); falling back to inline data URI",
                exc,
            )
            return self.encode_pdf_bytes_to_data_uri(content)

    def _upload_and_sign(self, content: bytes, filename: str) -> str:
        """Upload ``content`` via REST and return the signed URL of the file."""
        headers = {"Authorization": f"Bearer {self._api_key}"}

        res = requests.post(
            f"{self._rest_base_url}/files",
            headers=headers,
            data={"purpose": "ocr"},
            files=[("file", (filename, content, "application/pdf"))],
            timeout=self._request_timeout,
        )
        res.raise_for_status()
        uploaded = res.json()
        file_id = uploaded.get("id")
        if not file_id:
            raise MistralServiceError(f"File upload failed: {uploaded}")

        # Exchange the file id for a signed URL.
        res = requests.get(
            f"{self._rest_base_url}/files/{file_id}/url",
            headers=headers,
            timeout=self._request_timeout,
        )
        res.raise_for_status()
        signed_url = res.json().get("url")
        if not signed_url:
            raise MistralServiceError(f"Failed to get signed URL: {res.text}")
        return signed_url

    # ---------- Document Source Builders ----------
    @staticmethod
    def build_document_url(url: str) -> Dict[str, str]:
        """Create a document source dict for OCR/chat from a URL."""
        return {"type": "document_url", "document_url": url}

    @staticmethod
    def build_image_url(url: str) -> Dict[str, str]:
        """Create an image source dict for OCR/chat from a URL or data URI."""
        return {"type": "image_url", "image_url": url}

    @staticmethod
    def encode_image_bytes_to_data_uri(image_bytes: bytes, mime: str = "image/png") -> str:
        """Encode raw image bytes into a data URI suitable for Mistral image input."""
        b64 = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{mime};base64,{b64}"

    @staticmethod
    def encode_pdf_bytes_to_data_uri(pdf_bytes: bytes) -> str:
        """Encode PDF bytes into a data URI suitable for use as a document URL."""
        b64 = base64.b64encode(pdf_bytes).decode("utf-8")
        return f"data:application/pdf;base64,{b64}"

    # ---------- OCR ----------
    @classmethod
    def _build_ocr_document(cls, document_source: Dict[str, Any]) -> Dict[str, Any]:
        """Translate a source dict into the ``document`` object of an OCR request.

        Raises:
            MistralServiceError: If the source is not a dict or carries neither
                a ``document_url`` nor an ``image_url``.
        """
        if not isinstance(document_source, dict):
            raise MistralServiceError(
                "OCR processing failed: document_source must be a dict"
            )
        for key in cls._SOURCE_URL_KEYS:
            url = document_source.get(key)
            if url:
                return {"type": key, key: url}
        raise MistralServiceError(
            "OCR processing failed: document_source needs a "
            "'document_url' or 'image_url'"
        )

    def process_ocr(
        self,
        document_source: Dict[str, Any],
        *,
        include_image_base64: bool = False,
    ) -> Dict[str, Any]:
        """Run OCR on a document or image through the REST API.

        Args:
            document_source: A dict like
                {"type": "document_url", "document_url": ...} or
                {"type": "image_url", "image_url": ...}
            include_image_base64: Whether to include base64 images in the response

        Returns:
            The decoded JSON body of the OCR response.

        Raises:
            MistralServiceError: If the source is malformed, the request fails,
                or the response body is not valid JSON.
        """
        # Validate before entering the try block so this error is not re-wrapped.
        payload: Dict[str, Any] = {
            "model": self.ocr_model,
            "document": self._build_ocr_document(document_source),
        }
        # Only include the flag if True to minimize payload differences
        if include_image_base64:
            payload["include_image_base64"] = True

        try:
            res = requests.post(
                f"{self._rest_base_url}/ocr",
                headers={"Authorization": f"Bearer {self._api_key}"},
                json=payload,  # requests serializes and sets the JSON content type
                timeout=self._request_timeout,
            )
            res.raise_for_status()
            return res.json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers JSON decoding errors from res.json().
            raise MistralServiceError(f"OCR processing failed: {exc}") from exc

    @staticmethod
    def ocr_response_to_markdown(ocr_response: Dict[str, Any]) -> str:
        """Aggregate OCR pages into a single markdown string.

        Pages that carry an ``index`` get a separator and a 1-based
        ``# Page N`` heading; pages without one are appended verbatim.
        A missing or null ``pages`` / ``markdown`` is treated as empty.
        """
        markdown_pages: list[str] = []
        for page in ocr_response.get("pages") or []:
            idx = page.get("index")
            md = page.get("markdown") or ""
            if idx is not None:
                markdown_pages.append(f"\n\n---\n\n# Page {idx + 1}\n\n{md}")
            else:
                markdown_pages.append(md)
        return "\n".join(markdown_pages)

    # ---------- Chat Completion (with JSON) ----------
    def complete_json_from_document(
        self,
        document_source: Dict[str, Any],
        *,
        system_prompt: str,
        user_text: Optional[str] = None,
        model: Optional[str] = None,
        json_schema: Optional[Dict[str, Any]] = None,
        strict_json: bool = True,
        use_ocr_for_document: bool = True,
        ocr_max_chars: Optional[int] = None,
    ) -> Union[Dict[str, Any], str]:
        """Create a chat completion grounded on a document, requesting JSON output.

        Args:
            document_source: Document or image source (see builder helpers)
            system_prompt: System instructions (e.g., extraction guidance)
            user_text: Optional user text content to accompany the document
            model: Optional override for chat model
            json_schema: Optional JSON schema; it is appended to the system
                prompt as text rather than enforced by the API
            strict_json: Accepted for interface compatibility; currently not
                read -- the response format is always ``json_object``
            use_ocr_for_document: When True, OCR the document and provide text to chat
            ocr_max_chars: Optionally truncate OCR text to this many characters

        Returns:
            Parsed JSON when the model's string reply is valid JSON; otherwise
            the reply content as returned by the SDK.

        Raises:
            MistralServiceError: If OCR or the chat completion fails.
        """
        # If schema provided, inject it into system prompt (SDK only supports json_object type)
        system_content = system_prompt
        if json_schema is not None:
            schema_str = json.dumps(json_schema, indent=2)
            system_content += f"\n\n**JSON Schema to follow:**\n```json\n{schema_str}\n```"

        messages: list[Dict[str, Any]] = [
            {"role": "system", "content": system_content},
        ]

        # Build user content as a single string to satisfy SDK expectations
        user_content_parts: list[str] = []
        if user_text:
            user_content_parts.append(user_text)

        if use_ocr_for_document:
            # Run OCR to convert document to markdown text
            ocr_dict = self.process_ocr(document_source, include_image_base64=False)
            ocr_markdown = self.ocr_response_to_markdown(ocr_dict)
            if isinstance(ocr_max_chars, int) and ocr_max_chars > 0:
                ocr_markdown = ocr_markdown[:ocr_max_chars]
            user_content_parts.append("\n\n=== DOCUMENT CONTENT (OCR) ===\n" + ocr_markdown)
        else:
            # As a fallback, include the document URL if available (model can't
            # fetch, but keeps spec simple). Only "document_url" is forwarded.
            doc_url = document_source.get("document_url")
            if doc_url:
                user_content_parts.append(f"Document URL: {doc_url}")

        messages.append({"role": "user", "content": "\n\n".join(user_content_parts)})

        # SDK only supports "text" or "json_object" response_format
        response_format: Dict[str, Any] = {"type": "json_object"}

        try:
            chat_response = self.client.chat.complete(
                model=model or self.chat_model,
                messages=messages,
                response_format=response_format,
            )
            content = chat_response.choices[0].message.content
        except Exception as exc:
            # Service boundary: whatever the SDK raises is surfaced as the
            # domain error, with the original exception chained.
            raise MistralServiceError(f"Chat completion failed: {exc}") from exc

        # Try to parse JSON; fall back to raw string if parsing fails
        if isinstance(content, str):
            try:
                return json.loads(content)
            except ValueError:
                return content
        return content


# Singleton instance
mistral_service = MistralService()