Cyril Dupland
Enhance transcription: add detailed logging for transcription job creation and processing, improve error handling for file-pointer resets, and refine the chunking logic for large audio files so that transcription requests are handled robustly.
6c4dfd3 | """Transcription service using OpenAI Whisper and related OpenAI audio models.""" | |
import asyncio
import math
import os
import tempfile
import time
from typing import Any, Dict, Optional

from fastapi import UploadFile
from openai import AsyncOpenAI, BadRequestError
from pydub import AudioSegment

from config import settings
from services.postprocessing.context import RunContext
from services.postprocessing.registry import build_orchestrator
from services.usage_utils import normalize_usage
# OpenAI rejects audio uploads larger than 25 MB per file.
MAX_OPENAI_AUDIO_FILE_SIZE = 25 * 1024 ** 2  # 25 MB, expressed in bytes
# Upper bound on the length of one audio chunk, in milliseconds (10 minutes).
# Re-encoded as 128 kbps MP3, a 10-minute chunk is roughly 9.6 MB, far below
# the per-file limit.
CHUNK_DURATION_MS = 10 * 60_000
def _split_audio_file(file_path: str, chunk_duration_ms: int = CHUNK_DURATION_MS) -> list[str]:
    """
    Split an audio file into fixed-length segments re-encoded as MP3.

    Heavy raw files (e.g. WAV) are decoded once and exported as 128 kbps MP3
    chunks, so that each upload stays under OpenAI's 25 MB per-file limit.

    Args:
        file_path: Path of the source audio file (any format pydub can decode).
        chunk_duration_ms: Maximum duration of each chunk, in milliseconds.

    Returns:
        Paths of the exported chunks, in playback order. Each path is
        ``f"{file_path}_chunk_{i}.mp3"``; the caller owns these files and is
        responsible for deleting them.

    Raises:
        ValueError: If ``chunk_duration_ms`` is not strictly positive.
    """
    if chunk_duration_ms <= 0:
        raise ValueError(f"chunk_duration_ms must be positive, got {chunk_duration_ms}")

    audio = AudioSegment.from_file(file_path)
    total_duration_ms = len(audio)
    # At least one chunk, even for very short audio, so the upload is always
    # a re-encoded MP3.
    num_chunks = max(1, math.ceil(total_duration_ms / chunk_duration_ms))

    chunk_paths: list[str] = []
    try:
        for i in range(num_chunks):
            start = i * chunk_duration_ms
            end = min(start + chunk_duration_ms, total_duration_ms)
            chunk_path = f"{file_path}_chunk_{i}.mp3"
            # Record the path before exporting so that a partially written
            # file is removed too if the export fails.
            chunk_paths.append(chunk_path)
            # 128 kbps keeps each chunk well under the size limit.
            exported = audio[start:end].export(chunk_path, format="mp3", bitrate="128k")
            # pydub returns an open handle on the output file; close it now so
            # the chunk can be reopened or deleted (notably on Windows).
            exported.close()
    except BaseException:
        # The caller never receives the paths on failure, so it cannot clean
        # them up: remove every chunk written so far before re-raising.
        for path in chunk_paths:
            try:
                os.unlink(path)
            except OSError:
                pass
        raise
    return chunk_paths
| def _coerce_usage_to_dict(usage_obj: object) -> Dict[str, Any]: | |
| """Convert transcript.usage (UsageTokens, dict, etc.) into a plain dict. | |
| Best-effort and never raises: returns {} on any error. | |
| """ | |
| try: | |
| if usage_obj is None: | |
| return {} | |
| # Pydantic / OpenAI v1 style | |
| if hasattr(usage_obj, "model_dump"): | |
| return usage_obj.model_dump() | |
| # Already a dict | |
| if isinstance(usage_obj, dict): | |
| return usage_obj | |
| # Fallback: read common attributes if present | |
| result: Dict[str, Any] = {} | |
| for key in ("input_tokens", "output_tokens", "total_tokens", "prompt_tokens", "completion_tokens"): | |
| if hasattr(usage_obj, key): | |
| result[key] = getattr(usage_obj, key) | |
| return result | |
| except Exception: | |
| return {} | |
def _zero_usage() -> Dict[str, int]:
    """Return a fresh usage counter with every token count set to zero."""
    return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}


