# sense-backend / video_job_queue.py
# SHAFI
# added video feature
# 413cf21
# Raw
# History Blame Contribute Delete
# 11.1 kB
"""
Video Job Queue β€” async job infrastructure for Segmento Sense.
Pattern: in-process asyncio.Queue + ThreadPoolExecutor.
- No Redis, no Celery, no external dependencies.
- Single background asyncio Task consumes jobs one at a time.
- Job state lives in _job_store (dict); results also persisted to /tmp/
so they survive a dict reset (not a full process restart).
Public API (called from video_router.py and api.py):
setup(classifier) β€” inject shared classifier
create_job(...) β€” register a new job, return job_id
enqueue(job_id) β€” push job_id onto the asyncio.Queue
get_job(job_id) β€” read job status / result
cancel_job(job_id) β€” cancel if still queued
startup() β€” launch background worker (call from FastAPI startup)
"""
from __future__ import annotations

import asyncio
import json
import logging
import os
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional

from file_handlers.video_pipeline import process_video
# ── Constants ─────────────────────────────────────────────────────────────────
_JOB_DIR = "/tmp/video_jobs"
_MAX_WORKERS = 2 # max concurrent video jobs in thread pool
# ── Module-level state ────────────────────────────────────────────────────────
_job_store: Dict[str, Dict[str, Any]] = {}
_queue: asyncio.Queue = asyncio.Queue()
_executor = ThreadPoolExecutor(max_workers=_MAX_WORKERS, thread_name_prefix="video_worker")
_classifier = None # injected by setup()
# ── Dependency injection ──────────────────────────────────────────────────────
def setup(classifier_instance) -> None:
"""Inject the shared RegexClassifier from api.py."""
global _classifier
_classifier = classifier_instance
# ── Disk persistence (best-effort) ────────────────────────────────────────────
def _persist(job_id: str) -> None:
    """
    Best-effort snapshot of a job entry to ``_JOB_DIR/<job_id>.json``.

    Called whenever a job reaches a terminal state (done, error, cancelled).
    The internal ``video_path`` is left out, and ``result["parsed_text"]`` is
    truncated to 2000 characters on disk to keep files small (the full text
    stays in ``_job_store``).

    The JSON is first written to a uniquely named temporary file in the same
    directory and then moved into place with ``os.replace``. A concurrent
    ``_load_from_disk`` therefore never reads a half-written file, and a
    serialization failure never clobbers an earlier good snapshot. Every
    failure, including being unable to create ``_JOB_DIR``, is logged and
    swallowed: persistence must never change the outcome of a job.
    """
    entry = _job_store.get(job_id)
    if not entry:
        return
    path = os.path.join(_JOB_DIR, f"{job_id}.json")
    tmp_path = f"{path}.{uuid.uuid4().hex}.tmp"
    try:
        os.makedirs(_JOB_DIR, exist_ok=True)
        # Shallow copy: omit the internal file path from the disk store.
        safe = {k: v for k, v in entry.items() if k != "video_path"}
        result = safe.get("result")
        if isinstance(result, dict) and result:
            result_copy = dict(result)
            # `or ""` also covers an explicit None, which would break slicing.
            result_copy["parsed_text"] = (result_copy.get("parsed_text") or "")[:2000]
            safe["result"] = result_copy
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(safe, fh, ensure_ascii=False)
        os.replace(tmp_path, path)
    except Exception:
        logging.getLogger(__name__).warning(
            "Could not persist video job %s", job_id, exc_info=True
        )
        # Never leave a partially written temp file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def _load_from_disk(job_id: str) -> Optional[Dict[str, Any]]:
    """
    Recover a job snapshot previously written by ``_persist``.

    Used by ``get_job`` for ids that are not in ``_job_store``. Returns the
    decoded dict, or ``None`` when no readable snapshot exists.

    ``get_job`` is part of the public API (called from the router), so
    ``job_id`` may be client-supplied. It is only turned into a file path if
    it is a canonical UUID string, the only form ``create_job`` issues. This
    stops ids such as ``"../x"`` from reading ``*.json`` files outside
    ``_JOB_DIR``.
    """
    try:
        is_canonical = str(uuid.UUID(job_id)) == job_id
    except (AttributeError, TypeError, ValueError):
        is_canonical = False
    if not is_canonical:
        return None
    path = os.path.join(_JOB_DIR, f"{job_id}.json")
    try:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # Missing, unreadable, or corrupt file (JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses).
        return None
    return data if isinstance(data, dict) else None
