Cyril Dupland
Add batch processing service for structured data extraction from OCR results. Include metrics for carbon impact, latency, and pricing in API examples. Update default OCR model in Mistral service for consistency.
"""Reusable Mistral service: file upload, OCR, and chat completion.

This module provides a clean, SOLID-oriented wrapper around the Mistral
Python SDK, exposing high-level methods suitable for reuse across the API.
"""
from __future__ import annotations

import base64
import json
import logging
import os
import tempfile
from typing import Any, Dict, Optional, Union

import requests
from mistralai import Mistral

from config import settings
class MistralServiceError(Exception):
    """Domain-specific error for Mistral service failures.

    Raised for configuration problems (for example a missing API key),
    invalid arguments, and failed OCR or chat-completion calls, so callers
    can catch a single exception type instead of SDK/HTTP-specific ones.
    """
class MistralService:
    """Service for interacting with Mistral's Files, OCR, and Chat APIs.

    Responsibilities:
    - Manage a single Mistral client instance (dependency inversion via settings)
    - Provide cohesive, high-level operations: PDF upload, OCR, chat completion
    - Hide SDK-specific details behind clear method contracts

    File upload and OCR are performed through the REST API with ``requests``;
    chat completion goes through the SDK client.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        ocr_model: str = "mistral-ocr-2503",
        chat_model: str = "mistral-large-latest",
        request_timeout: Optional[float] = 300.0,
    ) -> None:
        """Create the service and its underlying SDK client.

        Args:
            api_key: Mistral API key; falls back to ``settings.mistralai_api_key``.
            ocr_model: Model name sent with OCR requests.
            chat_model: Default model name for chat completions.
            request_timeout: Timeout in seconds applied to every REST call made
                with ``requests``. ``None`` disables the timeout (the call may
                then block indefinitely).

        Raises:
            MistralServiceError: If no API key is available.
        """
        self._api_key = api_key or settings.mistralai_api_key
        if not self._api_key:
            raise MistralServiceError("Missing Mistral API key configuration")
        self.client = Mistral(api_key=self._api_key)
        self.ocr_model = ocr_model
        self.chat_model = chat_model
        self._rest_base_url = "https://api.mistral.ai/v1"
        self._request_timeout = request_timeout

    # ---------- File Handling ----------

    def upload_pdf(self, content: bytes, filename: str) -> str:
        """Upload a PDF file to Mistral and return a URL usable as a document source.

        Args:
            content: Raw PDF bytes
            filename: Original filename (sent as the multipart file name)

        Returns:
            The signed URL of the uploaded file. If the upload or the signed-URL
            lookup fails for any reason, a base64 ``data:application/pdf`` URI
            built from ``content`` is returned instead (best-effort fallback).

        Raises:
            MistralServiceError: If ``filename`` is empty.
        """
        if not filename:
            raise MistralServiceError("Filename is required for PDF upload")

        headers = {"Authorization": f"Bearer {self._api_key}"}
        # Prefer REST API for broader compatibility
        try:
            res = requests.post(
                f"{self._rest_base_url}/files",
                headers=headers,
                data={"purpose": "ocr"},
                files=[("file", (filename, content, "application/pdf"))],
                timeout=self._request_timeout,
            )
            res.raise_for_status()
            uploaded = res.json()
            file_id = uploaded.get("id")
            if not file_id:
                raise MistralServiceError(f"File upload failed: {uploaded}")

            # Exchange the file id for a signed URL
            res = requests.get(
                f"{self._rest_base_url}/files/{file_id}/url",
                headers=headers,
                timeout=self._request_timeout,
            )
            res.raise_for_status()
            signed_url = res.json().get("url")
            if not signed_url:
                raise MistralServiceError(f"Failed to get signed URL: {res.text}")
            return signed_url
        except Exception as exc:
            # Deliberate best-effort: any failure above (HTTP error, timeout,
            # unexpected response shape) degrades to an inline data URI, but
            # the cause is logged instead of being dropped silently.
            logging.getLogger(__name__).warning(
                "Mistral file upload failed (%s); falling back to inline data URI",
                exc,
            )
            return self.encode_pdf_bytes_to_data_uri(content)

    # ---------- Document Source Builders ----------

    @staticmethod
    def build_document_url(url: str) -> Dict[str, str]:
        """Create a document source dict for OCR/chat from a URL."""
        return {"type": "document_url", "document_url": url}

    @staticmethod
    def build_image_url(url: str) -> Dict[str, str]:
        """Create an image source dict for OCR/chat from a URL or data URI."""
        return {"type": "image_url", "image_url": url}

    @staticmethod
    def encode_image_bytes_to_data_uri(image_bytes: bytes, mime: str = "image/png") -> str:
        """Encode raw image bytes into a data URI suitable for Mistral image input."""
        b64 = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{mime};base64,{b64}"

    @staticmethod
    def encode_pdf_bytes_to_data_uri(pdf_bytes: bytes) -> str:
        """Encode PDF bytes into a data URI suitable for use as a document URL."""
        b64 = base64.b64encode(pdf_bytes).decode("utf-8")
        return f"data:application/pdf;base64,{b64}"

    # ---------- OCR ----------

    @staticmethod
    def _build_ocr_document(document_source: Dict[str, Any]) -> Dict[str, Any]:
        """Translate a builder-style source dict into the OCR ``document`` payload.

        A ``document_url`` entry takes precedence; otherwise an ``image_url``
        entry is used.

        Raises:
            MistralServiceError: If the source carries neither key.
        """
        document_url = document_source.get("document_url")
        if document_url:
            return {"type": "document_url", "document_url": document_url}
        image_url = document_source.get("image_url")
        if image_url:
            return {"type": "image_url", "image_url": image_url}
        raise MistralServiceError(
            "OCR document source must provide 'document_url' or 'image_url'"
        )

    def process_ocr(
        self,
        document_source: Dict[str, Any],
        *,
        include_image_base64: bool = False,
    ) -> Dict[str, Any]:
        """Run OCR on a document.

        Args:
            document_source: A dict like {"type": "document_url", "document_url": ...}
                or {"type": "image_url", "image_url": ...}
            include_image_base64: Whether to include base64 images in the response

        Returns:
            The parsed JSON body of the REST OCR response.

        Raises:
            MistralServiceError: If the source is invalid or the request fails.
        """
        # Prefer REST API for OCR for better compatibility
        try:
            payload: Dict[str, Any] = {
                "model": self.ocr_model,
                "document": self._build_ocr_document(document_source),
            }
            # Only include the flag if True to minimize payload differences
            if include_image_base64:
                payload["include_image_base64"] = True
            res = requests.post(
                f"{self._rest_base_url}/ocr",
                headers={"Authorization": f"Bearer {self._api_key}"},
                json=payload,  # requests serializes and sets the JSON content type
                timeout=self._request_timeout,
            )
            res.raise_for_status()
            return res.json()
        except MistralServiceError:
            # Already a domain error (invalid source); do not wrap it twice.
            raise
        except Exception as exc:
            raise MistralServiceError(f"OCR processing failed: {exc}") from exc

    @staticmethod
    def ocr_response_to_markdown(ocr_response: Dict[str, Any]) -> str:
        """Aggregate OCR pages into a single markdown string.

        Pages that carry an ``index`` get a ``# Page N`` header (1-based)
        preceded by a horizontal rule; pages without one are appended as-is.
        A missing or null ``pages`` entry yields an empty string.
        """
        markdown_pages: list[str] = []
        for page in ocr_response.get("pages") or []:
            idx = page.get("index")
            md = page.get("markdown") or ""
            if idx is not None:
                markdown_pages.append(f"\n\n---\n\n# Page {idx + 1}\n\n{md}")
            else:
                markdown_pages.append(md)
        return "\n".join(markdown_pages)

    # ---------- Chat Completion (with JSON) ----------

    def complete_json_from_document(
        self,
        document_source: Dict[str, Any],
        *,
        system_prompt: str,
        user_text: Optional[str] = None,
        model: Optional[str] = None,
        json_schema: Optional[Dict[str, Any]] = None,
        strict_json: bool = True,
        use_ocr_for_document: bool = True,
        ocr_max_chars: Optional[int] = None,
    ) -> Union[Dict[str, Any], str]:
        """Create a chat completion grounded on a document, requesting JSON output.

        Args:
            document_source: Document or image source (see builder helpers)
            system_prompt: System instructions (e.g., extraction guidance)
            user_text: Optional user text content to accompany the document
            model: Optional override for chat model
            json_schema: Optional JSON schema; it is appended to the system
                prompt as a fenced JSON block rather than enforced by the API
            strict_json: Accepted for interface compatibility. NOTE(review):
                the implementation does not consult it; the ``json_object``
                response format is always requested.
            use_ocr_for_document: When True, OCR the document and provide text to chat
            ocr_max_chars: Optionally truncate OCR text to this many characters

        Returns:
            The parsed JSON value when the model output is valid JSON; otherwise
            the raw message content.

        Raises:
            MistralServiceError: If OCR or the chat completion call fails.
        """
        # If schema provided, inject it into system prompt (SDK only supports json_object type)
        system_content = system_prompt
        if json_schema is not None:
            schema_str = json.dumps(json_schema, indent=2)
            system_content += f"\n\n**JSON Schema to follow:**\n```json\n{schema_str}\n```"

        messages: list[Dict[str, Any]] = [
            {"role": "system", "content": system_content},
        ]

        # Build user content as a single string to satisfy SDK expectations
        user_content_parts: list[str] = []
        if user_text:
            user_content_parts.append(user_text)

        if use_ocr_for_document:
            # Run OCR to convert document to markdown text
            ocr_dict = self.process_ocr(document_source, include_image_base64=False)
            ocr_markdown = self.ocr_response_to_markdown(ocr_dict)
            if isinstance(ocr_max_chars, int) and ocr_max_chars > 0:
                ocr_markdown = ocr_markdown[:ocr_max_chars]
            user_content_parts.append("\n\n=== DOCUMENT CONTENT (OCR) ===\n" + ocr_markdown)
        else:
            # As a fallback, include the document URL if available
            # (model can't fetch, but keeps spec simple)
            doc_url = document_source.get("document_url")
            if doc_url:
                user_content_parts.append(f"Document URL: {doc_url}")

        messages.append({"role": "user", "content": "\n\n".join(user_content_parts)})

        # SDK only supports "text" or "json_object" response_format
        response_format: Dict[str, Any] = {"type": "json_object"}
        try:
            chat_response = self.client.chat.complete(
                model=model or self.chat_model,
                messages=messages,
                response_format=response_format,
            )
            content = chat_response.choices[0].message.content
        except Exception as exc:
            raise MistralServiceError(f"Chat completion failed: {exc}") from exc

        # Try to parse JSON; fall back to raw content if parsing fails
        if isinstance(content, str):
            try:
                return json.loads(content)
            except (ValueError, RecursionError):
                return content
        return content
# Singleton instance shared by importers of this module.
# NOTE: constructed at import time with default arguments, so importing this
# module raises MistralServiceError when no Mistral API key is configured.
mistral_service = MistralService()