def _extract_usage(transcript: Any) -> Optional[Dict[str, int]]:
    """Return the normalized token usage attached to ``transcript``, or None.

    The audio API does not always report usage, so a missing or empty
    ``usage`` field yields None instead of an error.
    """
    raw_usage = _coerce_usage_to_dict(getattr(transcript, "usage", None))
    if not raw_usage:
        return None
    return normalize_usage(raw_usage)


def _build_run_metadata(model: str, usage_totals: Dict[str, int], start_time: float) -> Dict[str, Any]:
    """Run the post-processing orchestrator and assemble the result metadata.

    Args:
        model: Model name used for the transcription.
        usage_totals: Aggregated token usage for the whole request.
        start_time: ``time.perf_counter()`` value captured when the request began.

    Returns:
        A dict with ``usage``, ``usage_by_model`` and ``latency_s``, updated
        with whatever the post-processing steps stored in ``ctx.metadata_out``.
    """
    latency_s = time.perf_counter() - start_time
    usage_by_model = {model: usage_totals}
    ctx = RunContext(
        provider="openai",
        model=model,
        usage_totals=usage_totals,
        usage_by_model=usage_by_model,
        latency_s=latency_s,
    )
    build_orchestrator().run(ctx)
    metadata: Dict[str, Any] = {
        "usage": usage_totals,
        "usage_by_model": usage_by_model,
        "latency_s": latency_s,
    }
    metadata.update(ctx.metadata_out)
    return metadata


async def _transcribe_with_chunking(
    client: AsyncOpenAI,
    model: str,
    temp_file_path: str,
    *,
    language: Optional[str] = None,
    prompt: Optional[str] = None,
    response_format: str = "json",
    chunk_duration_ms: Optional[int] = None,
) -> dict:
    """
    Transcribe an audio file while working around OpenAI's input limits.

    - Files of at most 25 MB are sent in a single request. If the API rejects
      that request with an ``input_too_large`` error, the file is chunked.
    - Larger files are split into MP3 chunks, each chunk is transcribed in
      order, and the non-empty texts are joined with single spaces.

    Token usage and latency are collected and passed through the
    post-processing orchestrator.

    Args:
        client: OpenAI async client.
        model: Transcription model name.
        temp_file_path: Path of the audio file on disk.
        language: Optional language hint forwarded to the API.
        prompt: Optional prompt. For chunked files it is sent with the first
            chunk; once a chunk yields text, the tail of that text replaces it
            for the next chunk.
        response_format: ``response_format`` forwarded to the API.
        chunk_duration_ms: Optional chunk length override, in milliseconds.
            When None, ``_split_audio_file``'s default is used.

    Returns:
        Dict with ``text``, ``language``, ``duration`` (None when unknown),
        ``model`` and ``metadata``.

    Raises:
        BadRequestError: For API rejections other than ``input_too_large``.
    """
    # Monotonic clock: latency must not be affected by wall-clock adjustments.
    start_time = time.perf_counter()

    if os.path.getsize(temp_file_path) <= MAX_OPENAI_AUDIO_FILE_SIZE:
        try:
            with open(temp_file_path, "rb") as audio:
                transcript = await client.audio.transcriptions.create(
                    model=model,
                    file=audio,
                    language=language,
                    prompt=prompt,
                    response_format=response_format,
                )
        except BadRequestError as exc:
            # Even under 25 MB the request can be rejected: the real limit is
            # a token budget (instructions + audio) that depends on the model
            # and the audio duration. Only that error is retried by chunking.
            if "input_too_large" not in str(exc):
                raise
        else:
            usage = _extract_usage(transcript)
            usage_totals = usage if usage is not None else _zero_usage()
            metadata = _build_run_metadata(model, usage_totals, start_time)
            return {
                "text": transcript.text,
                "language": getattr(transcript, "language", None),
                "duration": getattr(transcript, "duration", None),
                "model": model,
                "metadata": metadata,
            }

    # File too large, or too many tokens: split into several chunks.
    # pydub decoding and MP3 export are synchronous and potentially slow on
    # big files, so they run in a worker thread to keep the event loop free.
    if chunk_duration_ms is None:
        chunk_paths = await asyncio.to_thread(_split_audio_file, temp_file_path)
    else:
        chunk_paths = await asyncio.to_thread(_split_audio_file, temp_file_path, chunk_duration_ms)

    try:
        texts: list[str] = []
        total_duration = 0.0
        detected_language: Optional[str] = None
        usage_totals = _zero_usage()
        # Sliding prompt: the end of the previous chunk's text is used as the
        # prompt for the next chunk to improve continuity across boundaries.
        sliding_prompt = prompt

        for chunk_path in chunk_paths:
            with open(chunk_path, "rb") as audio:
                transcript = await client.audio.transcriptions.create(
                    model=model,
                    file=audio,
                    language=language,
                    prompt=sliding_prompt,
                    response_format=response_format,
                )

            current_text = transcript.text or ""
            if current_text:
                # Skipping empty chunks avoids double spaces in the joined text.
                texts.append(current_text)
                sliding_prompt = current_text[-200:]

            total_duration += getattr(transcript, "duration", 0) or 0
            if not detected_language:
                detected_language = getattr(transcript, "language", None)

            chunk_usage = _extract_usage(transcript)
            if chunk_usage is not None:
                # .get(): tolerate a normalized usage dict missing some keys.
                for key in usage_totals:
                    usage_totals[key] += chunk_usage.get(key) or 0

        metadata = _build_run_metadata(model, usage_totals, start_time)
        return {
            "text": " ".join(texts),
            "language": detected_language,
            "duration": total_duration if total_duration > 0 else None,
            "model": model,
            "metadata": metadata,
        }
    finally:
        for chunk_path in chunk_paths:
            try:
                os.unlink(chunk_path)
            except OSError:
                # Best-effort cleanup (missing file, Windows lock, ...): a stray
                # temporary chunk must not mask the result or the real error.
                pass
