"""In-memory task registry for background ingestion jobs.

Provides a swappable abstraction so a real queue backend (e.g. Celery/RQ)
can be plugged in later without API changes.
"""

from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone
from threading import Lock
from typing import Dict, Optional, List
import uuid


def _utcnow() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


@dataclass
class JobState:
    """Mutable state of a single ingestion job, plus a bounded log."""

    job_id: str
    project_id: str
    filename: str
    uploaded_by: Optional[str] = None
    status: str = "queued"  # queued | running | succeeded | failed
    stage: Optional[str] = None  # upload | ocr | chunk | embed | index
    progress: Optional[float] = None  # fraction, clamped to [0.0, 1.0] by the registry
    progress_percent: Optional[int] = None  # round(progress * 100)
    pages_total: Optional[int] = None
    pages_done: Optional[int] = None
    chunks_total: Optional[int] = None
    chunks_done: Optional[int] = None
    inserted_count: Optional[int] = None
    error: Optional[str] = None
    created_at: datetime = field(default_factory=_utcnow)
    started_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None
    logs: List[str] = field(default_factory=list)  # "[<iso ts>] <message>" entries, oldest first

    def log(self, message: str, max_logs: int = 100) -> None:
        """Append a UTC-timestamped entry, keeping only the newest ``max_logs`` entries.

        Args:
            message: Text to record.
            max_logs: Upper bound on the number of retained entries.
        """
        self.logs.append(f"[{_utcnow().isoformat()}] {message}")
        overflow = len(self.logs) - max_logs
        if overflow > 0:
            # One slice delete trims all the way back to the bound
            # (list.pop(0) is O(n) per call and only ever removed one entry).
            del self.logs[:overflow]


class TaskRegistry:
    """In-memory registry of ``JobState`` objects keyed by job id.

    Every registry method reads or mutates job state while holding
    ``self._lock``. This keeps a ``status_snapshot`` from observing a
    half-applied update made by another thread through this API. Updates for
    unknown job ids are silently ignored.

    NOTE(review): ``get`` returns the live ``JobState``; callers that mutate it
    directly bypass the lock.
    """

    def __init__(self) -> None:
        self._jobs: Dict[str, JobState] = {}
        # Non-reentrant: methods below must not call self.get() while holding it.
        self._lock = Lock()

    def create_job(self, project_id: str, filename: str, uploaded_by: Optional[str]) -> JobState:
        """Register a new queued job with a fresh UUID4 id and return it."""
        job = JobState(
            job_id=str(uuid.uuid4()),
            project_id=project_id,
            filename=filename,
            uploaded_by=uploaded_by,
        )
        # Log before publishing so the job is never visible without its creation entry.
        job.log("Job created; queued")
        with self._lock:
            self._jobs[job.job_id] = job
        return job

    def get(self, job_id: str) -> Optional[JobState]:
        """Return the live ``JobState`` for ``job_id``, or ``None`` if unknown."""
        with self._lock:
            return self._jobs.get(job_id)

    def set_running(self, job_id: str) -> None:
        """Mark the job running; ``started_at`` is only set the first time."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            now = _utcnow()
            job.status = "running"
            job.started_at = job.started_at or now
            job.updated_at = now
            job.log("Job running")

    def set_stage(self, job_id: str, stage: str) -> None:
        """Record the pipeline stage the job has entered."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            job.stage = stage
            job.updated_at = _utcnow()
            job.log(f"Stage -> {stage}")

    def set_progress(
        self,
        job_id: str,
        progress: Optional[float] = None,
        *,
        pages_total: Optional[int] = None,
        pages_done: Optional[int] = None,
        chunks_total: Optional[int] = None,
        chunks_done: Optional[int] = None,
        inserted_count: Optional[int] = None,
    ) -> None:
        """Update counters and progress for a job.

        Counter arguments left as ``None`` keep their current value.

        Args:
            job_id: Job to update.
            progress: Explicit fraction, clamped to [0.0, 1.0]. When omitted, progress
                is derived from ``pages_done / pages_total`` if ``pages_total`` is set
                and non-zero; otherwise it keeps its previous value.
            pages_total: New value for the field of the same name, if given.
            pages_done: New value for the field of the same name, if given.
            chunks_total: New value for the field of the same name, if given.
            chunks_done: New value for the field of the same name, if given.
            inserted_count: New value for the field of the same name, if given.
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            if pages_total is not None:
                job.pages_total = pages_total
            if pages_done is not None:
                job.pages_done = pages_done
            if chunks_total is not None:
                job.chunks_total = chunks_total
            if chunks_done is not None:
                job.chunks_done = chunks_done
            if inserted_count is not None:
                job.inserted_count = inserted_count

            if progress is not None:
                job.progress = max(0.0, min(1.0, progress))
            elif job.pages_total and job.pages_done is not None:
                # pages_total is truthy here, so it is non-zero and the division is safe.
                job.progress = max(0.0, min(1.0, job.pages_done / float(job.pages_total)))

            job.progress_percent = int(round((job.progress or 0.0) * 100))
            job.updated_at = _utcnow()

    def set_done(self, job_id: str, inserted_count: int) -> None:
        """Mark the job succeeded at 100% progress with its final inserted count."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            # Single timestamp so updated_at and finished_at agree for this transition.
            now = _utcnow()
            job.status = "succeeded"
            job.progress = 1.0
            job.progress_percent = 100
            job.inserted_count = inserted_count
            job.updated_at = now
            job.finished_at = job.finished_at or now
            job.log(f"Job succeeded; inserted={inserted_count}")

    def set_failed(self, job_id: str, error: str) -> None:
        """Mark the job failed and record ``error``."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            now = _utcnow()
            job.status = "failed"
            job.error = error
            job.updated_at = now
            job.finished_at = job.finished_at or now
            job.log(f"Job failed: {error}")

    def status_snapshot(self, job_id: str) -> Optional[dict]:
        """Return a consistent dict copy of the job's status fields, or ``None`` if unknown."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return None
            return {
                "job_id": job.job_id,
                "status": job.status,
                "progress": job.progress,
                "progress_percent": job.progress_percent,
                "stage": job.stage,
                "pages_total": job.pages_total,
                "pages_done": job.pages_done,
                "chunks_total": job.chunks_total,
                "chunks_done": job.chunks_done,
                "inserted_count": job.inserted_count,
                "error": job.error,
                "created_at": job.created_at,
                "started_at": job.started_at,
                "updated_at": job.updated_at,
                "finished_at": job.finished_at,
            }


# Singleton registry
task_registry = TaskRegistry()