routeur_ia_api / services /transcription_service.py
Cyril Dupland
enhance transcription: add detailed logging for transcription job creation and processing, improve error handling for file pointer resets, and refine chunking logic for large audio files to ensure robust handling of transcription requests.
6c4dfd3
Raw
History Blame
13.9 kB
"""Transcription service using OpenAI Whisper and related OpenAI audio models."""
import asyncio
import logging
import math
import os
import tempfile
import time
from typing import Optional, Dict, Any

from fastapi import UploadFile
from openai import AsyncOpenAI, BadRequestError
from pydub import AudioSegment

from config import settings
from services.postprocessing.context import RunContext
from services.postprocessing.registry import build_orchestrator
from services.usage_utils import normalize_usage
# OpenAI enforces a 25 MB limit per uploaded audio file.
MAX_OPENAI_AUDIO_FILE_SIZE = 25 * 1024 ** 2  # 25 MB, expressed in bytes

# Maximum duration of one audio segment, in milliseconds (here: 10 minutes).
# Exported as 128 kbps MP3, a segment of this length stays well under 25 MB.
CHUNK_DURATION_MS = 10 * 60 * 1_000
def _split_audio_file(file_path: str, chunk_duration_ms: int = CHUNK_DURATION_MS) -> list[str]:
    """
    Split an audio file into fixed-duration segments re-encoded as MP3.

    Re-encoding shrinks very heavy raw files (e.g. WAV) before they are sent
    to the OpenAI API, which limits uploads to 25 MB per file.

    Args:
        file_path: Path of the audio file to split. The segments are written
            next to it as ``<file_path>_chunk_<index>.mp3``.
        chunk_duration_ms: Maximum duration of each segment, in milliseconds.

    Returns:
        The segment paths, in playback order. There is always at least one
        segment, even for a short file, so the audio is always re-encoded.

    Raises:
        ValueError: If ``chunk_duration_ms`` is not strictly positive.
    """
    if chunk_duration_ms <= 0:
        # A zero duration would divide by zero below and a negative one would
        # silently produce a bogus slice; fail loudly instead.
        raise ValueError("chunk_duration_ms must be a positive number of milliseconds")

    audio = AudioSegment.from_file(file_path)
    total_duration_ms = len(audio)

    # At least one chunk, even for a short file, to guarantee MP3 encoding.
    num_chunks = max(1, math.ceil(total_duration_ms / chunk_duration_ms))

    chunk_paths: list[str] = []
    try:
        for index in range(num_chunks):
            start = index * chunk_duration_ms
            end = min(start + chunk_duration_ms, total_duration_ms)
            chunk_path = f"{file_path}_chunk_{index}.mp3"
            # Register the path before exporting so that a partially written
            # file is also removed if the export fails.
            chunk_paths.append(chunk_path)
            # 128 kbps keeps every segment well under the size limit.
            audio[start:end].export(chunk_path, format="mp3", bitrate="128k")
    except BaseException:
        # The caller never receives the paths when we raise, so the segments
        # already written must be deleted here or they would be leaked.
        for written_path in chunk_paths:
            try:
                os.unlink(written_path)
            except OSError:
                pass  # Best effort: the original error is the one to report.
        raise
    return chunk_paths
def _coerce_usage_to_dict(usage_obj: object) -> Dict[str, Any]:
    """Turn a transcript ``usage`` value into a plain dictionary.

    Accepted inputs, checked in this order:
      * ``None``: yields an empty dict;
      * any object exposing ``model_dump()``: its result is returned;
      * a ``dict``: returned as-is (same object, not a copy);
      * anything else: the well-known token counters present as attributes
        are collected into a new dict.

    This is best-effort and never raises: any failure yields ``{}``.
    """
    token_fields = (
        "input_tokens",
        "output_tokens",
        "total_tokens",
        "prompt_tokens",
        "completion_tokens",
    )
    try:
        if usage_obj is None:
            return {}

        # Pydantic-style objects know how to serialise themselves.
        if hasattr(usage_obj, "model_dump"):
            return usage_obj.model_dump()

        # Nothing to convert when a dict is handed in.
        if isinstance(usage_obj, dict):
            return usage_obj

        # Last resort: pick up whichever known counters the object carries.
        return {
            field: getattr(usage_obj, field)
            for field in token_fields
            if hasattr(usage_obj, field)
        }
    except Exception:
        return {}
