routeur_ia_api/api/routes/transcription.py
Cyril Dupland
enhance transcription: add detailed logging for transcription job creation and processing, improve error handling for file pointer resets, and refine chunking logic for large audio files to ensure robust handling of transcription requests.
6c4dfd3
"""Transcription routes for audio to text conversion."""
from fastapi import APIRouter, UploadFile, File, HTTPException, status, Depends, Query, Form
from typing import Optional, Tuple
import os
import tempfile
from config import settings
from core.security import get_current_user
from domain.models import TranscriptionResponse, ErrorResponse, UploadJobResponse, JobStatusResponse
from services.transcription_service import transcription_service, meeting_transcription_service
from services.transcription_job_service import transcription_job_service
from services.audio_utils import get_audio_duration_seconds
from services.quota import get_meeting_quota_checker
from services.quota.client_checker import check_client_max_duration
# Router collecting every endpoint in this module; all paths declared below
# are mounted under the "/transcription" prefix and tagged "Transcription".
router = APIRouter(prefix="/transcription", tags=["Transcription"])
async def validate_audio_upload(
    file: UploadFile = File(..., description="Audio file to transcribe"),
) -> UploadFile:
    """Check the format and byte size of an uploaded audio file.

    Intended to be used as a FastAPI dependency: the same ``UploadFile`` is
    handed back, rewound to offset 0, once both checks pass.

    Args:
        file: The multipart upload to inspect.

    Returns:
        The unchanged upload, with its underlying stream positioned at 0.

    Raises:
        HTTPException: 400 when ``transcription_service`` rejects the file
            name, or when the size exceeds ``settings.max_upload_mb_audio``.
    """
    format_ok = transcription_service.is_supported_format(file.filename)
    if not format_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Unsupported file format. Supported: mp3, mp4, mpeg, mpga, m4a, wav, webm",
        )

    bytes_per_mb = 1024 * 1024

    # Measure the upload by jumping to the end of the stream, then rewind so
    # later consumers read it from the beginning.
    stream = file.file
    stream.seek(0, os.SEEK_END)
    size_bytes = stream.tell()
    stream.seek(0)

    size_mb = size_bytes / bytes_per_mb
    print(f"[Transcription] Uploaded file '{file.filename}' size={size_mb:.2f} MB")

    limit_bytes = settings.max_upload_mb_audio * bytes_per_mb
    if size_bytes > limit_bytes:
        too_large_detail = (
            f"File too large. Maximum size is {settings.max_upload_mb_audio} MB, "
            f"got {size_mb:.2f} MB"
        )
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=too_large_detail,
        )

    return file
async def validate_meeting_quota(
    project_id: str = Form(..., description="Project ID for meeting quota"),
    max_duration_seconds: float = Form(
        ...,
        description="Max allowed audio duration in seconds for this upload (e.g. 3600 = 60 min)",
    ),
    file: UploadFile = Depends(validate_audio_upload),
) -> Tuple[str, float, float]:
    """Measure the uploaded audio's duration and enforce the meeting quotas.

    The upload is copied to a temporary file so that
    ``get_audio_duration_seconds`` can read it from a path. The temporary
    file is removed before this dependency returns or raises.

    Args:
        project_id: Form field identifying the project; forwarded to the
            server-side quota checker.
        max_duration_seconds: Form field carrying the client-provided upper
            bound, forwarded to ``check_client_max_duration``.
        file: Upload that already passed ``validate_audio_upload``.

    Returns:
        A tuple ``(project_id, duration_seconds, max_duration_seconds)``.

    Raises:
        HTTPException: 403 when the quota checker's ``check_quota`` raises;
            the detail is the text of that exception.
        Exception: Anything raised while copying the upload, measuring its
            duration, or by ``check_client_max_duration`` propagates as is.
    """
    tmp_path: Optional[str] = None
    try:
        # Persist the upload to a temporary path for the duration calculation.
        suffix = os.path.splitext(file.filename or "audio")[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            # Record the path before any I/O: the file is created with
            # delete=False, so a failing read/write would otherwise leave it
            # on disk with nothing for the cleanup below to remove.
            tmp_path = tmp.name
            tmp.write(await file.read())
        # Leaving the ``with`` block flushes and closes the temp file, so the
        # duration helper sees the complete content.

        # NOTE(review): get_audio_duration_seconds is called synchronously
        # inside this coroutine; if it is slow for large files, consider
        # offloading it to a worker thread.
        duration_seconds = get_audio_duration_seconds(tmp_path)
        print(
            f"[Transcription] Meeting upload project_id={project_id}, "
            f"duration_s={duration_seconds}, max_duration_s={max_duration_seconds}"
        )

        # Client-provided limit (primary validation).
        check_client_max_duration(duration_seconds, max_duration_seconds)

        # Optional server-side quota; any failure is reported as 403.
        quota_checker = get_meeting_quota_checker()
        try:
            quota_checker.check_quota(project_id, duration_seconds)
        except Exception as exc:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=str(exc),
            ) from exc

        # Rewind the original upload so downstream consumers can read it.
        file.file.seek(0)
        return project_id, duration_seconds, max_duration_seconds
    finally:
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                # Best-effort cleanup: a missing or locked temp file must not
                # mask the real outcome of the request.
                pass