class TranscriptionService:
    """Service for audio transcription using OpenAI Whisper."""

    def __init__(self):
        """Initialize transcription service with OpenAI client."""
        self.client = AsyncOpenAI(api_key=settings.openai_api_key)
        self.model = "whisper-1"

    async def transcribe(
        self,
        audio_file: UploadFile,
        language: Optional[str] = None,
        prompt: Optional[str] = None
    ) -> dict:
        """
        Transcribe an uploaded audio file to text using the Whisper API.

        Files larger than the 25 MB limit are transparently split into smaller
        chunks whose transcriptions are concatenated.

        Args:
            audio_file: Uploaded audio file. It is rewound before reading, so it
                is fully transcribed even if the caller already consumed it.
            language: Optional ISO-639-1 language code (e.g. 'en', 'fr').
            prompt: Optional text to guide the model's style.

        Returns:
            Dict with ``text``, ``language``, ``duration``, ``model`` and
            ``metadata``.
        """
        tmp_path: Optional[str] = None
        try:
            # Save the upload to disk: the transcription helper works on a
            # file path (it reopens the file and may split it into chunks).
            with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=self._get_file_extension(audio_file.filename),
            ) as tmp_file:
                # Record the path first so the finally clause removes the file
                # even if reading or writing the upload fails.
                tmp_path = tmp_file.name
                # Rewind in case the upload was already read upstream.
                await audio_file.seek(0)
                tmp_file.write(await audio_file.read())
            # The handle is closed at this point, which avoids Windows
            # file-locking issues when the file is reopened or deleted later.
            return await _transcribe_with_chunking(
                self.client,
                self.model,
                tmp_path,
                language=language,
                prompt=prompt,
                response_format="verbose_json",
            )
        finally:
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except PermissionError:
                    # Windows may briefly keep the file locked: retry once,
                    # sleeping asynchronously so the event loop is not blocked.
                    await asyncio.sleep(0.1)
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        # Best-effort cleanup; never mask the real outcome.
                        pass

    @staticmethod
    def _get_file_extension(filename: Optional[str]) -> str:
        """
        Extract the file extension from a (client-supplied) filename.

        The result is used as a temporary-file suffix, so only plain ASCII
        alphanumeric extensions are accepted; anything else (no extension, a
        trailing dot, path separators, ...) falls back to the default.

        Args:
            filename: Name of the file, possibly None.

        Returns:
            File extension with a leading dot (e.g. '.mp3'); '.mp3' by default.
        """
        if filename and "." in filename:
            extension = filename.rsplit(".", 1)[1]
            if extension.isascii() and extension.isalnum():
                return "." + extension
        return ".mp3"  # Default extension

    def is_supported_format(self, filename: str) -> bool:
        """
        Check whether the audio format is supported by Whisper.

        Supported formats: mp3, mp4, mpeg, mpga, m4a, wav, webm (case-insensitive).
        A filename without a usable extension maps to the '.mp3' default and is
        therefore reported as supported.

        Args:
            filename: Name of the file.

        Returns:
            True if the format is supported.
        """
        supported_formats = {".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"}
        extension = self._get_file_extension(filename).lower()
        return extension in supported_formats
