# File: routeur_ia_api/services/mistral_service.py (9.68 kB)
# Commit b1df3b7 (Cyril Dupland):
#   Add batch processing service for structured data extraction from OCR results.
#   Include metrics for carbon impact, latency, and pricing in API examples.
#   Update default OCR model in Mistral service for consistency.
"""Reusable Mistral service: file upload, OCR, and chat completion.
This module provides a clean, SOLID-oriented wrapper around the Mistral
Python SDK, exposing high-level methods suitable for reuse across the API.
"""
from __future__ import annotations

import base64
import json
import logging
import os
import tempfile
from typing import Optional, Dict, Any, Union

import requests
from mistralai import Mistral

from config import settings
class MistralServiceError(Exception):
    """Raised when an operation of the Mistral service cannot be completed."""
class MistralService:
    """Service for interacting with Mistral's Files, OCR, and Chat APIs.

    Responsibilities:
    - Manage a single Mistral client instance (dependency inversion via settings)
    - Provide cohesive, high-level operations: PDF upload, OCR, chat completion
    - Hide SDK-specific details behind clear method contracts

    File upload and OCR are performed against the REST endpoints with
    ``requests``; chat completion goes through the SDK client.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        ocr_model: str = "mistral-ocr-2503",
        chat_model: str = "mistral-large-latest",
        request_timeout: Optional[float] = 300.0,
    ) -> None:
        """Create the service and its underlying SDK client.

        Args:
            api_key: Mistral API key; falls back to ``settings.mistralai_api_key``
            ocr_model: Model name sent with OCR requests
            chat_model: Default model name for chat completions
            request_timeout: Timeout in seconds applied to every REST call made
                with ``requests``; ``None`` waits indefinitely

        Raises:
            MistralServiceError: If no API key is provided or configured
        """
        self._api_key = api_key or settings.mistralai_api_key
        if not self._api_key:
            raise MistralServiceError("Missing Mistral API key configuration")
        self.client = Mistral(api_key=self._api_key)
        self.ocr_model = ocr_model
        self.chat_model = chat_model
        self.request_timeout = request_timeout
        self._rest_base_url = "https://api.mistral.ai/v1"

    # ---------- File Handling ----------
    def upload_pdf(self, content: bytes, filename: str) -> str:
        """Upload a PDF file to Mistral and return a URL usable as a document source.

        Args:
            content: Raw PDF bytes
            filename: Original filename, sent as the uploaded file's name

        Returns:
            The signed URL of the uploaded file. If any step of the upload
            fails, a base64 ``data:application/pdf`` URI built from ``content``
            is returned instead (best-effort fallback).

        Raises:
            MistralServiceError: If ``filename`` is empty
        """
        if not filename:
            raise MistralServiceError("Filename is required for PDF upload")

        headers = {"Authorization": f"Bearer {self._api_key}"}
        # Prefer REST API for broader compatibility
        try:
            upload_res = requests.post(
                f"{self._rest_base_url}/files",
                headers=headers,
                data={"purpose": "ocr"},
                files=[("file", (filename, content, "application/pdf"))],
                timeout=self.request_timeout,
            )
            upload_res.raise_for_status()
            uploaded = upload_res.json()
            file_id = uploaded.get("id")
            if not file_id:
                raise MistralServiceError(f"File upload failed: {uploaded}")

            # Exchange the file id for a signed URL
            url_res = requests.get(
                f"{self._rest_base_url}/files/{file_id}/url",
                headers=headers,
                timeout=self.request_timeout,
            )
            url_res.raise_for_status()
            signed_url = url_res.json().get("url")
            if not signed_url:
                raise MistralServiceError(f"Failed to get signed URL: {url_res.text}")
            return signed_url
        except Exception as exc:
            # Deliberate best-effort fallback: keep the caller going with an
            # inline data URI, but leave a trace of why the upload failed.
            logging.getLogger(__name__).warning(
                "Mistral file upload failed for %r; falling back to an inline data URI: %s",
                filename,
                exc,
            )
            return self.encode_pdf_bytes_to_data_uri(content)

    # ---------- Document Source Builders ----------
    @staticmethod
    def build_document_url(url: str) -> Dict[str, str]:
        """Create a document source dict for OCR/chat from a URL."""
        return {"type": "document_url", "document_url": url}

    @staticmethod
    def build_image_url(url: str) -> Dict[str, str]:
        """Create an image source dict for OCR/chat from a URL or data URI."""
        return {"type": "image_url", "image_url": url}

    @staticmethod
    def encode_image_bytes_to_data_uri(image_bytes: bytes, mime: str = "image/png") -> str:
        """Encode raw image bytes into a data URI suitable for Mistral image input."""
        b64 = base64.b64encode(image_bytes).decode("utf-8")
        return f"data:{mime};base64,{b64}"

    @staticmethod
    def encode_pdf_bytes_to_data_uri(pdf_bytes: bytes) -> str:
        """Encode PDF bytes into a data URI suitable for use as a document URL."""
        b64 = base64.b64encode(pdf_bytes).decode("utf-8")
        return f"data:application/pdf;base64,{b64}"

    # ---------- OCR ----------
    @staticmethod
    def _build_ocr_document(document_source: Dict[str, Any]) -> Dict[str, Any]:
        """Translate a document source dict into the OCR request's ``document`` field.

        A ``document_url`` entry takes precedence; otherwise an ``image_url``
        entry is used.

        Raises:
            MistralServiceError: If the source carries neither entry
        """
        document_url = document_source.get("document_url")
        if document_url:
            return {"type": "document_url", "document_url": document_url}
        image_url = document_source.get("image_url")
        if image_url:
            return {"type": "image_url", "image_url": image_url}
        raise MistralServiceError(
            "Document source must provide a 'document_url' or an 'image_url'"
        )

    def process_ocr(
        self,
        document_source: Dict[str, Any],
        *,
        include_image_base64: bool = False,
    ) -> Dict[str, Any]:
        """Run OCR on a document or an image.

        Args:
            document_source: A dict like {"type": "document_url", "document_url": ...}
                or {"type": "image_url", "image_url": ...}
            include_image_base64: Whether to include base64 images in the response

        Returns:
            The decoded JSON body of the OCR endpoint's response

        Raises:
            MistralServiceError: If the source is unusable or the request fails
        """
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
        }
        # Prefer REST API for OCR for better compatibility
        try:
            payload: Dict[str, Any] = {
                "model": self.ocr_model,
                "document": self._build_ocr_document(document_source),
            }
            # Only include the flag if True to minimize payload differences
            if include_image_base64:
                payload["include_image_base64"] = True
            res = requests.post(
                f"{self._rest_base_url}/ocr",
                headers=headers,
                data=json.dumps(payload),
                timeout=self.request_timeout,
            )
            res.raise_for_status()
            return res.json()
        except MistralServiceError:
            # Already a domain error with a precise message; do not re-wrap.
            raise
        except Exception as exc:
            raise MistralServiceError(f"OCR processing failed: {exc}") from exc

    @staticmethod
    def ocr_response_to_markdown(ocr_response: Dict[str, Any]) -> str:
        """Aggregate OCR pages into a single markdown string.

        Pages carrying an ``index`` get a separator and a 1-based "Page N"
        heading; pages without one contribute their markdown as-is. A missing
        or null ``pages`` list or ``markdown`` value is treated as empty.
        """
        markdown_pages: list[str] = []
        for page in ocr_response.get("pages") or []:
            idx = page.get("index")
            md = page.get("markdown") or ""
            if idx is not None:
                markdown_pages.append(f"\n\n---\n\n# Page {idx + 1}\n\n{md}")
            else:
                markdown_pages.append(md)
        return "\n".join(markdown_pages)

    # ---------- Chat Completion (with JSON) ----------
    def complete_json_from_document(
        self,
        document_source: Dict[str, Any],
        *,
        system_prompt: str,
        user_text: Optional[str] = None,
        model: Optional[str] = None,
        json_schema: Optional[Dict[str, Any]] = None,
        strict_json: bool = True,
        use_ocr_for_document: bool = True,
        ocr_max_chars: Optional[int] = None,
    ) -> Union[Dict[str, Any], str]:
        """Create a chat completion grounded on a document, requesting JSON output.

        Args:
            document_source: Document or image source (see builder helpers)
            system_prompt: System instructions (e.g., extraction guidance)
            user_text: Optional user text content to accompany the document
            model: Optional override for chat model
            json_schema: Optional JSON schema; appended to the system prompt
            strict_json: Accepted for interface compatibility; currently not
                consulted, the request always asks for a JSON object
            use_ocr_for_document: When True, OCR the document and provide text to chat
            ocr_max_chars: Optionally truncate OCR text to this many characters

        Returns:
            Parsed JSON when the reply is a valid JSON string; otherwise the
            reply content unchanged

        Raises:
            MistralServiceError: If OCR or the chat completion call fails
        """
        # The schema is conveyed through the prompt; the request itself only
        # asks for the "json_object" response format.
        system_content = system_prompt
        if json_schema is not None:
            schema_str = json.dumps(json_schema, indent=2)
            system_content += f"\n\n**JSON Schema to follow:**\n```json\n{schema_str}\n```"

        # Build user content as a single string to satisfy SDK expectations
        user_content_parts: list[str] = []
        if user_text:
            user_content_parts.append(user_text)

        if use_ocr_for_document:
            # Run OCR to convert document to markdown text
            ocr_dict = self.process_ocr(document_source, include_image_base64=False)
            ocr_markdown = self.ocr_response_to_markdown(ocr_dict)
            if isinstance(ocr_max_chars, int) and ocr_max_chars > 0:
                ocr_markdown = ocr_markdown[:ocr_max_chars]
            user_content_parts.append("\n\n=== DOCUMENT CONTENT (OCR) ===\n" + ocr_markdown)
        else:
            # As a fallback, include the document URL if available (model can't fetch, but keeps spec simple)
            doc_url = document_source.get("document_url")
            if doc_url:
                user_content_parts.append(f"Document URL: {doc_url}")

        messages: list[Dict[str, Any]] = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": "\n\n".join(user_content_parts)},
        ]
        response_format: Dict[str, Any] = {"type": "json_object"}

        try:
            chat_response = self.client.chat.complete(
                model=model or self.chat_model,
                messages=messages,
                response_format=response_format,
            )
            content = chat_response.choices[0].message.content
        except Exception as exc:
            raise MistralServiceError(f"Chat completion failed: {exc}") from exc

        # Try to parse JSON; fall back to the raw content if it is not valid JSON
        if isinstance(content, str):
            try:
                return json.loads(content)
            except ValueError:  # json.JSONDecodeError is a ValueError
                return content
        return content
# Module-level shared instance, built with the default models when this module
# is imported. Construction raises MistralServiceError if no API key is
# available from settings.
mistral_service = MistralService()