"""Transcription routes for audio to text conversion."""

import os
import tempfile
from typing import Optional, Tuple

from fastapi import APIRouter, UploadFile, File, HTTPException, status, Depends, Query, Form

from config import settings
from core.security import get_current_user
from domain.models import TranscriptionResponse, ErrorResponse, UploadJobResponse, JobStatusResponse
from services.transcription_service import transcription_service, meeting_transcription_service
from services.transcription_job_service import transcription_job_service
from services.audio_utils import get_audio_duration_seconds
from services.quota import get_meeting_quota_checker
from services.quota.client_checker import check_client_max_duration

router = APIRouter(prefix="/transcription", tags=["Transcription"])

# Formats advertised to clients, both in the 400 error message and in the
# /supported-formats payload. The actual acceptance check is delegated to
# transcription_service.is_supported_format().
_SUPPORTED_FORMATS = ("mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm")

# Chunk size used when spooling an upload to disk, so the whole file never has
# to be held in memory just to measure its duration.
_COPY_CHUNK_BYTES = 1024 * 1024

_BYTES_PER_MB = 1024 * 1024


def _resolve_uploader(current_user: dict) -> Optional[str]:
    """Return the first available identifier of the user: sub, user_id, then email."""
    return (
        current_user.get("sub")
        or current_user.get("user_id")
        or current_user.get("email")
    )


async def validate_audio_upload(
    file: UploadFile = File(..., description="Audio file to transcribe"),
) -> UploadFile:
    """Validate the format and size of an uploaded audio file.

    Args:
        file: The multipart upload to validate.

    Returns:
        The same upload, with its file pointer rewound to the start.

    Raises:
        HTTPException: 400 if the filename is missing, the format is not
            supported, or the file exceeds ``settings.max_upload_mb_audio``.
    """
    # A missing/empty filename carries no extension, so it cannot be a supported format.
    if not file.filename or not transcription_service.is_supported_format(file.filename):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Unsupported file format. Supported: " + ", ".join(_SUPPORTED_FORMATS),
        )

    # Measure the size by seeking to the end, then rewind for downstream readers.
    file.file.seek(0, os.SEEK_END)
    file_size = file.file.tell()
    file.file.seek(0)
    print(f"[Transcription] Uploaded file '{file.filename}' size={file_size / (1024 * 1024):.2f} MB")

    max_size = settings.max_upload_mb_audio * _BYTES_PER_MB
    if file_size > max_size:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=(
                f"File too large. Maximum size is {settings.max_upload_mb_audio} MB, "
                f"got {file_size / (1024 * 1024):.2f} MB"
            ),
        )
    return file


async def validate_meeting_quota(
    project_id: str = Form(..., description="Project ID for meeting quota"),
    max_duration_seconds: float = Form(
        ...,
        description="Max allowed audio duration in seconds for this upload (e.g. 3600 = 60 min)",
    ),
    file: UploadFile = Depends(validate_audio_upload),
) -> Tuple[str, float, float]:
    """Compute duration and enforce client + optional server quota for meeting uploads.

    The upload is spooled to a temporary file so its duration can be measured,
    then the upload's file pointer is rewound so the endpoint can read it again.

    Args:
        project_id: Project the meeting quota is checked against.
        max_duration_seconds: Client-provided maximum duration for this upload.
        file: The already-validated upload.

    Returns:
        A tuple ``(project_id, duration_seconds, max_duration_seconds)``.

    Raises:
        HTTPException: 403 if the server-side quota checker raises; any error
            raised by ``check_client_max_duration`` propagates unchanged.
    """
    tmp_path: Optional[str] = None
    try:
        # Persist the upload to a temporary path for duration calculation.
        suffix = os.path.splitext(file.filename or "audio")[1]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            # Record the path immediately so the file is cleaned up even if
            # copying fails part-way (delete=False means nobody else will).
            tmp_path = tmp.name
            while True:
                chunk = await file.read(_COPY_CHUNK_BYTES)
                if not chunk:
                    break
                tmp.write(chunk)

        duration_seconds = get_audio_duration_seconds(tmp_path)
        print(
            f"[Transcription] Meeting upload project_id={project_id}, "
            f"duration_s={duration_seconds}, max_duration_s={max_duration_seconds}"
        )

        # Client-provided limit (primary validation)
        check_client_max_duration(duration_seconds, max_duration_seconds)

        # Optional: Supabase Core server-side quota (no-op if not configured)
        quota_checker = get_meeting_quota_checker()
        try:
            quota_checker.check_quota(project_id, duration_seconds)
        except Exception as exc:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=str(exc),
            ) from exc

        # Reset original file pointer so downstream consumers can read it
        file.file.seek(0)
        return project_id, duration_seconds, max_duration_seconds
    finally:
        # Best-effort cleanup: a failure to delete the temp file must not mask
        # the request's real outcome.
        if tmp_path is not None:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass


