routeur_ia_api / services /task_service.py
Cyril Dupland
Ingestion project documents
595f77d
raw
history blame
5.55 kB
"""In-memory task registry for background ingestion jobs.
Provides a swappable abstraction to later plug Celery/RQ without API changes.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from threading import Lock
from typing import Dict, Optional, List
import uuid
@dataclass
class JobState:
    """Mutable state of a single background ingestion job.

    All counters and timestamps other than ``created_at`` default to ``None``
    until they are explicitly set.
    """

    job_id: str
    project_id: str
    filename: str
    uploaded_by: Optional[str] = None
    status: str = "queued"  # queued | running | succeeded | failed
    stage: Optional[str] = None  # upload | ocr | chunk | embed | index
    # Fraction of work done; the registry clamps it to [0.0, 1.0].
    progress: Optional[float] = None
    # round(progress * 100), kept alongside ``progress`` for display.
    progress_percent: Optional[int] = None
    pages_total: Optional[int] = None
    pages_done: Optional[int] = None
    chunks_total: Optional[int] = None
    chunks_done: Optional[int] = None
    inserted_count: Optional[int] = None
    error: Optional[str] = None  # failure message when status == "failed"
    # Timezone-aware UTC timestamps.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    started_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None
    # Bounded history of timestamped messages, oldest first.
    logs: List[str] = field(default_factory=list)

    def log(self, message: str, max_logs: int = 100) -> None:
        """Append a UTC-timestamped entry to ``logs``, keeping the newest *max_logs*.

        Entries are formatted as ``"[<ISO-8601 UTC timestamp>] <message>"``.

        Args:
            message: Text to record.
            max_logs: Maximum number of entries to retain. When the buffer exceeds
                it (including when a smaller cap is passed than on earlier calls),
                every surplus entry is dropped from the front, not just one.
                ``max_logs <= 0`` leaves the buffer empty.
        """
        self.logs.append(f"[{datetime.now(timezone.utc).isoformat()}] {message}")
        overflow = len(self.logs) - max_logs
        if overflow > 0:
            # Trim the entire surplus at once so the cap is always honoured.
            del self.logs[:overflow]
class TaskRegistry:
    """In-memory registry of ingestion jobs keyed by ``job_id``.

    Every lookup, state transition and snapshot performed through this class
    runs while holding ``self._lock``. A caller polling :meth:`status_snapshot`
    therefore never observes a half-applied update, such as a new ``progress``
    with a stale ``progress_percent``.

    The ``set_*`` mutators silently ignore unknown ``job_id`` values.
    :meth:`get` and :meth:`status_snapshot` return ``None`` for them.

    NOTE(review): jobs are never removed, so the registry grows for the
    lifetime of the process.
    """

    def __init__(self) -> None:
        self._jobs: Dict[str, JobState] = {}
        # Non-reentrant: methods must not call each other (e.g. ``get``) while
        # already holding it, so each method looks the job up inline.
        self._lock = Lock()

    def create_job(self, project_id: str, filename: str, uploaded_by: Optional[str]) -> JobState:
        """Register a new job in the ``queued`` state and return it.

        Args:
            project_id: Identifier of the owning project, stored on the job as-is.
            filename: Name of the file to ingest, stored on the job as-is.
            uploaded_by: Optional uploader identifier.

        Returns:
            The new :class:`JobState`, identified by a fresh UUID4 string.
        """
        job = JobState(
            job_id=str(uuid.uuid4()),
            project_id=project_id,
            filename=filename,
            uploaded_by=uploaded_by,
        )
        with self._lock:
            self._jobs[job.job_id] = job
            job.log("Job created; queued")
        return job

    def get(self, job_id: str) -> Optional[JobState]:
        """Return the live :class:`JobState` for *job_id*, or ``None`` if unknown.

        The returned object is shared, not a copy. Reading it directly is not
        synchronised with concurrent updates, so use :meth:`status_snapshot`
        when a consistent view is needed.
        """
        with self._lock:
            return self._jobs.get(job_id)

    def set_running(self, job_id: str) -> None:
        """Mark the job ``running``; ``started_at`` is only recorded the first time."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            now = datetime.now(timezone.utc)
            job.status = "running"
            job.started_at = job.started_at or now
            job.updated_at = now
            job.log("Job running")

    def set_stage(self, job_id: str, stage: str) -> None:
        """Record the pipeline stage the job is currently in."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            job.stage = stage
            job.updated_at = datetime.now(timezone.utc)
            job.log(f"Stage -> {stage}")

    def set_progress(self, job_id: str, progress: Optional[float] = None, *,
                     pages_total: Optional[int] = None, pages_done: Optional[int] = None,
                     chunks_total: Optional[int] = None, chunks_done: Optional[int] = None,
                     inserted_count: Optional[int] = None) -> None:
        """Update a job's counters and progress.

        Only counters passed as non-``None`` are overwritten.

        If *progress* is given, it is clamped to ``[0.0, 1.0]``. Otherwise progress
        is recomputed as ``pages_done / pages_total`` (clamped) when ``pages_done``
        is known and ``pages_total`` is non-zero. In every other case the previous
        value is kept.

        ``progress_percent`` is always refreshed from the resulting progress and
        is ``0`` while progress is still unknown.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            if pages_total is not None:
                job.pages_total = pages_total
            if pages_done is not None:
                job.pages_done = pages_done
            if chunks_total is not None:
                job.chunks_total = chunks_total
            if chunks_done is not None:
                job.chunks_done = chunks_done
            if inserted_count is not None:
                job.inserted_count = inserted_count

            if progress is not None:
                job.progress = max(0.0, min(1.0, progress))
            elif job.pages_total and job.pages_done is not None:
                # pages_total is truthy here, so the division cannot be by zero.
                job.progress = max(0.0, min(1.0, job.pages_done / float(job.pages_total)))
            job.progress_percent = int(round((job.progress or 0.0) * 100))
            job.updated_at = datetime.now(timezone.utc)

    def set_done(self, job_id: str, inserted_count: int) -> None:
        """Mark the job ``succeeded`` at 100% and record how many items were inserted.

        ``finished_at`` is only recorded the first time. It shares the
        timestamp used for ``updated_at``.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            now = datetime.now(timezone.utc)
            job.status = "succeeded"
            job.progress = 1.0
            job.progress_percent = 100
            job.inserted_count = inserted_count
            job.updated_at = now
            job.finished_at = job.finished_at or now
            job.log(f"Job succeeded; inserted={inserted_count}")

    def set_failed(self, job_id: str, error: str) -> None:
        """Mark the job ``failed`` and store *error*; progress fields are left as-is.

        ``finished_at`` is only recorded the first time. It shares the
        timestamp used for ``updated_at``.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            now = datetime.now(timezone.utc)
            job.status = "failed"
            job.error = error
            job.updated_at = now
            job.finished_at = job.finished_at or now
            job.log(f"Job failed: {error}")

    def status_snapshot(self, job_id: str) -> Optional[dict]:
        """Return a point-in-time copy of a job's status fields, or ``None`` if unknown.

        The dict is built under the lock, so all values come from the same state.
        Timestamps are returned as ``datetime`` objects, not strings. Identity
        fields other than ``job_id`` are not included, and neither are the logs.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return None
            return {
                "job_id": job.job_id,
                "status": job.status,
                "progress": job.progress,
                "progress_percent": job.progress_percent,
                "stage": job.stage,
                "pages_total": job.pages_total,
                "pages_done": job.pages_done,
                "chunks_total": job.chunks_total,
                "chunks_done": job.chunks_done,
                "inserted_count": job.inserted_count,
                "error": job.error,
                "created_at": job.created_at,
                "started_at": job.started_at,
                "updated_at": job.updated_at,
                "finished_at": job.finished_at,
            }
# Process-wide singleton: modules importing `task_registry` all share this one instance.
task_registry: TaskRegistry = TaskRegistry()