Cyril Dupland
enhance transcription: add detailed logging for transcription job creation and processing, improve error handling for file pointer resets, and refine chunking logic for large audio files to ensure robust handling of transcription requests.
6c4dfd3 | """Domain service for managing asynchronous transcription jobs. | |
| This service orchestrates: | |
| - job creation in the shared TaskRegistry | |
| - launching the transcription pipeline in the background | |
| - updating job status and storing the final transcript in JobState | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import tempfile | |
| import os | |
| from typing import Optional | |
| from config.settings import settings | |
| from services.task_service import task_registry | |
| from services.transcription_service import ( | |
| transcription_service, | |
| meeting_transcription_service, | |
| _transcribe_with_chunking, | |
| ) | |
class TranscriptionJobService:
    """High-level orchestration for audio transcription jobs.

    Jobs are recorded in the shared ``task_registry`` and their pipelines are
    executed as background asyncio tasks on the caller's running event loop.
    """

    def __init__(self) -> None:
        # Strong references to in-flight pipeline tasks. The event loop keeps
        # only weak references to tasks, so a task that nobody references can
        # be garbage-collected before it finishes.
        self._background_tasks: set = set()

    def create_job(
        self,
        filename: str,
        uploaded_by: Optional[str],
        *,
        job_type: str,
    ):
        """Create a new transcription job in the registry.

        Args:
            filename: Name of the uploaded audio file.
            uploaded_by: Identifier of the uploader, if known.
            job_type: Job type label stored on the registry entry.

        Returns:
            Whatever ``task_registry.create_job`` returns for the new job.
        """
        # Jobs are not scoped by project yet; they all share one logical id.
        project_id = "transcription"
        return task_registry.create_job(
            project_id=project_id,
            filename=filename,
            uploaded_by=uploaded_by,
            job_type=job_type,
        )

    def launch_job(
        self,
        job_id: str,
        filename: str,
        content_bytes: bytes,
        *,
        language: Optional[str],
        prompt: Optional[str],
        meeting: bool = False,
        duration_seconds: Optional[float] = None,
    ) -> None:
        """Schedule the transcription pipeline for ``job_id`` as a background task.

        Non-blocking: returns as soon as the task is scheduled. Must be called
        from code executing inside a running event loop (e.g. an async handler).

        Args:
            job_id: Registry id of the job to run.
            filename: Original upload name.
            content_bytes: Raw uploaded audio.
            language: Optional language hint for the transcription call.
            prompt: Optional prompt for the transcription call.
            meeting: Use the meeting transcription configuration.
            duration_seconds: Server-measured audio duration, if known.

        Raises:
            RuntimeError: If no event loop is running in the current thread.
        """
        # Resolve the loop *before* creating the coroutine so a missing loop
        # fails fast without leaving a never-awaited coroutine behind.
        loop = asyncio.get_running_loop()
        task = loop.create_task(
            _run_transcription_pipeline(
                job_id=job_id,
                filename=filename,
                content_bytes=content_bytes,
                language=language,
                prompt=prompt,
                meeting=meeting,
                duration_seconds=duration_seconds,
            )
        )
        self._background_tasks.add(task)

        def _on_done(finished) -> None:
            # Drop our strong reference now that the task has completed.
            self._background_tasks.discard(finished)
            if finished.cancelled():
                return
            exc = finished.exception()
            if exc is None:
                return
            # Anything reaching here escaped the pipeline's own error handling.
            # Retrieving it avoids "Task exception was never retrieved", and
            # marking the job failed keeps it from appearing to run forever.
            error = str(exc) or type(exc).__name__
            print(f"[Transcription] Background job {job_id} crashed: {error}")
            task_registry.set_failed(job_id, error=error)

        task.add_done_callback(_on_done)

    def get_status_snapshot(self, job_id: str) -> Optional[dict]:
        """Return a read-only snapshot of job status from the registry."""
        return task_registry.status_snapshot(job_id)
async def _run_transcription_pipeline(
    job_id: str,
    filename: str,
    content_bytes: bytes,
    *,
    language: Optional[str],
    prompt: Optional[str],
    meeting: bool,
    duration_seconds: Optional[float],
) -> None:
    """Run the transcription pipeline for one job and record the outcome.

    The uploaded bytes are written to a temporary file and transcribed through
    ``_transcribe_with_chunking``. The result is stored on the job's state
    object. The job is marked done on success and failed on any error,
    including cancellation. The temporary file is always removed (best-effort).

    Args:
        job_id: Registry id of the job to run; unknown ids are ignored.
        filename: Original upload name; used to choose the temp-file suffix.
        content_bytes: Raw uploaded audio.
        language: Optional language hint forwarded to the transcription call.
        prompt: Optional prompt forwarded to the transcription call.
        meeting: Use the meeting transcription service with the "json"
            response format instead of the default service with "verbose_json".
        duration_seconds: Server-measured audio duration; when given it takes
            precedence over the duration reported in the transcription result.

    Raises:
        asyncio.CancelledError: Re-raised after the job is marked failed.
    """
    job = task_registry.get(job_id)
    if not job:
        return

    tmp_file_path: Optional[str] = None
    try:
        # Registry bookkeeping lives inside the try so that a failure here
        # still marks the job as failed instead of leaving it "running".
        task_registry.set_running(job_id)
        task_registry.set_stage(job_id, "transcribe")
        job.log(f"Starting transcription job for '{filename}' (meeting={meeting})")

        # Choose the appropriate underlying transcription configuration.
        if meeting:
            client = meeting_transcription_service.client
            model = meeting_transcription_service.model
            response_format = "json"
        else:
            client = transcription_service.client
            model = transcription_service.model
            response_format = "verbose_json"
        print(f"[Transcription] Starting transcription job for '{filename}' (meeting={meeting})")
        print(f"[Transcription] model={model}")
        print(f"[Transcription] response_format={response_format}")

        # Persist the uploaded content to a temporary file and reuse the
        # shared chunking logic from transcription_service.
        suffix = transcription_service._get_file_extension(filename)  # type: ignore[attr-defined]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            # Record the path before writing: with delete=False the file
            # already exists on disk, and the finally block must remove it
            # even if the write itself fails.
            tmp_file_path = tmp_file.name
            tmp_file.write(content_bytes)
            tmp_file.flush()

        result = await _transcribe_with_chunking(
            client,
            model,
            tmp_file_path,
            language=language,
            prompt=prompt,
            response_format=response_format,
        )

        # Persist the transcription result on the job state (re-fetched in
        # case the registry entry changed while transcribing).
        job = task_registry.get(job_id)
        if job:
            job.transcript_text = result.get("text")
            job.transcript_language = result.get("language")
            # Prefer a server-measured audio duration when provided (e.g.
            # meeting uploads); otherwise use the duration from the result.
            job.transcript_duration = (
                duration_seconds if duration_seconds is not None else result.get("duration")
            )
            job.transcript_model = result.get("model") or model
            job.transcript_metadata = result.get("metadata")

        task_registry.set_done(job_id, inserted_count=1)
    except asyncio.CancelledError:
        # CancelledError derives from BaseException (Python 3.8+), so it is
        # not caught below; record the cancellation and let it propagate.
        task_registry.set_failed(job_id, error="Transcription job was cancelled")
        raise
    except Exception as exc:  # noqa: BLE001
        # Some exceptions stringify to "", which would record a blank error.
        error = str(exc) or type(exc).__name__
        print(f"[Transcription] Job {job_id} failed: {error}")
        task_registry.set_failed(job_id, error=error)
    finally:
        if tmp_file_path:
            try:
                os.unlink(tmp_file_path)
            except FileNotFoundError:
                pass
            except OSError as cleanup_error:
                # Best-effort cleanup: a failed unlink must not override the
                # job outcome already recorded above.
                print(f"[Transcription] Could not remove temp file {tmp_file_path}: {cleanup_error}")
# Module-level singleton instance of TranscriptionJobService, created once when
# this module is first imported.
transcription_job_service = TranscriptionJobService()