Spaces:
Running
Running
"""
Video Job Queue — async job infrastructure for Segmento Sense.

Pattern: in-process asyncio.Queue + ThreadPoolExecutor.
  - No Redis, no Celery, no external dependencies.
  - A single background asyncio Task consumes jobs one at a time.
  - Job state lives in _job_store (dict); results are also persisted to
    /tmp/video_jobs so they survive a dict reset (not a full process restart).

Public API (called from video_router.py and api.py):
  setup(classifier)   → inject shared classifier
  create_job(...)     → register a new job, return job_id
  enqueue(job_id)     → push job_id onto the asyncio.Queue
  get_job(job_id)     → read job status / result
  cancel_job(job_id)  → cancel if still queued
  startup()           → launch background worker (call from FastAPI startup)
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from file_handlers.video_pipeline import process_video

# ── Constants ─────────────────────────────────────────────────────────────────
_JOB_DIR = "/tmp/video_jobs"  # one <job_id>.json file per persisted job
_MAX_WORKERS = 2  # size of the "video_worker" thread pool

# ── Module-level state ────────────────────────────────────────────────────────
_job_store: Dict[str, Dict[str, Any]] = {}  # job_id → mutable job record
_queue: asyncio.Queue = asyncio.Queue()  # job_ids waiting for the worker
_executor = ThreadPoolExecutor(
    thread_name_prefix="video_worker",
    max_workers=_MAX_WORKERS,
)
_classifier = None  # injected by setup()

# ── Dependency injection ──────────────────────────────────────────────────────
| def setup(classifier_instance) -> None: | |
| """Inject the shared RegexClassifier from api.py.""" | |
| global _classifier | |
| _classifier = classifier_instance | |
# ── Disk persistence (best-effort) ────────────────────────────────────────────
def _persist(job_id: str) -> None:
    """Write the current state of a job to ``_JOB_DIR/<job_id>.json``.

    Persistence is best-effort: every failure (directory cannot be created,
    disk full, value not JSON-serialisable, ...) is logged and swallowed so
    that callers such as ``cancel_job`` never fail because of it.

    The on-disk copy differs from the in-memory entry in two ways:
      * the internal ``video_path`` is dropped;
      * ``result["parsed_text"]`` is truncated to 2000 characters (the full
        text stays in memory only).

    Args:
        job_id: Key of the job in ``_job_store``. Unknown ids are ignored.
    """
    entry = _job_store.get(job_id)
    if not entry:
        return

    # Work on copies so the in-memory entry is never modified.
    snapshot = {key: value for key, value in entry.items() if key != "video_path"}
    result = snapshot.get("result")
    if result and isinstance(result, dict):
        trimmed = dict(result)
        # `or ""` guards against an explicit None, which cannot be sliced.
        trimmed["parsed_text"] = (trimmed.get("parsed_text") or "")[:2000]
        snapshot["result"] = trimmed

    final_path = os.path.join(_JOB_DIR, f"{job_id}.json")
    # Unique temp name: the same job may be persisted from more than one thread.
    tmp_path = f"{final_path}.{uuid.uuid4().hex}.tmp"
    try:
        os.makedirs(_JOB_DIR, exist_ok=True)
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(snapshot, fh, ensure_ascii=False)
        # Atomic swap: readers see either the old file or the complete new
        # one, never a half-written JSON document.
        os.replace(tmp_path, final_path)
    except Exception:  # deliberate: persistence must never break the caller
        logging.getLogger(__name__).warning(
            "Could not persist job %s to %s", job_id, final_path, exc_info=True
        )
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # temp file was never created or is already gone


def _load_from_disk(job_id: str) -> Optional[Dict[str, Any]]:
    """Try to recover a persisted job from ``_JOB_DIR/<job_id>.json``.

    Args:
        job_id: Identifier of the job. It may originate from a client
            request (via ``get_job``), so it is treated as untrusted.

    Returns:
        The decoded job dict, or ``None`` when the id is not a plain file
        name, the file does not exist, cannot be read, is not valid JSON,
        or does not contain a JSON object.
    """
    if not isinstance(job_id, str):
        return None
    filename = f"{job_id}.json"
    # Reject ids carrying directory components ("../x", "/etc/x"): joined
    # as-is they would let a caller read .json files outside _JOB_DIR.
    if os.path.basename(filename) != filename:
        return None

    path = os.path.join(_JOB_DIR, filename)
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # Missing/unreadable file, or invalid JSON/encoding (JSONDecodeError
        # and UnicodeDecodeError are both ValueError subclasses).
        return None
    return data if isinstance(data, dict) else None


# ── Job lifecycle ─────────────────────────────────────────────────────────────
def create_job(video_path: str, model_keys: List[str]) -> str:
    """Register a new job in ``_job_store`` and return its id.

    The job starts in status ``"queued"``; it is not processed until its id
    is also pushed onto the queue with ``enqueue``.

    Args:
        video_path: Path of the video file to process. Kept for internal use
            only; it is stripped before the entry is returned to clients or
            written to disk.
        model_keys: Keys of the models to run against the extracted text.

    Returns:
        The new job id (a random UUID4 string).
    """
    job_id = str(uuid.uuid4())
    _job_store[job_id] = {
        "status": "queued",
        "progress": 0,
        "stage_detail": "Waiting in queue…",
        "result": None,
        "error": None,
        "created_at": time.time(),
        "video_path": video_path,  # internal — stripped before sending to client
        # Copy so later mutation of the caller's list cannot change the job.
        "model_keys": list(model_keys),
    }
    return job_id


def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """Look up a job, preferring memory and falling back to disk.

    Returns:
        For a job held in ``_job_store``: a shallow copy of its entry without
        the internal ``video_path`` key. Otherwise whatever
        ``_load_from_disk`` yields for the id (``None`` if nothing is found).
    """
    if job_id not in _job_store:
        return _load_from_disk(job_id)

    public_view = dict(_job_store[job_id])
    public_view.pop("video_path", None)  # never expose the internal file path
    return public_view


def cancel_job(job_id: str) -> bool:
    """
    Cancel a job that is still waiting in the queue.

    Only jobs whose status is ``"queued"`` can be cancelled; a job that is
    unknown or already extracting/transcribing/scanning is left untouched.

    Returns:
        True if the job was marked cancelled (and persisted), else False.
    """
    record = _job_store.get(job_id)
    if not record or record["status"] != "queued":
        return False

    record.update(status="cancelled", stage_detail="Cancelled by user.")
    _persist(job_id)
    return True


async def enqueue(job_id: str) -> None:
    """Queue the id of an already-created job for the background worker.

    Only the id is placed on ``_queue``; the job entry itself is not read or
    modified here.
    """
    await _queue.put(job_id)


# ── Core job processor ────────────────────────────────────────────────────────
def _process_job_sync(job_id: str) -> None:
    """
    Run one video job from start to finish (blocking).

    Intended to be called through the thread pool so the event loop is not
    blocked. Every outcome is recorded on the job's entry in ``_job_store``
    and persisted with ``_persist``; errors raised while processing are
    stored on the entry instead of being propagated.

    Stages:
      1. Text extraction via ``process_video``  → status "extracting"
      2. One classifier pass per model key      → status "scanning"
      3. Showdown result assembly               → status "done"

    The temporary video file is deleted when the job finishes, fails, is
    cancelled, or is skipped because it was already cancelled/errored.

    Args:
        job_id: Key of the job in ``_job_store``. Unknown ids are ignored.
    """
    entry = _job_store.get(job_id)
    if not entry:
        return

    video_path = entry.get("video_path")

    def _remove_video() -> None:
        # Best-effort cleanup of the temp upload; must never mask the outcome.
        try:
            if video_path and os.path.exists(video_path):
                os.remove(video_path)
        except Exception:
            pass

    if entry["status"] in ("cancelled", "error"):
        # The job will never run, so its upload would otherwise be leaked.
        _remove_video()
        return

    model_keys = entry["model_keys"]
    model_count = len(model_keys)

    def _progress(pct: int, detail: str) -> None:
        # Progress callback; also the cancellation point for a running job.
        if entry.get("status") == "cancelled":
            raise InterruptedError("Job cancelled by user")
        entry["progress"] = pct
        entry["stage_detail"] = detail

    try:
        # Stage 1: extract text from the video.
        entry["status"] = "extracting"
        _progress(1, "Starting video extraction…")
        extracted_text = process_video(video_path, _progress) or ""
        stripped = extracted_text.strip()
        if not stripped or extracted_text.startswith("[ERROR]"):
            # Use the stripped text so whitespace-only output falls back to
            # the readable default instead of becoming the error message.
            message = stripped or "No extractable text found."
            entry["status"] = "error"
            entry["error"] = message
            entry["stage_detail"] = f"Error: {message}"
            _persist(job_id)
            return

        # Stage 2: run each requested model over the (capped) text.
        entry["status"] = "scanning"
        _progress(90, f"Running {model_count} PII model(s)…")
        if _classifier is None:
            raise RuntimeError(
                "Classifier not initialised — call video_job_queue.setup() first."
            )
        scan_text = extracted_text[:100_000]  # cap applied once, reused below
        all_detections: Dict[str, List[dict]] = {}
        for i, key in enumerate(model_keys):
            _progress(
                90 + int((i / model_count) * 8),
                f"Scanning with {key} ({i + 1}/{model_count})…",
            )
            all_detections[key] = _classifier.analyze_text_hybrid(scan_text, [key])

        # Stage 3: build the showdown result with the evaluator's own helper
        # so the payload has the same shape as the evaluator API's.
        from evaluator_api import _build_showdown_result

        union_keys: set = {
            (m["start"], m["end"], m["text"])
            for dets in all_detections.values()
            for m in dets
        }
        per_model: Dict[str, dict] = {
            key: _build_showdown_result(key, all_detections[key], union_keys, all_detections)
            for key in model_keys
        }
        ranked = sorted(
            (
                {"model_key": k, "pii_count": v["pii_count"], "accuracy": v["accuracy"]}
                for k, v in per_model.items()
            ),
            key=lambda row: row["pii_count"],
            reverse=True,
        )
        for position, row in enumerate(ranked, start=1):
            row["rank"] = position

        # Done.
        entry["status"] = "done"
        entry["progress"] = 100
        entry["stage_detail"] = "Complete — PII scan finished."
        entry["result"] = {
            "per_model": per_model,
            "has_gt": False,
            "elapsed": round(time.time() - entry["created_at"], 2),
            "union_total": len(union_keys),
            "ranked": ranked,
            "parsed_text": scan_text,
        }
        _persist(job_id)
    except InterruptedError:
        entry["status"] = "cancelled"
        entry["stage_detail"] = "Cancelled."
        _persist(job_id)
    except Exception as exc:
        entry["status"] = "error"
        entry["error"] = str(exc)
        entry["stage_detail"] = f"Error: {exc}"
        _persist(job_id)
    finally:
        # Always clean up the temp video file.
        _remove_video()


# ── Background asyncio worker ─────────────────────────────────────────────────
async def _worker() -> None:
    """
    Consume job ids from ``_queue`` forever, one job at a time.

    Each job is run with ``_process_job_sync`` on ``_executor`` and awaited
    before the next id is taken. Jobs already marked cancelled are skipped,
    and their temporary video file is removed here because the processor
    (which normally deletes it) never runs for them. ``task_done()`` is
    called exactly once per dequeued id, whatever the outcome.
    """
    loop = asyncio.get_running_loop()
    while True:
        job_id: str = await _queue.get()
        try:
            entry = _job_store.get(job_id, {})
            if entry.get("status") == "cancelled":
                video_path = entry.get("video_path")
                try:
                    if video_path and os.path.exists(video_path):
                        os.remove(video_path)
                except OSError:
                    pass  # cleanup is best-effort
                continue
            await loop.run_in_executor(_executor, _process_job_sync, job_id)
        except Exception as exc:
            # Only reached for failures the processor did not record itself
            # (e.g. a malformed entry or a shut-down executor).
            failed = _job_store.get(job_id)
            if failed is not None:
                failed["status"] = "error"
                failed["error"] = str(exc)
                failed["stage_detail"] = f"Error: {exc}"
        finally:
            _queue.task_done()


# Strong reference to the running worker task. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected.
_worker_task: Optional[asyncio.Task] = None


async def startup() -> None:
    """
    Launch the background worker task.

    Call from FastAPI's startup event (it needs a running event loop).
    Calling it again while the worker is alive is a no-op, so a repeated
    startup hook cannot create a second consumer; if the previous worker
    has finished, a new one is started.
    """
    global _worker_task
    if _worker_task is not None and not _worker_task.done():
        return
    _worker_task = asyncio.create_task(_worker())