def _empty_usage() -> Dict[str, int]:
    """Return a zeroed token counter with the three keys aggregated in this module."""
    return {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0}


async def _request_transcription(
    client: AsyncOpenAI,
    model: str,
    audio_path: str,
    *,
    language: Optional[str],
    prompt: Optional[str],
    response_format: str,
):
    """Send one audio file to the transcription endpoint and return the raw response."""
    with open(audio_path, "rb") as audio:
        return await client.audio.transcriptions.create(
            model=model,
            file=audio,
            language=language,
            prompt=prompt,
            response_format=response_format,
        )


def _build_transcription_result(
    *,
    model: str,
    text: Optional[str],
    language: Optional[str],
    duration: Optional[float],
    usage_totals: Dict[str, int],
    started_at: float,
) -> dict:
    """Run the post-processing orchestrator and assemble the result payload."""
    latency_s = time.perf_counter() - started_at
    usage_by_model = {model: usage_totals}
    ctx = RunContext(
        provider="openai",
        model=model,
        usage_totals=usage_totals,
        usage_by_model=usage_by_model,
        latency_s=latency_s,
    )
    build_orchestrator().run(ctx)
    metadata: Dict[str, Any] = {
        "usage": usage_totals,
        "usage_by_model": usage_by_model,
        "latency_s": latency_s,
    }
    # Whatever the orchestrator exposes in metadata_out is merged on top.
    metadata.update(ctx.metadata_out)
    return {
        "text": text,
        "language": language,
        "duration": duration,
        "model": model,
        "metadata": metadata,
    }


async def _transcribe_with_chunking(
    client: AsyncOpenAI,
    model: str,
    temp_file_path: str,
    *,
    language: Optional[str] = None,
    prompt: Optional[str] = None,
    response_format: str = "json",
) -> dict:
    """
    Transcribe an audio file, transparently working around the 25 MB limit,
    and collect usage and latency metadata for post-processing.

    - File <= 25 MB: a single API call is attempted first.
    - Larger file, or the API answers ``input_too_large``: the audio is split
      into smaller MP3 segments, each segment is transcribed in order and the
      texts are joined with a single space.

    Args:
        client: OpenAI async client used for the API calls.
        model: Name of the transcription model.
        temp_file_path: Path of the audio file on disk.
        language: Optional language hint forwarded to the API.
        prompt: Optional prompt forwarded to the API (first segment only when
            chunking; later segments receive the tail of the previous text).
        response_format: Response format requested from the API.

    Returns:
        A dict with the keys ``text``, ``language``, ``duration``, ``model``
        and ``metadata`` (``usage``, ``usage_by_model``, ``latency_s`` plus
        whatever the post-processing context puts in ``metadata_out``).

    Raises:
        BadRequestError: For any API rejection other than ``input_too_large``
            on the single-call path, and for any rejection while chunking.
    """
    # perf_counter is monotonic, so the latency cannot be skewed by clock changes.
    started_at = time.perf_counter()

    # Simple case: the file fits, so try a single call without splitting.
    if os.path.getsize(temp_file_path) <= MAX_OPENAI_AUDIO_FILE_SIZE:
        try:
            transcript = await _request_transcription(
                client,
                model,
                temp_file_path,
                language=language,
                prompt=prompt,
                response_format=response_format,
            )
        except BadRequestError as exc:
            # Even below 25 MB the real limit is a token budget (instructions
            # + audio) that depends on the model and the duration: fall
            # through to chunking only for that specific rejection.
            if "input_too_large" not in str(exc):
                raise
        else:
            # Usage is optional: it depends on what the audio API returns.
            raw_usage = _coerce_usage_to_dict(getattr(transcript, "usage", None))
            usage_totals = normalize_usage(raw_usage) if raw_usage else _empty_usage()
            return _build_transcription_result(
                model=model,
                text=transcript.text,
                language=getattr(transcript, "language", None),
                duration=getattr(transcript, "duration", None),
                usage_totals=usage_totals,
                started_at=started_at,
            )

    # File too large or token budget exceeded: split into several segments.
    # Decoding and re-encoding is blocking work, so it runs in a worker
    # thread to keep the event loop free for other requests.
    chunk_paths = await asyncio.to_thread(_split_audio_file, temp_file_path)
    try:
        texts: list[str] = []
        total_duration = 0.0
        detected_language = None
        usage_totals = _empty_usage()
        # Sliding prompt that carries context from one segment to the next.
        sliding_prompt = prompt

        for chunk_path in chunk_paths:
            transcript = await _request_transcription(
                client,
                model,
                chunk_path,
                language=language,
                prompt=sliding_prompt,
                response_format=response_format,
            )

            total_duration += getattr(transcript, "duration", 0) or 0
            if not detected_language:
                detected_language = getattr(transcript, "language", None)

            # Aggregate per-segment usage when the API reports it.
            raw_usage = _coerce_usage_to_dict(getattr(transcript, "usage", None))
            if raw_usage:
                normalized = normalize_usage(raw_usage)
                for key in usage_totals:
                    usage_totals[key] += normalized.get(key, 0) or 0

            current_text = transcript.text or ""
            if current_text:
                # Empty segments (e.g. silence) are skipped so the joined
                # transcript does not contain doubled spaces.
                texts.append(current_text)
                # The tail of this segment becomes the prompt of the next one
                # to improve the continuity of the transcription.
                sliding_prompt = current_text[-200:]

        return _build_transcription_result(
            model=model,
            text=" ".join(texts),
            language=detected_language,
            duration=total_duration if total_duration > 0 else None,
            usage_totals=usage_totals,
            started_at=started_at,
        )
    finally:
        # Remove the temporary segments whatever the outcome.
        for chunk_path in chunk_paths:
            try:
                os.unlink(chunk_path)
            except FileNotFoundError:
                pass  # Already gone: nothing left to clean up.
