"""Transcription service using OpenAI Whisper and related OpenAI audio models.""" from typing import Optional, Dict, Any import tempfile import os import math import time from fastapi import UploadFile from openai import AsyncOpenAI, BadRequestError from config import settings from pydub import AudioSegment from services.postprocessing.context import RunContext from services.postprocessing.registry import build_orchestrator from services.usage_utils import normalize_usage # OpenAI impose une limite de 25 Mo par fichier audio. MAX_OPENAI_AUDIO_FILE_SIZE = 25 * 1024 * 1024 # 25 Mo # Durée maximale d'un segment audio en millisecondes (ici 10 minutes). # Avec un export en MP3 128 kbps, chaque chunk reste largement sous 25 Mo. CHUNK_DURATION_MS = 10 * 60 * 1000 def _split_audio_file(file_path: str, chunk_duration_ms: int = CHUNK_DURATION_MS) -> list[str]: """ Découpe un fichier audio en segments de durée fixe et les réencode en MP3. Cela permet de réduire la taille de fichiers bruts très lourds (ex: WAV) avant envoi à l'API OpenAI qui limite à 25 Mo par fichier. """ audio = AudioSegment.from_file(file_path) total_duration_ms = len(audio) # Au moins un chunk, même si le fichier est court, afin de garantir l'encodage en MP3. num_chunks = max(1, math.ceil(total_duration_ms / chunk_duration_ms)) chunks_paths: list[str] = [] for i in range(num_chunks): start = i * chunk_duration_ms end = min((i + 1) * chunk_duration_ms, total_duration_ms) chunk = audio[start:end] chunk_path = f"{file_path}_chunk_{i}.mp3" # Encodage à 128 kbps pour rester largement sous la limite de taille. chunk.export(chunk_path, format="mp3", bitrate="128k") chunks_paths.append(chunk_path) return chunks_paths def _coerce_usage_to_dict(usage_obj: object) -> Dict[str, Any]: """Convert transcript.usage (UsageTokens, dict, etc.) into a plain dict. Best-effort and never raises: returns {} on any error. """ try: if usage_obj is None: return {} # Pydantic / OpenAI v1 style if hasattr(usage_obj, "model_dump"): return usage_obj.model_dump() # Already a dict if isinstance(usage_obj, dict): return usage_obj # Fallback: read common attributes if present result: Dict[str, Any] = {} for key in ("input_tokens", "output_tokens", "total_tokens", "prompt_tokens", "completion_tokens"): if hasattr(usage_obj, key): result[key] = getattr(usage_obj, key) return result except Exception: return {} async def _transcribe_with_chunking( client: AsyncOpenAI, model: str, temp_file_path: str, *, language: Optional[str] = None, prompt: Optional[str] = None, response_format: str = "json", ) -> dict: """ Transcrit un fichier audio en gérant automatiquement la limite de 25 Mo, en collectant les métadonnées d'usage et de latence pour post-traitement. - Si le fichier est <= 25 Mo : un seul appel à l'API. - Sinon : découpe en segments plus petits, transcrit chaque chunk, concatène le texte. """ start_time = time.time() file_size = os.path.getsize(temp_file_path) # Cas simple : un seul appel, pas de découpage nécessaire. force_chunking = file_size > MAX_OPENAI_AUDIO_FILE_SIZE if not force_chunking: usage_totals: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} try: with open(temp_file_path, "rb") as audio: transcript = await client.audio.transcriptions.create( model=model, file=audio, language=language, prompt=prompt, response_format=response_format, ) # Récupération optionnelle de l'usage (selon ce que renvoie l'API audio) usage_field = getattr(transcript, "usage", None) raw_usage = _coerce_usage_to_dict(usage_field) if raw_usage: usage_totals = normalize_usage(raw_usage) latency_s = time.time() - start_time usage_by_model = {model: usage_totals} ctx = RunContext( provider="openai", model=model, usage_totals=usage_totals, usage_by_model=usage_by_model, latency_s=latency_s, ) build_orchestrator().run(ctx) metadata: Dict[str, Any] = { "usage": usage_totals, "usage_by_model": usage_by_model, "latency_s": latency_s, } metadata.update(ctx.metadata_out) return { "text": transcript.text, "language": getattr(transcript, "language", None), "duration": getattr(transcript, "duration", None), "model": model, "metadata": metadata, } except BadRequestError as exc: # Même si le fichier est < 25Mo, la limite réelle est un budget tokens # (instructions + audio) dépendant du modèle et de la durée. if "input_too_large" in str(exc): force_chunking = True else: raise # Fichier trop gros ou tokens trop importants : on découpe en plusieurs segments. chunk_paths = _split_audio_file(temp_file_path) try: all_texts: list[str] = [] total_duration = 0.0 detected_language = None usage_totals: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} # Prompt glissant pour garder le contexte entre les segments. sliding_prompt = prompt for chunk_path in chunk_paths: with open(chunk_path, "rb") as audio: transcript = await client.audio.transcriptions.create( model=model, file=audio, language=language, prompt=sliding_prompt, response_format=response_format, ) current_text = transcript.text or "" all_texts.append(current_text) chunk_duration = getattr(transcript, "duration", 0) or 0 total_duration += chunk_duration if not detected_language: detected_language = getattr(transcript, "language", None) # Agrégation d'usage par chunk si disponible usage_field = getattr(transcript, "usage", None) raw_usage = _coerce_usage_to_dict(usage_field) if raw_usage: norm = normalize_usage(raw_usage) usage_totals["input_tokens"] += norm["input_tokens"] usage_totals["output_tokens"] += norm["output_tokens"] usage_totals["total_tokens"] += norm["total_tokens"] # Utilise la fin du segment courant comme prompt pour le suivant # afin d'améliorer la continuité de la transcription. if current_text: sliding_prompt = current_text[-200:] latency_s = time.time() - start_time usage_by_model = {model: usage_totals} ctx = RunContext( provider="openai", model=model, usage_totals=usage_totals, usage_by_model=usage_by_model, latency_s=latency_s, ) build_orchestrator().run(ctx) metadata: Dict[str, Any] = { "usage": usage_totals, "usage_by_model": usage_by_model, "latency_s": latency_s, } metadata.update(ctx.metadata_out) return { "text": " ".join(all_texts), "language": detected_language, "duration": total_duration if total_duration > 0 else None, "model": model, "metadata": metadata, } finally: # Nettoyage des chunks temporaires for chunk_path in chunk_paths: if os.path.exists(chunk_path): os.unlink(chunk_path) class TranscriptionService: """Service for audio transcription using OpenAI Whisper.""" def __init__(self): """Initialize transcription service with OpenAI client.""" self.client = AsyncOpenAI(api_key=settings.openai_api_key) self.model = "whisper-1" async def transcribe( self, audio_file: UploadFile, language: Optional[str] = None, prompt: Optional[str] = None ) -> dict: """ Transcribe audio file to text using Whisper API. This method transparently handles files larger than the 25 MB limit by splitting them into smaller chunks and concatenating the resulting transcriptions. """ tmp_path: Optional[str] = None try: # Create a temporary file to save the upload # Whisper API requires a file path, not file content with tempfile.NamedTemporaryFile( delete=False, suffix=self._get_file_extension(audio_file.filename), ) as tmp_file: # Write uploaded content to temp file content = await audio_file.read() tmp_file.write(content) tmp_file.flush() tmp_path = tmp_file.name # At this point the context manager has closed the file handle, # which avoids Windows file locking issues when deleting later. result = await _transcribe_with_chunking( self.client, self.model, tmp_path, language=language, prompt=prompt, response_format="verbose_json", ) return result finally: # Clean up temp file (with a small retry window for Windows) if tmp_path and os.path.exists(tmp_path): try: os.unlink(tmp_path) except PermissionError: time.sleep(0.1) try: os.unlink(tmp_path) except Exception: pass @staticmethod def _get_file_extension(filename: Optional[str]) -> str: """ Extract file extension from filename. Args: filename: Name of the file Returns: File extension with dot (e.g., '.mp3') """ if filename and "." in filename: return "." + filename.rsplit(".", 1)[1] return ".mp3" # Default extension def is_supported_format(self, filename: str) -> bool: """ Check if audio format is supported by Whisper. Supported formats: mp3, mp4, mpeg, mpga, m4a, wav, webm Args: filename: Name of the file Returns: True if format is supported """ supported_formats = {".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"} extension = self._get_file_extension(filename).lower() return extension in supported_formats class MeetingTranscriptionService: """Service for meeting-oriented audio transcription using a dedicated OpenAI model.""" def __init__(self): """Initialize meeting transcription service with OpenAI client.""" self.client = AsyncOpenAI(api_key=settings.openai_api_key) # Model dedicated to meeting transcription (configurable via settings) self.model = getattr(settings, "openai_meeting_transcription_model", "gpt-4o-transcribe-diarize") async def transcribe( self, audio_file: UploadFile, language: Optional[str] = None, prompt: Optional[str] = None ) -> dict: """ Transcribe meeting audio to text using a dedicated OpenAI transcription model. Args: audio_file: Uploaded audio file language: Optional ISO-639-1 language code (e.g., 'en', 'fr') prompt: Optional text to guide the model's style Returns: Dictionary with transcription text and metadata Raises: Exception: If transcription fails """ tmp_path: Optional[str] = None try: # Create a temporary file to save the upload with tempfile.NamedTemporaryFile( delete=False, suffix=TranscriptionService._get_file_extension(audio_file.filename) ) as tmp_file: # Write uploaded content to temp file content = await audio_file.read() tmp_file.write(content) tmp_file.flush() tmp_path = tmp_file.name # Use the shared helper with chunking support. # For gpt-4o-transcribe family, the only supported response_format is "json". result = await _transcribe_with_chunking( self.client, self.model, tmp_path, language=language, prompt=prompt, response_format="json", ) return result finally: if tmp_path and os.path.exists(tmp_path): try: os.unlink(tmp_path) except PermissionError: time.sleep(0.1) try: os.unlink(tmp_path) except Exception: pass # Singleton instances transcription_service = TranscriptionService() meeting_transcription_service = MeetingTranscriptionService()