"""In-memory task registry for background ingestion jobs.

Provides a swappable abstraction so that Celery/RQ can be plugged in later
without API changes.
"""
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from datetime import datetime, timezone |
| from threading import Lock |
| from typing import Dict, Optional, List |
| import uuid |
|
|
|
|
@dataclass
class JobState:
    """Mutable record describing one background ingestion job.

    Instances are created by ``TaskRegistry.create_job`` and updated through
    the registry's ``set_*`` methods. Every field except the three
    identifying ones has a default, so a freshly created job is "queued" with
    no progress information.
    """

    # Identity of the job and of the upload it processes.
    job_id: str
    project_id: str
    filename: str
    uploaded_by: Optional[str] = None

    # Lifecycle: starts as "queued"; the registry moves it to "running",
    # then "succeeded" or "failed".
    status: str = "queued"
    stage: Optional[str] = None

    # ``progress`` is a fraction (the registry clamps it to [0.0, 1.0]);
    # ``progress_percent`` is the same value rounded to a whole percentage.
    progress: Optional[float] = None
    progress_percent: Optional[int] = None

    # Optional counters; they stay None until set through the registry.
    pages_total: Optional[int] = None
    pages_done: Optional[int] = None
    chunks_total: Optional[int] = None
    chunks_done: Optional[int] = None
    inserted_count: Optional[int] = None

    # Failure description; None unless the job was marked failed.
    error: Optional[str] = None

    # Timezone-aware (UTC) timestamps. Only ``created_at`` is filled in at
    # construction time.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    started_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
    finished_at: Optional[datetime] = None

    # Human-readable, timestamped log lines, oldest first.
    logs: List[str] = field(default_factory=list)

    def log(self, message: str, max_logs: int = 100) -> None:
        """Append a UTC-timestamped entry and keep only the newest ones.

        Args:
            message: Text to record; stored as ``"[<ISO-8601 UTC>] <message>"``.
            max_logs: Maximum number of entries retained after the append.
                Values below zero are treated as zero.
        """
        ts = datetime.now(timezone.utc).isoformat()
        self.logs.append(f"[{ts}] {message}")

        # Drop *all* surplus entries, not just one, so the cap also holds when
        # the list was already over the limit (e.g. a smaller ``max_logs``
        # than on previous calls). Deleting in place keeps the list object
        # identity stable for anyone holding a reference to ``logs``.
        overflow = len(self.logs) - max(max_logs, 0)
        if overflow > 0:
            del self.logs[:overflow]