class TranscriptionService:
    """Service for audio transcription using OpenAI Whisper."""

    # Size of the slices used to copy an upload to disk (1 MiB), so that a
    # large upload is never held in memory as a single bytes object.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        """Initialize transcription service with OpenAI client."""
        self.client = AsyncOpenAI(api_key=settings.openai_api_key)
        self.model = "whisper-1"

    async def transcribe(
        self,
        audio_file: UploadFile,
        language: Optional[str] = None,
        prompt: Optional[str] = None
    ) -> dict:
        """
        Transcribe audio file to text using Whisper API.

        This method transparently handles files larger than the 25 MB limit
        by splitting them into smaller chunks and concatenating the resulting
        transcriptions.

        Args:
            audio_file: Uploaded audio file
            language: Optional language hint forwarded to the API
            prompt: Optional prompt forwarded to the API

        Returns:
            The dictionary produced by the shared chunking helper
        """
        tmp_path: Optional[str] = None
        try:
            # The shared helper works from a path on disk, so the upload is
            # first copied to a temporary file.
            with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=self._get_file_extension(audio_file.filename),
            ) as tmp_file:
                # Record the path before writing: if the copy fails, the
                # finally clause below still removes the file.
                tmp_path = tmp_file.name
                while chunk := await audio_file.read(self._UPLOAD_CHUNK_SIZE):
                    tmp_file.write(chunk)

            # At this point the context manager has closed the file handle,
            # which avoids Windows file locking issues when deleting later.
            return await _transcribe_with_chunking(
                self.client,
                self.model,
                tmp_path,
                language=language,
                prompt=prompt,
                response_format="verbose_json",
            )
        finally:
            if tmp_path is not None:
                await self._remove_temp_file(tmp_path)

    @staticmethod
    async def _remove_temp_file(path: str) -> None:
        """
        Delete a temporary file without ever raising.

        A PermissionError (typically a short-lived lock on Windows) is retried
        once after a 0.1 s pause; the pause is awaited so the event loop is
        not blocked. Any remaining failure is logged instead of propagated, so
        that a cleanup problem cannot replace the transcription result.
        """
        for attempt in range(2):
            try:
                os.unlink(path)
            except FileNotFoundError:
                return
            except OSError as exc:
                if attempt == 0 and isinstance(exc, PermissionError):
                    await asyncio.sleep(0.1)
                    continue
                logging.getLogger(__name__).warning(
                    "Could not delete temporary file %s: %s", path, exc
                )
                return
            else:
                return

    @staticmethod
    def _get_file_extension(filename: Optional[str]) -> str:
        """
        Extract file extension from filename.

        Only the last path component is considered, so a client-supplied name
        such as ``"a.b/c"`` can never yield an "extension" that contains a
        path separator.

        Args:
            filename: Name of the file

        Returns:
            File extension with dot (e.g., '.mp3'); '.mp3' when the name is
            missing or has no extension
        """
        if not filename:
            return ".mp3"  # Default extension
        # Treat both separators as such: the name comes from the client and
        # may follow either convention regardless of the server platform.
        base_name = filename.replace("\\", "/").rsplit("/", 1)[-1]
        if "." in base_name:
            return "." + base_name.rsplit(".", 1)[1]
        return ".mp3"  # Default extension

    def is_supported_format(self, filename: str) -> bool:
        """
        Check if audio format is supported by Whisper.

        Supported formats: mp3, mp4, mpeg, mpga, m4a, wav, webm

        Args:
            filename: Name of the file

        Returns:
            True if format is supported
        """
        supported_formats = {".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"}
        extension = self._get_file_extension(filename).lower()
        return extension in supported_formats
