"""Domain service for managing asynchronous transcription jobs. This service orchestrates: - job creation in the shared TaskRegistry - launching the transcription pipeline in the background - updating job status and storing the final transcript in JobState """ from __future__ import annotations import asyncio import tempfile import os from typing import Optional from config.settings import settings from services.task_service import task_registry from services.transcription_service import ( transcription_service, meeting_transcription_service, _transcribe_with_chunking, ) class TranscriptionJobService: """High-level orchestration for audio transcription jobs.""" def create_job( self, filename: str, uploaded_by: Optional[str], *, job_type: str, ): """Create a new transcription job in the registry.""" # For now we do not scope jobs by project; use a logical project id. project_id = "transcription" return task_registry.create_job( project_id=project_id, filename=filename, uploaded_by=uploaded_by, job_type=job_type, ) def launch_job( self, job_id: str, filename: str, content_bytes: bytes, *, language: Optional[str], prompt: Optional[str], meeting: bool = False, duration_seconds: Optional[float] = None, ) -> None: """ Launch the transcription pipeline as a background asyncio Task. This method is non-blocking and returns immediately. """ async def _runner() -> None: await _run_transcription_pipeline( job_id=job_id, filename=filename, content_bytes=content_bytes, language=language, prompt=prompt, meeting=meeting, duration_seconds=duration_seconds, ) # Ensure we schedule on the running event loop. loop = asyncio.get_event_loop() loop.create_task(_runner()) def get_status_snapshot(self, job_id: str) -> Optional[dict]: """Return a read-only snapshot of job status.""" return task_registry.status_snapshot(job_id) async def _run_transcription_pipeline( job_id: str, filename: str, content_bytes: bytes, *, language: Optional[str], prompt: Optional[str], meeting: bool, duration_seconds: Optional[float], ) -> None: """ Execute the transcription pipeline and update job progress in the registry. """ job = task_registry.get(job_id) if not job: return task_registry.set_running(job_id) task_registry.set_stage(job_id, "transcribe") job.log(f"Starting transcription job for '{filename}' (meeting={meeting})") # Choose the appropriate underlying transcription configuration. if meeting: client = meeting_transcription_service.client model = meeting_transcription_service.model response_format = "json" else: client = transcription_service.client model = transcription_service.model response_format = "verbose_json" print(f"[Transcription] Starting transcription job for '{filename}' (meeting={meeting})") print(f"[Transcription] model={model}") print(f"[Transcription] response_format={response_format}") tmp_file_path = None try: # Persist the uploaded content to a temporary file and reuse the # shared chunking logic from transcription_service. suffix = transcription_service._get_file_extension(filename) # type: ignore[attr-defined] with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file: tmp_file.write(content_bytes) tmp_file.flush() tmp_file_path = tmp_file.name result = await _transcribe_with_chunking( client, model, tmp_file_path, language=language, prompt=prompt, response_format=response_format, ) # Persist transcription result on the JobState. job = task_registry.get(job_id) if job: job.transcript_text = result.get("text") job.transcript_language = result.get("language") # Prefer a server-measured audio duration when provided (e.g. meeting uploads), # otherwise fall back to the duration reported by the transcription result. job.transcript_duration = ( duration_seconds if duration_seconds is not None else result.get("duration") ) job.transcript_model = result.get("model") or model job.transcript_metadata = result.get("metadata") # Mark job as succeeded. task_registry.set_done(job_id, inserted_count=1) except Exception as exc: # noqa: BLE001 task_registry.set_failed(job_id, error=str(exc)) finally: if tmp_file_path and os.path.exists(tmp_file_path): os.unlink(tmp_file_path) # Singleton instance transcription_job_service = TranscriptionJobService()