|
|
|
|
class TaskRegistry:
    """In-process registry of ingestion jobs, keyed by job id.

    A single lock protects both the job table and every state change or
    snapshot made through this class, so a reader calling
    ``status_snapshot`` never observes a half-applied transition. Note that
    ``get`` and ``create_job`` hand out the live ``JobState`` object; code
    that mutates it directly bypasses the lock.

    All ``set_*`` methods silently do nothing when ``job_id`` is unknown.
    """

    def __init__(self) -> None:
        self._jobs: Dict[str, "JobState"] = {}
        # Non-reentrant: methods below must not call each other while
        # holding it, which is why they read ``self._jobs`` directly.
        self._lock = Lock()

    def create_job(
        self, project_id: str, filename: str, uploaded_by: Optional[str]
    ) -> "JobState":
        """Create a queued job with a fresh uuid4 id, register it, and return it."""
        job = JobState(
            job_id=str(uuid.uuid4()),
            project_id=project_id,
            filename=filename,
            uploaded_by=uploaded_by,
        )
        with self._lock:
            self._jobs[job.job_id] = job
            job.log("Job created; queued")
        return job

    def get(self, job_id: str) -> Optional["JobState"]:
        """Return the live job object for ``job_id``, or None if unknown."""
        with self._lock:
            return self._jobs.get(job_id)

    def set_running(self, job_id: str) -> None:
        """Mark the job as running; ``started_at`` is only set the first time."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            # One clock read per transition keeps the timestamps consistent.
            now = datetime.now(timezone.utc)
            job.status = "running"
            job.started_at = job.started_at or now
            job.updated_at = now
            job.log("Job running")

    def set_stage(self, job_id: str, stage: str) -> None:
        """Record the name of the stage the job is currently in."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            job.stage = stage
            job.updated_at = datetime.now(timezone.utc)
            job.log(f"Stage -> {stage}")

    def set_progress(
        self,
        job_id: str,
        progress: Optional[float] = None,
        *,
        pages_total: Optional[int] = None,
        pages_done: Optional[int] = None,
        chunks_total: Optional[int] = None,
        chunks_done: Optional[int] = None,
        inserted_count: Optional[int] = None,
    ) -> None:
        """Update counters and the progress fraction of a job.

        Args:
            job_id: Job to update.
            progress: Explicit fraction; clamped to [0.0, 1.0]. When omitted,
                the fraction is derived from ``pages_done / pages_total`` if
                both are known and ``pages_total`` is non-zero; otherwise the
                previous fraction is kept.
            pages_total, pages_done, chunks_total, chunks_done,
            inserted_count: Counters to overwrite; ``None`` leaves the stored
                value untouched.
        """
        counters = (
            ("pages_total", pages_total),
            ("pages_done", pages_done),
            ("chunks_total", chunks_total),
            ("chunks_done", chunks_done),
            ("inserted_count", inserted_count),
        )
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            for attr, value in counters:
                if value is not None:
                    setattr(job, attr, value)

            if progress is not None:
                job.progress = max(0.0, min(1.0, progress))
            elif job.pages_total and job.pages_done is not None:
                # ``pages_total`` is truthy here, so the division cannot hit zero.
                ratio = job.pages_done / float(job.pages_total)
                job.progress = max(0.0, min(1.0, ratio))

            # An unknown fraction is reported as 0 percent.
            job.progress_percent = int(round((job.progress or 0.0) * 100))
            job.updated_at = datetime.now(timezone.utc)

    def set_done(self, job_id: str, inserted_count: int) -> None:
        """Mark the job as succeeded at 100% and store the final insert count."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            now = datetime.now(timezone.utc)
            job.status = "succeeded"
            job.progress = 1.0
            job.progress_percent = 100
            job.inserted_count = inserted_count
            job.updated_at = now
            # Keep the first recorded finish time if one already exists.
            job.finished_at = job.finished_at or now
            job.log(f"Job succeeded; inserted={inserted_count}")

    def set_failed(self, job_id: str, error: str) -> None:
        """Mark the job as failed and store the error description."""
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return
            now = datetime.now(timezone.utc)
            job.status = "failed"
            job.error = error
            job.updated_at = now
            # Keep the first recorded finish time if one already exists.
            job.finished_at = job.finished_at or now
            job.log(f"Job failed: {error}")

    def status_snapshot(self, job_id: str) -> Optional[dict]:
        """Return a plain-dict copy of the job's status fields, or None if unknown.

        The dict is built while holding the lock so all values belong to the
        same state. Timestamps are returned as ``datetime`` objects (or None).
        """
        with self._lock:
            job = self._jobs.get(job_id)
            if job is None:
                return None
            return {
                "job_id": job.job_id,
                "status": job.status,
                "progress": job.progress,
                "progress_percent": job.progress_percent,
                "stage": job.stage,
                "pages_total": job.pages_total,
                "pages_done": job.pages_done,
                "chunks_total": job.chunks_total,
                "chunks_done": job.chunks_done,
                "inserted_count": job.inserted_count,
                "error": job.error,
                "created_at": job.created_at,
                "started_at": job.started_at,
                "updated_at": job.updated_at,
                "finished_at": job.finished_at,
            }
|
|
|
|
| |
# Module-level registry instance, created once at import time.
# NOTE(review): presumably the shared instance that API handlers and workers
# import — confirm against callers.
task_registry = TaskRegistry()
|
|
|
|
|
|
|
|