class MeetingTranscriptionService:
    """Service for meeting-oriented audio transcription using a dedicated OpenAI model."""

    # Size of the slices used to copy an upload to disk (1 MiB), so that a
    # long meeting recording is never held in memory as a single bytes object.
    _UPLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        """Initialize meeting transcription service with OpenAI client."""
        self.client = AsyncOpenAI(api_key=settings.openai_api_key)
        # Model dedicated to meeting transcription (configurable via settings)
        self.model = getattr(settings, "openai_meeting_transcription_model", "gpt-4o-transcribe-diarize")

    async def transcribe(
        self,
        audio_file: UploadFile,
        language: Optional[str] = None,
        prompt: Optional[str] = None
    ) -> dict:
        """
        Transcribe meeting audio to text using a dedicated OpenAI transcription model.

        Args:
            audio_file: Uploaded audio file
            language: Optional ISO-639-1 language code (e.g., 'en', 'fr')
            prompt: Optional text to guide the model's style

        Returns:
            Dictionary with transcription text and metadata

        Raises:
            Exception: If transcription fails
        """
        # The file name comes from the client: keep only its last component so
        # the derived suffix cannot contain a path separator.
        safe_name = os.path.basename(audio_file.filename or "")
        tmp_path: Optional[str] = None
        try:
            # Create a temporary file to save the upload
            with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=TranscriptionService._get_file_extension(safe_name)
            ) as tmp_file:
                # Record the path before writing: if the copy fails, the
                # finally clause below still removes the file.
                tmp_path = tmp_file.name
                while chunk := await audio_file.read(self._UPLOAD_CHUNK_SIZE):
                    tmp_file.write(chunk)

            # Use the shared helper with chunking support.
            # For gpt-4o-transcribe family, the only supported response_format is "json".
            return await _transcribe_with_chunking(
                self.client,
                self.model,
                tmp_path,
                language=language,
                prompt=prompt,
                response_format="json",
            )
        finally:
            if tmp_path is not None:
                await self._remove_temp_file(tmp_path)

    @staticmethod
    async def _remove_temp_file(path: str) -> None:
        """
        Delete a temporary file without ever raising.

        A PermissionError (typically a short-lived lock on Windows) is retried
        once after a 0.1 s pause; the pause is awaited so the event loop is
        not blocked. Any remaining failure is logged instead of propagated, so
        that a cleanup problem cannot replace the transcription result.
        """
        for attempt in range(2):
            try:
                os.unlink(path)
            except FileNotFoundError:
                return
            except OSError as exc:
                if attempt == 0 and isinstance(exc, PermissionError):
                    await asyncio.sleep(0.1)
                    continue
                logging.getLogger(__name__).warning(
                    "Could not delete temporary file %s: %s", path, exc
                )
                return
            else:
                return
# Singleton instances, created once when this module is first imported.
# Each constructor builds its own AsyncOpenAI client from
# settings.openai_api_key, so that setting is read at import time.
transcription_service = TranscriptionService()
meeting_transcription_service = MeetingTranscriptionService()