# ── Job lifecycle ─────────────────────────────────────────────────────────────
def create_job(video_path: str, model_keys: List[str]) -> str:
    """
    Register a new job in the ``queued`` state and return its id.

    The job is only recorded in ``_job_store`` here; it is not scheduled
    until its id is passed to ``enqueue``. ``video_path`` is kept for the
    worker but never returned by ``get_job``.
    """
    new_id = f"{uuid.uuid4()}"
    record: Dict[str, Any] = dict(
        status="queued",
        progress=0,
        stage_detail="Waiting in queue…",
        result=None,
        error=None,
        created_at=time.time(),
        video_path=video_path,  # internal — stripped before sending to client
        model_keys=model_keys,
    )
    _job_store[new_id] = record
    return new_id
def get_job(job_id: str) -> Optional[Dict[str, Any]]:
    """
    Look up a job by id and return a client-safe view of it.

    In-memory jobs come back as a shallow copy without the internal
    ``video_path``. Ids not in memory fall back to the on-disk snapshot via
    ``_load_from_disk``, which yields ``None`` when there is none.
    """
    try:
        entry = _job_store[job_id]
    except KeyError:
        return _load_from_disk(job_id)
    snapshot = dict(entry)
    snapshot.pop("video_path", None)
    return snapshot
def cancel_job(job_id: str) -> bool:
    """
    Cancel a job that has not started yet; return whether it was cancelled.

    Only jobs whose status is still ``queued`` qualify. Once the worker has
    moved a job past ``queued`` (extracting/scanning/...), this returns
    ``False`` and leaves the job alone. On success the entry is marked
    ``cancelled`` and persisted, and ``_worker`` skips it when dequeued.
    """
    entry = _job_store.get(job_id)
    if not entry or entry["status"] != "queued":
        return False
    entry.update(status="cancelled", stage_detail="Cancelled by user.")
    _persist(job_id)
    return True
async def enqueue(job_id: str) -> None:
    """
    Schedule a job that was registered with ``create_job``.

    Only the id travels through the queue; ``_worker`` looks the job up in
    ``_job_store`` when it dequeues it.
    """
    queue = _queue
    await queue.put(job_id)
# ── Core job processor ────────────────────────────────────────────────────────
def _process_job_sync(job_id: str) -> None:
    """
    Run one video job end to end.

    Executed on ``_executor`` (see ``_worker``) so the blocking extraction
    and model work never runs on the FastAPI event loop.

    Stages (progress written to the job entry):
      1. Text extraction via ``process_video``, which reports its own
         progress through the ``_progress`` callback.
      2. PII scan with each requested model -> 90-97 %.
      3. Build the showdown result          -> 100 % on success.

    The job entry is updated in place and persisted (best-effort) when it
    reaches ``done``, ``error`` or ``cancelled``. The uploaded video file is
    removed on every exit path, including when the job was already
    cancelled/errored before this function ran.

    NOTE(review): the previous docstring said extraction reports up to 95 %,
    while stage 2 starts at 90 %, so the bar may step backwards; confirm the
    range ``process_video`` actually reports.
    """
    entry = _job_store.get(job_id)
    if not entry:
        return
    video_path = entry["video_path"]
    model_keys = entry["model_keys"]

    # ── progress callback (also passed to process_video) ─────────────────────
    def _progress(pct: int, detail: str) -> None:
        # Each update is a single dict-item assignment, which readers on the
        # event-loop thread see atomically under CPython's GIL.
        # NOTE(review): cancel_job() only cancels *queued* jobs, so this check
        # fires only if something else marks a running job cancelled.
        if entry.get("status") == "cancelled":
            raise InterruptedError("Job cancelled by user")
        entry["progress"] = pct
        entry["stage_detail"] = detail

    def _fail(message: str) -> None:
        # Fill in the details *before* flipping the status so a concurrent
        # get_job() never observes "error" without its message.
        entry["error"] = message
        entry["stage_detail"] = f"Error: {message}"
        entry["status"] = "error"
        _persist(job_id)

    try:
        if entry["status"] in ("cancelled", "error"):
            # Nothing to run, but fall through to `finally` so the uploaded
            # video is still deleted.
            return

        # ── Stage 1: Extract text from video ─────────────────────────────────
        entry["status"] = "extracting"
        _progress(1, "Starting video extraction…")
        extracted_text = process_video(video_path, _progress)
        if not extracted_text.strip() or extracted_text.startswith("[ERROR]"):
            # Whitespace-only output is not a useful error message.
            _fail(extracted_text.strip() or "No extractable text found.")
            return

        # ── Stage 2: Run PII models ───────────────────────────────────────────
        entry["status"] = "scanning"
        _progress(90, f"Running {len(model_keys)} PII model(s)…")
        if _classifier is None:
            raise RuntimeError("Classifier not initialised — call video_job_queue.setup() first.")
        # Every model scans (and the result reports) the same capped text.
        scan_text = extracted_text[:100_000]
        all_detections: Dict[str, List[dict]] = {}
        for i, key in enumerate(model_keys):
            _progress(
                90 + int((i / len(model_keys)) * 8),
                f"Scanning with {key} ({i + 1}/{len(model_keys)})…",
            )
            all_detections[key] = _classifier.analyze_text_hybrid(scan_text, [key])

        # ── Stage 3: Build showdown result ────────────────────────────────────
        # Re-use the exact same logic as evaluator_api so the frontend
        # gets identically shaped data — zero frontend changes needed.
        from evaluator_api import _build_showdown_result

        # Distinct (start, end, text) spans found by any model.
        union_keys: set = {
            (m["start"], m["end"], m["text"])
            for dets in all_detections.values()
            for m in dets
        }
        per_model: Dict[str, dict] = {
            key: _build_showdown_result(key, all_detections[key], union_keys, all_detections)
            for key in model_keys
        }
        ranked = sorted(
            (
                {"model_key": k, "pii_count": v["pii_count"], "accuracy": v["accuracy"]}
                for k, v in per_model.items()
            ),
            key=lambda x: x["pii_count"],
            reverse=True,
        )
        for rank, row in enumerate(ranked, start=1):
            row["rank"] = rank

        # ── Done ──────────────────────────────────────────────────────────────
        # Publish the result before the "done" status: get_job() runs on
        # another thread and must never see status "done" with result None.
        entry["result"] = {
            "per_model": per_model,
            "has_gt": False,
            # Measured from created_at, so it includes time spent queued.
            "elapsed": round(time.time() - entry["created_at"], 2),
            "union_total": len(union_keys),
            "ranked": ranked,
            "parsed_text": scan_text,
        }
        entry["progress"] = 100
        entry["stage_detail"] = "Complete — PII scan finished."
        entry["status"] = "done"
        _persist(job_id)
    except InterruptedError:
        entry["stage_detail"] = "Cancelled."
        entry["status"] = "cancelled"
        _persist(job_id)
    except Exception as exc:
        logging.getLogger(__name__).exception("Video job %s failed", job_id)
        _fail(str(exc))
    finally:
        # Always clean up the temp video file (best-effort).
        try:
            if video_path and os.path.exists(video_path):
                os.remove(video_path)
        except OSError:
            pass