class MeetingTranscriptionService:
    """Service for meeting-oriented audio transcription using a dedicated OpenAI model."""

    def __init__(self):
        """Initialize meeting transcription service with OpenAI client."""
        self.client = AsyncOpenAI(api_key=settings.openai_api_key)
        # Model dedicated to meeting transcription; overridable through
        # ``settings.openai_meeting_transcription_model``.
        self.model = getattr(settings, "openai_meeting_transcription_model", "gpt-4o-transcribe-diarize")

    async def transcribe(
        self,
        audio_file: UploadFile,
        language: Optional[str] = None,
        prompt: Optional[str] = None
    ) -> dict:
        """
        Transcribe meeting audio to text using a dedicated OpenAI transcription model.

        Files larger than the 25 MB limit (or rejected as too large by the API)
        are split into chunks whose transcriptions are concatenated.

        Args:
            audio_file: Uploaded audio file. It is rewound before reading, so it
                is fully transcribed even if the caller already consumed it.
            language: Optional ISO-639-1 language code (e.g. 'en', 'fr').
            prompt: Optional text to guide the model's style.

        Returns:
            Dict with ``text``, ``language``, ``duration``, ``model`` and
            ``metadata``. NOTE(review): with the "json" response format the API
            presumably returns no language/duration, so those fields may be None.

        Raises:
            Exception: Errors from the OpenAI client or audio processing
                propagate to the caller.
        """
        tmp_path: Optional[str] = None
        try:
            with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=TranscriptionService._get_file_extension(audio_file.filename),
            ) as tmp_file:
                # Record the path first so the finally clause removes the file
                # even if reading or writing the upload fails.
                tmp_path = tmp_file.name
                # Rewind in case the upload was already read upstream.
                await audio_file.seek(0)
                tmp_file.write(await audio_file.read())
            # Shared helper with chunking support. The gpt-4o-transcribe family
            # does not accept "verbose_json", so plain "json" is requested.
            return await _transcribe_with_chunking(
                self.client,
                self.model,
                tmp_path,
                language=language,
                prompt=prompt,
                response_format="json",
            )
        finally:
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.unlink(tmp_path)
                except PermissionError:
                    # Windows may briefly keep the file locked: retry once,
                    # sleeping asynchronously so the event loop is not blocked.
                    await asyncio.sleep(0.1)
                    try:
                        os.unlink(tmp_path)
                    except OSError:
                        # Best-effort cleanup; never mask the real outcome.
                        pass
# Shared, module-level service instances, created once when the module is
# imported and reused by every caller.
transcription_service = TranscriptionService()
meeting_transcription_service = MeetingTranscriptionService()