""" Video Job Queue — async job infrastructure for Segmento Sense. Pattern: in-process asyncio.Queue + ThreadPoolExecutor. - No Redis, no Celery, no external dependencies. - Single background asyncio Task consumes jobs one at a time. - Job state lives in _job_store (dict); results also persisted to /tmp/ so they survive a dict reset (not a full process restart). Public API (called from video_router.py and api.py): setup(classifier) — inject shared classifier create_job(...) — register a new job, return job_id enqueue(job_id) — push job_id onto the asyncio.Queue get_job(job_id) — read job status / result cancel_job(job_id) — cancel if still queued startup() — launch background worker (call from FastAPI startup) """ from __future__ import annotations import asyncio import json import os import time import uuid from concurrent.futures import ThreadPoolExecutor from typing import Any, Dict, List, Optional from file_handlers.video_pipeline import process_video # ── Constants ───────────────────────────────────────────────────────────────── _JOB_DIR = "/tmp/video_jobs" _MAX_WORKERS = 2 # max concurrent video jobs in thread pool # ── Module-level state ──────────────────────────────────────────────────────── _job_store: Dict[str, Dict[str, Any]] = {} _queue: asyncio.Queue = asyncio.Queue() _executor = ThreadPoolExecutor(max_workers=_MAX_WORKERS, thread_name_prefix="video_worker") _classifier = None # injected by setup() # ── Dependency injection ────────────────────────────────────────────────────── def setup(classifier_instance) -> None: """Inject the shared RegexClassifier from api.py.""" global _classifier _classifier = classifier_instance # ── Disk persistence (best-effort) ──────────────────────────────────────────── def _persist(job_id: str) -> None: """Write completed/errored job to disk. Skips large 'parsed_text' field to keep files small.""" os.makedirs(_JOB_DIR, exist_ok=True) entry = _job_store.get(job_id) if not entry: return try: # Shallow copy — omit internal path and large text from disk store safe = {k: v for k, v in entry.items() if k not in ("video_path",)} if safe.get("result") and isinstance(safe["result"], dict): # Store parsed_text only truncated to 2000 chars on disk (full lives in memory) result_copy = dict(safe["result"]) result_copy["parsed_text"] = result_copy.get("parsed_text", "")[:2000] safe["result"] = result_copy path = os.path.join(_JOB_DIR, f"{job_id}.json") with open(path, "w", encoding="utf-8") as fh: json.dump(safe, fh, ensure_ascii=False) except Exception: pass # persistence is best-effort def _load_from_disk(job_id: str) -> Optional[Dict[str, Any]]: """Try to recover a job result from disk (for jobs not in memory).""" path = os.path.join(_JOB_DIR, f"{job_id}.json") if os.path.exists(path): try: with open(path, encoding="utf-8") as fh: return json.load(fh) except Exception: pass return None # ── Job lifecycle ───────────────────────────────────────────────────────────── def create_job(video_path: str, model_keys: List[str]) -> str: """Create a job entry in _job_store, return the new job_id.""" job_id = str(uuid.uuid4()) _job_store[job_id] = { "status": "queued", "progress": 0, "stage_detail": "Waiting in queue…", "result": None, "error": None, "created_at": time.time(), "video_path": video_path, # internal — stripped before sending to client "model_keys": model_keys, } return job_id def get_job(job_id: str) -> Optional[Dict[str, Any]]: """Return job status dict, checking disk if not found in memory.""" if job_id in _job_store: entry = _job_store[job_id] # Return safe copy without internal file path return {k: v for k, v in entry.items() if k != "video_path"} return _load_from_disk(job_id) def cancel_job(job_id: str) -> bool: """ Cancel a queued job. Returns True on success. Cannot cancel a job that is already extracting/transcribing/scanning. """ entry = _job_store.get(job_id) if entry and entry["status"] == "queued": entry["status"] = "cancelled" entry["stage_detail"] = "Cancelled by user." _persist(job_id) return True return False async def enqueue(job_id: str) -> None: """Put an already-created job onto the async queue.""" await _queue.put(job_id) # ── Core job processor ──────────────────────────────────────────────────────── def _process_job_sync(job_id: str) -> None: """ Synchronous heavy-lifting — runs inside ThreadPoolExecutor so it never blocks the FastAPI event loop. Stages: 1. Video extraction (ffmpeg + Whisper) → 0 % … 95 % 2. NLP model scan → 90 % … 100 % """ entry = _job_store.get(job_id) if not entry or entry["status"] in ("cancelled", "error"): return video_path = entry["video_path"] model_keys = entry["model_keys"] # ── progress callback (thread-safe dict write) ──────────────────────────── def _progress(pct: int, detail: str) -> None: if entry.get("status") == "cancelled": raise InterruptedError("Job cancelled by user") entry["progress"] = pct entry["stage_detail"] = detail try: # ── Stage 1: Extract text from video ───────────────────────────────── entry["status"] = "extracting" _progress(1, "Starting video extraction…") extracted_text = process_video(video_path, _progress) if not extracted_text.strip() or extracted_text.startswith("[ERROR]"): entry["status"] = "error" entry["error"] = extracted_text or "No extractable text found." _persist(job_id) return # ── Stage 2: Run PII models ─────────────────────────────────────────── entry["status"] = "scanning" _progress(90, f"Running {len(model_keys)} PII model(s)…") if _classifier is None: raise RuntimeError("Classifier not initialised — call video_job_queue.setup() first.") all_detections: Dict[str, List[dict]] = {} for i, key in enumerate(model_keys): _progress( 90 + int((i / len(model_keys)) * 8), f"Scanning with {key} ({i + 1}/{len(model_keys)})…", ) dets = _classifier.analyze_text_hybrid(extracted_text[:100_000], [key]) all_detections[key] = dets # ── Stage 3: Build showdown result ──────────────────────────────────── # Re-use the exact same logic as evaluator_api so the frontend # gets identical shaped data — zero frontend changes needed. from evaluator_api import _build_showdown_result union_keys: set = set() for dets in all_detections.values(): for m in dets: union_keys.add((m["start"], m["end"], m["text"])) per_model: Dict[str, dict] = {} for key in model_keys: per_model[key] = _build_showdown_result( key, all_detections[key], union_keys, all_detections, ) ranked = sorted( [ {"model_key": k, "pii_count": v["pii_count"], "accuracy": v["accuracy"]} for k, v in per_model.items() ], key=lambda x: x["pii_count"], reverse=True, ) for i, r in enumerate(ranked): r["rank"] = i + 1 # ── Done ────────────────────────────────────────────────────────────── entry["status"] = "done" entry["progress"] = 100 entry["stage_detail"] = "Complete — PII scan finished." entry["result"] = { "per_model": per_model, "has_gt": False, "elapsed": round(time.time() - entry["created_at"], 2), "union_total": len(union_keys), "ranked": ranked, "parsed_text": extracted_text[:100_000], } _persist(job_id) except InterruptedError: entry["status"] = "cancelled" entry["stage_detail"] = "Cancelled." _persist(job_id) except Exception as exc: entry["status"] = "error" entry["error"] = str(exc) entry["stage_detail"] = f"Error: {exc}" _persist(job_id) finally: # Always clean up the temp video file try: if video_path and os.path.exists(video_path): os.remove(video_path) except Exception: pass # ── Background asyncio worker ───────────────────────────────────────────────── async def _worker() -> None: """ Runs forever as a background asyncio Task. Pulls job_ids from the queue and processes them in the thread pool. """ loop = asyncio.get_event_loop() while True: job_id: str = await _queue.get() entry = _job_store.get(job_id, {}) if entry.get("status") == "cancelled": _queue.task_done() continue try: await loop.run_in_executor(_executor, _process_job_sync, job_id) except Exception as exc: if job_id in _job_store: _job_store[job_id]["status"] = "error" _job_store[job_id]["error"] = str(exc) finally: _queue.task_done() async def startup() -> None: """ Launch the background worker task. Must be called once from FastAPI's startup event. """ asyncio.create_task(_worker())