# ── Background asyncio worker ─────────────────────────────────────────────────
async def _worker() -> None:
    """
    Consume job ids from ``_queue`` forever, one job at a time.

    Each job is handed to ``_executor`` via ``run_in_executor`` and awaited
    before the next id is taken, so jobs run sequentially. Runs as the
    background task created by ``startup()`` and stops only when that task is
    cancelled. Per-job failures are recorded on the job and never kill the
    loop.
    """
    loop = asyncio.get_running_loop()
    while True:
        job_id: str = await _queue.get()
        try:
            entry = _job_store.get(job_id, {})
            if entry.get("status") == "cancelled":
                # Cancelled while still queued. _process_job_sync never runs for
                # it, so its cleanup `finally` can't delete the uploaded video;
                # do that here (best-effort).
                video_path = entry.get("video_path")
                if video_path:
                    try:
                        os.remove(video_path)
                    except OSError:
                        pass
                continue
            await loop.run_in_executor(_executor, _process_job_sync, job_id)
        except asyncio.CancelledError:
            # On Python 3.7 CancelledError subclasses Exception; re-raise so the
            # worker task can actually be cancelled on shutdown.
            raise
        except Exception as exc:
            # E.g. the executor refusing new work: keep the worker alive and
            # surface the failure on the job itself (details before status).
            logging.getLogger(__name__).exception("Video job %s could not be run", job_id)
            if job_id in _job_store:
                _job_store[job_id]["error"] = str(exc)
                _job_store[job_id]["stage_detail"] = f"Error: {exc}"
                _job_store[job_id]["status"] = "error"
        finally:
            # Exactly one task_done() per get(), including the `continue` path.
            _queue.task_done()
# Strong references to running worker tasks; see startup().
_background_tasks: set = set()


async def startup() -> None:
    """
    Launch the background worker task.
    Must be called once from FastAPI's startup event.

    The event loop keeps only weak references to tasks, so a task created with
    ``asyncio.create_task`` and not referenced anywhere else can be
    garbage-collected before it finishes. The task is therefore held in
    ``_background_tasks`` while it runs and dropped once it is done.
    """
    task = asyncio.create_task(_worker())
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)