@router.post(
    "/meeting",
    response_model=TranscriptionResponse,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Meeting transcription failed"},
    },
)
async def transcribe_meeting_audio(
    current_user: dict = Depends(get_current_user),
    file: UploadFile = Depends(validate_audio_upload),
    quota_ctx: Tuple[str, float, float] = Depends(validate_meeting_quota),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(None, description="Optional text to guide the model's style for meetings"),
) -> TranscriptionResponse:
    """
    Transcribe a meeting audio file to text using a dedicated OpenAI model (gpt-4o-transcribe-diarize).

    This endpoint is optimized for meeting-style audio and uses a different model
    than the standard `/transcription` route.

    **Supported formats:** mp3, mp4, mpeg, mpga, m4a, wav, webm

    **Max file size:** configurable (default 500 MB; files are chunked automatically if > 25 MB)

    Args:
        file: Meeting audio file upload
        quota_ctx: Tuple (project_id, duration_seconds, max_duration_seconds)
            produced by the meeting quota dependency
        language: Optional language code to improve accuracy
        prompt: Optional prompt to guide transcription style (e.g., "notes de réunion structurées")
        current_user: Authenticated user (JWT required)

    Returns:
        Transcription with text, detected language, and duration

    Raises:
        HTTPException: If file format is unsupported or transcription fails
    """
    # Only the measured duration is needed here; the other two entries were
    # already enforced by the quota dependency.
    _, duration_seconds, _ = quota_ctx

    try:
        # Transcribe meeting audio using the dedicated service/model.
        result = await meeting_transcription_service.transcribe(
            audio_file=file,
            language=language,
            prompt=prompt,
        )
        # Override duration with the server-measured audio duration so the
        # client always gets a reliable duration in the first response.
        result["duration_s"] = duration_seconds
        return TranscriptionResponse(**result)
    except HTTPException:
        # An HTTP error raised deliberately further down keeps its own status
        # code instead of being re-labelled as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Meeting transcription failed: {e}",
        ) from e
@router.get("/supported-formats")
async def get_supported_formats(
    current_user: dict = Depends(get_current_user)
) -> dict:
    """
    Describe what the transcription endpoints accept.

    Args:
        current_user: Authenticated user (only used to require authentication)

    Returns:
        Dictionary with the supported extensions, the configured maximum
        upload size in MB, the model name and a note about languages
    """
    extensions = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"]

    info: dict = {}
    info["supported_formats"] = extensions
    info["max_file_size_mb"] = settings.max_upload_mb_audio
    info["model"] = "whisper-1"
    info["languages"] = "Auto-detection or specify ISO-639-1 code"
    return info
@router.post(
    "",
    response_model=TranscriptionResponse,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Transcription failed"},
    },
)
async def transcribe_audio(
    current_user: dict = Depends(get_current_user),
    file: UploadFile = Depends(validate_audio_upload),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(None, description="Optional text to guide the model's style"),
) -> TranscriptionResponse:
    """
    Transcribe an audio file to text using OpenAI Whisper.

    **Supported formats:** mp3, mp4, mpeg, mpga, m4a, wav, webm

    **Max file size:** configurable (default 500 MB; files are chunked automatically if > 25 MB)

    Args:
        file: Audio file upload
        language: Optional language code to improve accuracy
        prompt: Optional prompt to guide transcription style
        current_user: Authenticated user (JWT required)

    Returns:
        Transcription with text, detected language, and duration

    Raises:
        HTTPException: If file format is unsupported or transcription fails
    """
    try:
        result = await transcription_service.transcribe(
            audio_file=file,
            language=language,
            prompt=prompt,
        )
        return TranscriptionResponse(**result)
    except HTTPException:
        # An HTTP error raised deliberately further down keeps its own status
        # code instead of being re-labelled as a generic 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Transcription failed: {e}",
        ) from e