@router.post(
    "/meeting",
    response_model=TranscriptionResponse,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Meeting transcription failed"}
    }
)
async def transcribe_meeting_audio(
    current_user: dict = Depends(get_current_user),
    # FastAPI caches a dependency within one request by default, so this is the
    # same upload object that validate_meeting_quota measured and rewound.
    file: UploadFile = Depends(validate_audio_upload),
    quota_ctx: Tuple[str, float, float] = Depends(validate_meeting_quota),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(None, description="Optional text to guide the model's style for meetings")
) -> TranscriptionResponse:
    """
    Transcribe a meeting audio file to text using a dedicated OpenAI model (gpt-4o-transcribe-diarize).

    This endpoint is optimized for meeting-style audio and uses a different model
    than the standard `/transcription` route.

    **Supported formats:** mp3, mp4, mpeg, mpga, m4a, wav, webm

    **Max file size:** configurable (default 500 MB; files are chunked automatically if > 25 MB)

    Args:
        file: Meeting audio file upload
        quota_ctx: (project_id, duration_seconds, max_duration_seconds) from the quota check
        language: Optional language code to improve accuracy
        prompt: Optional prompt to guide transcription style (e.g., "notes de réunion structurées")
        current_user: Authenticated user (JWT required)

    Returns:
        Transcription with text, detected language, and duration

    Raises:
        HTTPException: If file format is unsupported or transcription fails
    """
    _, duration_seconds, _ = quota_ctx

    try:
        # Transcribe meeting audio using dedicated service/model
        result = await meeting_transcription_service.transcribe(
            audio_file=file,
            language=language,
            prompt=prompt,
        )
        # Override duration with the server-measured audio duration (pydub),
        # so the client always gets a reliable duration in the first response.
        result["duration_s"] = duration_seconds
        return TranscriptionResponse(**result)
    except HTTPException:
        # Keep the status code chosen downstream instead of masking it as a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Meeting transcription failed: {e}",
        ) from e


@router.get("/supported-formats")
async def get_supported_formats(
    current_user: dict = Depends(get_current_user)
) -> dict:
    """
    Get list of supported audio formats.

    Returns:
        Dictionary with supported formats and info
    """
    return {
        "supported_formats": list(_SUPPORTED_FORMATS),
        "max_file_size_mb": settings.max_upload_mb_audio,
        "model": "whisper-1",
        "languages": "Auto-detection or specify ISO-639-1 code"
    }


@router.post(
    "",
    response_model=TranscriptionResponse,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Transcription failed"}
    }
)
async def transcribe_audio(
    current_user: dict = Depends(get_current_user),
    file: UploadFile = Depends(validate_audio_upload),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(None, description="Optional text to guide the model's style")
) -> TranscriptionResponse:
    """
    Transcribe an audio file to text using OpenAI Whisper.

    **Supported formats:** mp3, mp4, mpeg, mpga, m4a, wav, webm

    **Max file size:** configurable (default 500 MB; files are chunked automatically if > 25 MB)

    Args:
        file: Audio file upload
        language: Optional language code to improve accuracy
        prompt: Optional prompt to guide transcription style
        current_user: Authenticated user (JWT required)

    Returns:
        Transcription with text, detected language, and duration

    Raises:
        HTTPException: If file format is unsupported or transcription fails
    """
    try:
        # Transcribe audio
        result = await transcription_service.transcribe(
            audio_file=file,
            language=language,
            prompt=prompt
        )
        return TranscriptionResponse(**result)
    except HTTPException:
        # Keep the status code chosen downstream instead of masking it as a 500.
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Transcription failed: {e}"
        ) from e


