routeur_ia_api / services /transcription_job_service.py
Cyril Dupland
enhance transcription: add detailed logging for transcription job creation and processing, improve error handling for file pointer resets, and refine chunking logic for large audio files to ensure robust handling of transcription requests.
6c4dfd3
raw
history blame
5.19 kB
"""Domain service for managing asynchronous transcription jobs.
This service orchestrates:
- job creation in the shared TaskRegistry
- launching the transcription pipeline in the background
- updating job status and storing the final transcript in JobState
"""
from __future__ import annotations
import asyncio
import tempfile
import os
from typing import Optional
from config.settings import settings
from services.task_service import task_registry
from services.transcription_service import (
transcription_service,
meeting_transcription_service,
_transcribe_with_chunking,
)
class TranscriptionJobService:
    """High-level orchestration for audio transcription jobs.

    Jobs are registered in the shared ``task_registry`` and executed as
    background asyncio tasks on the caller's running event loop.
    """

    # Strong references to in-flight background tasks, shared by all
    # instances. The event loop itself only keeps weak references to tasks,
    # so a task that nothing else references may be garbage-collected before
    # it finishes. Entries are removed by a done-callback.
    _background_tasks: "set[asyncio.Task[None]]" = set()

    def create_job(
        self,
        filename: str,
        uploaded_by: Optional[str],
        *,
        job_type: str,
    ):
        """Create a new transcription job in the registry.

        Args:
            filename: Name of the uploaded audio file.
            uploaded_by: Identifier of the uploader, if known.
            job_type: Job type label forwarded to the registry.

        Returns:
            Whatever ``task_registry.create_job`` returns for the new job.
        """
        # For now we do not scope jobs by project; use a logical project id.
        project_id = "transcription"
        return task_registry.create_job(
            project_id=project_id,
            filename=filename,
            uploaded_by=uploaded_by,
            job_type=job_type,
        )

    def launch_job(
        self,
        job_id: str,
        filename: str,
        content_bytes: bytes,
        *,
        language: Optional[str],
        prompt: Optional[str],
        meeting: bool = False,
        duration_seconds: Optional[float] = None,
    ) -> None:
        """Launch the transcription pipeline as a background asyncio Task.

        This method is non-blocking and returns immediately. It must be called
        from code running inside an event loop.

        Args:
            job_id: Registry id of the job to run.
            filename: Original filename of the uploaded audio.
            content_bytes: Raw content of the uploaded audio file.
            language: Optional language hint forwarded to the pipeline.
            prompt: Optional prompt forwarded to the pipeline.
            meeting: Forwarded to the pipeline to select the meeting
                transcription configuration.
            duration_seconds: Optional externally measured audio duration,
                forwarded to the pipeline.

        Raises:
            RuntimeError: If there is no running event loop in this thread.
        """
        # Resolve the loop first: if none is running we fail loudly here,
        # before a coroutine object exists, instead of scheduling work on a
        # loop that never runs (which would leave the job pending forever).
        loop = asyncio.get_running_loop()

        async def _runner() -> None:
            await _run_transcription_pipeline(
                job_id=job_id,
                filename=filename,
                content_bytes=content_bytes,
                language=language,
                prompt=prompt,
                meeting=meeting,
                duration_seconds=duration_seconds,
            )

        task = loop.create_task(_runner())
        # Hold a strong reference until the task completes, then release it.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    def get_status_snapshot(self, job_id: str) -> Optional[dict]:
        """Return a read-only snapshot of job status from the registry."""
        return task_registry.status_snapshot(job_id)
async def _run_transcription_pipeline(
    job_id: str,
    filename: str,
    content_bytes: bytes,
    *,
    language: Optional[str],
    prompt: Optional[str],
    meeting: bool,
    duration_seconds: Optional[float],
) -> None:
    """Execute the transcription pipeline and update job progress in the registry.

    The uploaded bytes are written to a temporary file, transcribed through
    ``_transcribe_with_chunking``, and the result is stored on the job state.
    The job is marked done on success and failed on any ``Exception``; the
    temporary file is removed in both cases. Returns silently when the job id
    is not present in the registry.

    Args:
        job_id: Registry id of the job to run.
        filename: Original filename; used for logging and to derive the
            temporary file's suffix.
        content_bytes: Raw content of the uploaded audio file.
        language: Optional language hint passed to the transcription call.
        prompt: Optional prompt passed to the transcription call.
        meeting: When true, use the meeting transcription client/model with
            the "json" response format; otherwise the default service with
            "verbose_json".
        duration_seconds: When not None, stored as the transcript duration in
            preference to the duration reported by the transcription result.
    """
    job = task_registry.get(job_id)
    if not job:
        return

    task_registry.set_running(job_id)
    task_registry.set_stage(job_id, "transcribe")
    job.log(f"Starting transcription job for '{filename}' (meeting={meeting})")

    # Choose the appropriate underlying transcription configuration.
    if meeting:
        client = meeting_transcription_service.client
        model = meeting_transcription_service.model
        response_format = "json"
    else:
        client = transcription_service.client
        model = transcription_service.model
        response_format = "verbose_json"

    print(f"[Transcription] Starting transcription job for '{filename}' (meeting={meeting})")
    print(f"[Transcription] model={model}")
    print(f"[Transcription] response_format={response_format}")

    tmp_file_path = None
    try:
        # Persist the uploaded content to a temporary file and reuse the
        # shared chunking logic from transcription_service.
        suffix = transcription_service._get_file_extension(filename)  # type: ignore[attr-defined]
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
            # Record the path before writing: the file is created with
            # delete=False, so if the write fails the cleanup below must
            # still know what to remove.
            tmp_file_path = tmp_file.name
            tmp_file.write(content_bytes)
            tmp_file.flush()

        result = await _transcribe_with_chunking(
            client,
            model,
            tmp_file_path,
            language=language,
            prompt=prompt,
            response_format=response_format,
        )

        # Persist transcription result on the JobState.
        job = task_registry.get(job_id)
        if job:
            job.transcript_text = result.get("text")
            job.transcript_language = result.get("language")
            # Prefer a server-measured audio duration when provided (e.g. meeting uploads),
            # otherwise fall back to the duration reported by the transcription result.
            job.transcript_duration = (
                duration_seconds if duration_seconds is not None else result.get("duration")
            )
            job.transcript_model = result.get("model") or model
            job.transcript_metadata = result.get("metadata")

        # Mark job as succeeded.
        task_registry.set_done(job_id, inserted_count=1)
    except Exception as exc:  # noqa: BLE001
        # Some exceptions (e.g. a bare TimeoutError()) stringify to "", which
        # would record a failure with no explanation; fall back to repr().
        task_registry.set_failed(job_id, error=str(exc) or repr(exc))
    finally:
        if tmp_file_path:
            # Best-effort cleanup: the job status is already final here, so a
            # cleanup problem is reported but must not escape the task.
            try:
                os.unlink(tmp_file_path)
            except FileNotFoundError:
                pass  # Already removed; nothing to clean up.
            except OSError as cleanup_exc:
                print(
                    f"[Transcription] Could not remove temp file "
                    f"'{tmp_file_path}': {cleanup_exc}"
                )
# Singleton instance: a single module-level TranscriptionJobService created
# at import time.
transcription_job_service = TranscriptionJobService()