@router.post(
    "/jobs",
    response_model=UploadJobResponse,
    status_code=status.HTTP_202_ACCEPTED,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Failed to start transcription job"},
    },
)
async def create_transcription_job(
    current_user: dict = Depends(get_current_user),
    file: UploadFile = Depends(validate_audio_upload),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(None, description="Optional text to guide the model's style"),
) -> UploadJobResponse:
    """
    Start a background transcription job for an uploaded audio file.

    The upload is read into memory, a job record is created and the job is
    launched through the transcription job service. The response carries the
    job_id used to poll /transcription/jobs/{job_id} for status and, once
    finished, the transcript.
    """
    audio_bytes = await file.read()

    # First truthy identifier among the token claims; if none is truthy the
    # value of the last lookup ("email") is kept, whatever it is.
    uploaded_by: Optional[str] = None
    for claim in ("sub", "user_id", "email"):
        uploaded_by = current_user.get(claim)
        if uploaded_by:
            break

    display_name = file.filename or "audio"

    # Register the job first, then hand the payload to the background pipeline.
    job = transcription_job_service.create_job(
        filename=display_name,
        uploaded_by=uploaded_by,
        job_type="transcription_audio",
    )
    transcription_job_service.launch_job(
        job_id=job.job_id,
        filename=display_name,
        content_bytes=audio_bytes,
        language=language,
        prompt=prompt,
        meeting=False,
    )

    return UploadJobResponse(
        job_id=job.job_id,
        status=job.status,
        created_at=job.created_at,
    )
@router.post(
    "/meeting/jobs",
    response_model=UploadJobResponse,
    status_code=status.HTTP_202_ACCEPTED,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Failed to start meeting transcription job"},
    },
)
async def create_meeting_transcription_job(
    current_user: dict = Depends(get_current_user),
    file: UploadFile = Depends(validate_audio_upload),
    quota_ctx: Tuple[str, float, float] = Depends(validate_meeting_quota),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(
        None,
        description="Optional text to guide the model's style for meetings",
    ),
) -> UploadJobResponse:
    """
    Start a background transcription job for a meeting recording.

    The job is launched with the meeting flag set and with the duration
    measured by the quota dependency. The response carries the job_id used to
    poll /transcription/jobs/{job_id} for status and, once finished, the
    transcript, together with that measured duration.
    """
    # The third entry (client-provided max duration) is not needed here.
    project_id, duration_seconds, _max_duration = quota_ctx

    audio_bytes = await file.read()

    # First truthy identifier among the token claims; if none is truthy the
    # value of the last lookup ("email") is kept, whatever it is.
    uploaded_by: Optional[str] = None
    for claim in ("sub", "user_id", "email"):
        uploaded_by = current_user.get(claim)
        if uploaded_by:
            break

    print(
        f"[Transcription] Creating meeting transcription job for project_id={project_id}, "
        f"endpoint='/meeting/jobs'"
    )

    display_name = file.filename or "audio"
    job = transcription_job_service.create_job(
        filename=display_name,
        uploaded_by=uploaded_by,
        job_type="transcription_meeting",
    )
    transcription_job_service.launch_job(
        job_id=job.job_id,
        filename=display_name,
        content_bytes=audio_bytes,
        language=language,
        prompt=prompt,
        meeting=True,
        duration_seconds=duration_seconds,
    )

    response_fields = {
        "job_id": job.job_id,
        "status": job.status,
        "created_at": job.created_at,
        "duration_s": duration_seconds,
    }
    return UploadJobResponse(**response_fields)
@router.get(
    "/jobs/{job_id}",
    response_model=JobStatusResponse,
    responses={
        404: {"model": ErrorResponse, "description": "Job not found"},
    },
)
async def get_transcription_job_status(
    job_id: str,
    current_user: dict = Depends(get_current_user),
) -> JobStatusResponse:
    """
    Report the state of a background transcription job.

    The status snapshot is fetched from the transcription job service and
    returned as a JobStatusResponse; an empty or missing snapshot is answered
    with a 404 "Job not found" error.
    """
    job_snapshot = transcription_job_service.get_status_snapshot(job_id)
    if job_snapshot:
        return JobStatusResponse(**job_snapshot)

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail="Job not found",
    )