@router.post(
    "/jobs",
    response_model=UploadJobResponse,
    status_code=status.HTTP_202_ACCEPTED,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Failed to start transcription job"},
    },
)
async def create_transcription_job(
    current_user: dict = Depends(get_current_user),
    file: UploadFile = Depends(validate_audio_upload),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(None, description="Optional text to guide the model's style"),
) -> UploadJobResponse:
    """
    Create a background job to transcribe an audio file using OpenAI Whisper.

    Returns a job_id that can be used to poll status and retrieve the final
    transcript via /transcription/jobs/{job_id}.
    """
    # The job service takes the raw bytes, so the upload is read in full here.
    content = await file.read()
    filename = file.filename or "audio"

    # Create job and launch background transcription pipeline.
    job = transcription_job_service.create_job(
        filename=filename,
        uploaded_by=_resolve_uploader(current_user),
        job_type="transcription_audio",
    )
    transcription_job_service.launch_job(
        job_id=job.job_id,
        filename=filename,
        content_bytes=content,
        language=language,
        prompt=prompt,
        meeting=False,
    )
    return UploadJobResponse(job_id=job.job_id, status=job.status, created_at=job.created_at)


@router.post(
    "/meeting/jobs",
    response_model=UploadJobResponse,
    status_code=status.HTTP_202_ACCEPTED,
    responses={
        400: {"model": ErrorResponse, "description": "Invalid file format"},
        500: {"model": ErrorResponse, "description": "Failed to start meeting transcription job"},
    },
)
async def create_meeting_transcription_job(
    current_user: dict = Depends(get_current_user),
    file: UploadFile = Depends(validate_audio_upload),
    quota_ctx: Tuple[str, float, float] = Depends(validate_meeting_quota),
    language: Optional[str] = Query(None, description="ISO-639-1 language code (e.g., 'en', 'fr')"),
    prompt: Optional[str] = Query(
        None,
        description="Optional text to guide the model's style for meetings",
    ),
) -> UploadJobResponse:
    """
    Create a background job to transcribe a meeting audio file using the
    dedicated meeting transcription model.

    Returns a job_id that can be used to poll status and retrieve the final
    transcript via /transcription/jobs/{job_id}.
    """
    project_id, duration_seconds, _ = quota_ctx

    # validate_meeting_quota rewound the upload, so this reads the whole file.
    content = await file.read()
    filename = file.filename or "audio"

    print(
        f"[Transcription] Creating meeting transcription job for project_id={project_id}, "
        f"endpoint='/meeting/jobs'"
    )
    job = transcription_job_service.create_job(
        filename=filename,
        uploaded_by=_resolve_uploader(current_user),
        job_type="transcription_meeting",
    )
    transcription_job_service.launch_job(
        job_id=job.job_id,
        filename=filename,
        content_bytes=content,
        language=language,
        prompt=prompt,
        meeting=True,
        duration_seconds=duration_seconds,
    )
    return UploadJobResponse(
        job_id=job.job_id,
        status=job.status,
        created_at=job.created_at,
        duration_s=duration_seconds,
    )


@router.get(
    "/jobs/{job_id}",
    response_model=JobStatusResponse,
    responses={
        404: {"model": ErrorResponse, "description": "Job not found"},
    },
)
async def get_transcription_job_status(
    job_id: str,
    current_user: dict = Depends(get_current_user),
) -> JobStatusResponse:
    """
    Get the current status and (when completed) the transcript for a background
    transcription job.
    """
    # NOTE(review): authentication is required, but no check that the job
    # belongs to current_user is visible here — confirm whether the job service
    # enforces ownership.
    snapshot = transcription_job_service.get_status_snapshot(job_id)
    if not snapshot:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Job not found",
        )
    return JobStatusResponse(**snapshot)