SHAFI commited on
Commit
413cf21
·
1 Parent(s): 633b94c

added video feature

Browse files
api.py CHANGED
@@ -11,6 +11,8 @@ import asyncio
11
  from backend import RegexClassifier
12
  from email_service import send_welcome_email
13
  import evaluator_api
 
 
14
 
15
  # Initialize FastAPI app
16
  app = FastAPI(
@@ -41,6 +43,16 @@ classifier = RegexClassifier()
41
  evaluator_api.setup(classifier)
42
  app.include_router(evaluator_api.router)
43
 
 
 
 
 
 
 
 
 
 
 
44
  # Maximum file size (1GB)
45
  MAX_FILE_SIZE = 1024 * 1024 * 1024 # 1GB in bytes
46
 
 
11
  from backend import RegexClassifier
12
  from email_service import send_welcome_email
13
  import evaluator_api
14
+ import video_router
15
+ import video_job_queue
16
 
17
  # Initialize FastAPI app
18
  app = FastAPI(
 
43
  evaluator_api.setup(classifier)
44
  app.include_router(evaluator_api.router)
45
 
46
+ # Wire video API — async job queue (startup worker launched below)
47
+ video_job_queue.setup(classifier)
48
+ app.include_router(video_router.router)
49
+
50
+
51
+ @app.on_event("startup")
52
+ async def _start_video_worker():
53
+ """Launch the background asyncio video processing worker."""
54
+ await video_job_queue.startup()
55
+
56
  # Maximum file size (1GB)
57
  MAX_FILE_SIZE = 1024 * 1024 * 1024 # 1GB in bytes
58
 
file_handlers/universal_parser.py CHANGED
@@ -26,6 +26,12 @@ CATEGORY_FILE_TYPES = {
26
  {"ext": "eml", "label": "Email (.eml)"},
27
  {"ext": "epub", "label": "eBook (.epub)"},
28
  {"ext": "pptx", "label": "PowerPoint (.pptx)"},
 
 
 
 
 
 
29
  ],
30
  "semi_structured": [
31
  {"ext": "json", "label": "JSON (.json)"},
 
26
  {"ext": "eml", "label": "Email (.eml)"},
27
  {"ext": "epub", "label": "eBook (.epub)"},
28
  {"ext": "pptx", "label": "PowerPoint (.pptx)"},
29
+ # Video formats — processed via async job queue, not universal_parser
30
+ {"ext": "mp4", "label": "🎬 MP4 Video (.mp4)", "async": True},
31
+ {"ext": "mkv", "label": "🎬 MKV Video (.mkv)", "async": True},
32
+ {"ext": "avi", "label": "🎬 AVI Video (.avi)", "async": True},
33
+ {"ext": "mov", "label": "🎬 MOV Video (.mov)", "async": True},
34
+ {"ext": "webm", "label": "🎬 WebM Video (.webm)", "async": True},
35
  ],
36
  "semi_structured": [
37
  {"ext": "json", "label": "JSON (.json)"},
file_handlers/video_pipeline.py ADDED
@@ -0,0 +1,266 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Video Pipeline — 3-channel text extraction for PII detection.
3
+
4
+ Channels:
5
+ 1. Metadata — ffprobe JSON → flat key: value text
6
+ 2. Subtitles — embedded SRT/VTT tracks demuxed via ffmpeg
7
+ 3. Audio — ffmpeg WAV + faster-whisper (base) transcription
8
+
9
+ Returns merged plain text with source attribution headers so the
10
+ downstream NLP models receive clearly labelled, scannable content.
11
+
12
+ All external calls are guarded with timeouts so this never hangs forever.
13
+ """
14
+ from __future__ import annotations
15
+
16
+ import json
17
+ import os
18
+ import re
19
+ import subprocess
20
+ import tempfile
21
+ from typing import Callable, Optional
22
+
23
+ # ── Lazy Whisper loader ──────────────────────────────────────────────────────
24
+
25
+ WHISPER_MODEL_SIZE = "base"
26
+ _whisper_model = None
27
+
28
+
29
+ def _get_whisper():
30
+ global _whisper_model
31
+ if _whisper_model is None:
32
+ try:
33
+ from faster_whisper import WhisperModel
34
+ _whisper_model = WhisperModel(
35
+ WHISPER_MODEL_SIZE,
36
+ device="cpu",
37
+ compute_type="int8", # quantised — runs on CPU without VRAM
38
+ )
39
+ except ImportError:
40
+ raise RuntimeError(
41
+ "faster-whisper is not installed. "
42
+ "Run: pip install faster-whisper"
43
+ )
44
+ return _whisper_model
45
+
46
+
47
+ # ── ffmpeg availability ───────────────────────────────────────────────────────
48
+
49
+ def _check_ffmpeg() -> bool:
50
+ """Return True if ffmpeg/ffprobe are available in PATH."""
51
+ try:
52
+ subprocess.run(
53
+ ["ffmpeg", "-version"],
54
+ capture_output=True, check=True, timeout=5,
55
+ )
56
+ return True
57
+ except Exception:
58
+ return False
59
+
60
+
61
+ # ── Channel 1: Metadata ───────────────────────────────────────────────────────
62
+
63
+ def _extract_metadata(video_path: str) -> str:
64
+ """Use ffprobe to extract all metadata tags as flat key: value text."""
65
+ try:
66
+ result = subprocess.run(
67
+ [
68
+ "ffprobe", "-v", "quiet",
69
+ "-print_format", "json",
70
+ "-show_format", "-show_streams",
71
+ video_path,
72
+ ],
73
+ capture_output=True, text=True, timeout=30, check=True,
74
+ )
75
+ data = json.loads(result.stdout)
76
+ lines = ["[METADATA]"]
77
+
78
+ fmt_tags = data.get("format", {}).get("tags", {})
79
+ for key, val in fmt_tags.items():
80
+ lines.append(f"{key}: {val}")
81
+
82
+ for idx, stream in enumerate(data.get("streams", [])):
83
+ stream_tags = stream.get("tags", {})
84
+ for key, val in stream_tags.items():
85
+ lines.append(f"stream{idx}_{key}: {val}")
86
+
87
+ return "\n".join(lines) if len(lines) > 1 else ""
88
+ except Exception as exc:
89
+ return f"[METADATA]\n[Could not extract: {exc}]"
90
+
91
+
92
+ # ── Channel 2: Subtitles ──────────────────────────────────────────────────────
93
+
94
+ _TS_PATTERN = re.compile(r"(\d{2}:\d{2}:\d{2}),\d{3} --> ")
95
+ _TS_CAPTURE = re.compile(r"(\d{2}:\d{2}:\d{2})")
96
+
97
+
98
+ def _parse_srt(raw: str) -> list[str]:
99
+ """Convert raw SRT content to clean timestamped lines."""
100
+ lines = raw.splitlines()
101
+ result = []
102
+ i = 0
103
+ while i < len(lines):
104
+ line = lines[i].strip()
105
+ if _TS_PATTERN.match(line):
106
+ ts_match = _TS_CAPTURE.match(line)
107
+ label = ts_match.group(1) if ts_match else ""
108
+ i += 1
109
+ parts: list[str] = []
110
+ while i < len(lines) and lines[i].strip():
111
+ parts.append(lines[i].strip())
112
+ i += 1
113
+ if parts:
114
+ result.append(f"[{label}] {' '.join(parts)}")
115
+ i += 1
116
+ return result
117
+
118
+
119
+ def _extract_subtitles(video_path: str, tmp_dir: str) -> str:
120
+ """Demux the first embedded subtitle track; parse and return clean text."""
121
+ srt_path = os.path.join(tmp_dir, "subs.srt")
122
+ try:
123
+ subprocess.run(
124
+ ["ffmpeg", "-y", "-i", video_path, "-map", "0:s:0", srt_path],
125
+ capture_output=True, timeout=60, check=True,
126
+ )
127
+ except Exception:
128
+ return "" # No subtitles is normal — not an error
129
+
130
+ if not os.path.exists(srt_path) or os.path.getsize(srt_path) == 0:
131
+ return ""
132
+
133
+ with open(srt_path, encoding="utf-8", errors="replace") as fh:
134
+ raw = fh.read()
135
+
136
+ lines = _parse_srt(raw)
137
+ if not lines:
138
+ return ""
139
+ return "[SUBTITLES]\n" + "\n".join(lines)
140
+
141
+
142
+ # ── Channel 3: Audio Transcript ────────────────────────────────────���──────────
143
+
144
+ def _extract_audio_transcript(
145
+ video_path: str,
146
+ tmp_dir: str,
147
+ progress_cb: Optional[Callable[[int, str], None]] = None,
148
+ ) -> str:
149
+ """
150
+ Extract audio track with ffmpeg → transcribe with faster-whisper (base).
151
+ progress_cb(percent: int, detail: str) is called throughout transcription.
152
+ """
153
+ wav_path = os.path.join(tmp_dir, "audio.wav")
154
+
155
+ # Step A: Extract audio as 16 kHz mono WAV (Whisper requirement)
156
+ try:
157
+ subprocess.run(
158
+ [
159
+ "ffmpeg", "-y", "-i", video_path,
160
+ "-vn", # no video
161
+ "-acodec", "pcm_s16le", # 16-bit PCM
162
+ "-ar", "16000", # 16 kHz sample rate
163
+ "-ac", "1", # mono
164
+ wav_path,
165
+ ],
166
+ capture_output=True, timeout=180, check=True,
167
+ )
168
+ except subprocess.TimeoutExpired:
169
+ return "[AUDIO TRANSCRIPT]\n[Audio extraction timed out after 3 minutes]"
170
+ except Exception as exc:
171
+ return f"[AUDIO TRANSCRIPT]\n[Audio extraction failed: {exc}]"
172
+
173
+ if not os.path.exists(wav_path) or os.path.getsize(wav_path) < 1024:
174
+ return "[AUDIO TRANSCRIPT]\n[No audio track found in this video]"
175
+
176
+ # Step B: Transcribe
177
+ try:
178
+ model = _get_whisper()
179
+ if progress_cb:
180
+ progress_cb(0, "Starting Whisper transcription…")
181
+
182
+ segments_iter, info = model.transcribe(
183
+ wav_path,
184
+ beam_size=5,
185
+ language=None, # auto-detect
186
+ vad_filter=True, # skip silent sections — faster
187
+ )
188
+ duration = float(info.duration) if info.duration else 1.0
189
+
190
+ transcript_lines = ["[AUDIO TRANSCRIPT]"]
191
+ for seg in segments_iter:
192
+ start_s = int(seg.start)
193
+ end_s = int(seg.end)
194
+ ts = f"{start_s // 60:02d}:{start_s % 60:02d} → {end_s // 60:02d}:{end_s % 60:02d}"
195
+ transcript_lines.append(f"[{ts}] {seg.text.strip()}")
196
+
197
+ if progress_cb:
198
+ pct = min(int((seg.end / duration) * 100), 99)
199
+ dur_fmt = f"{int(duration // 60):02d}:{int(duration % 60):02d}"
200
+ progress_cb(pct, f"Transcribing audio… {ts} / {dur_fmt}")
201
+
202
+ return "\n".join(transcript_lines)
203
+
204
+ except Exception as exc:
205
+ return f"[AUDIO TRANSCRIPT]\n[Transcription failed: {exc}]"
206
+
207
+
208
+ # ── Public entry point ────────────────────────────────────────────────────────
209
+
210
+ def process_video(
211
+ video_path: str,
212
+ progress_cb: Optional[Callable[[int, str], None]] = None,
213
+ ) -> str:
214
+ """
215
+ Full 3-channel extraction pipeline.
216
+ Returns merged text with source-attribution headers.
217
+
218
+ progress_cb(percent: int, detail: str)
219
+ — called throughout; percent maps 0 → 95 (final 5% is model scanning)
220
+ """
221
+ if not _check_ffmpeg():
222
+ return (
223
+ "[ERROR] ffmpeg not found in PATH.\n"
224
+ "On HuggingFace Spaces add 'ffmpeg' to packages.txt.\n"
225
+ "Locally: https://ffmpeg.org/download.html"
226
+ )
227
+
228
+ parts: list[str] = []
229
+
230
+ with tempfile.TemporaryDirectory() as tmp_dir:
231
+
232
+ # ── Channel 1: Metadata (instant, ~1 s) ─────────────────────────────
233
+ if progress_cb:
234
+ progress_cb(3, "Extracting video metadata…")
235
+ meta = _extract_metadata(video_path)
236
+ if meta:
237
+ parts.append(meta)
238
+
239
+ # ── Channel 2: Subtitles (fast, ~2–5 s) ─────────────────────────────
240
+ if progress_cb:
241
+ progress_cb(8, "Demuxing embedded subtitles…")
242
+ subs = _extract_subtitles(video_path, tmp_dir)
243
+ if subs:
244
+ parts.append(subs)
245
+
246
+ # ── Channel 3: Audio Transcript (slow — Whisper) ─────────────────────
247
+ if progress_cb:
248
+ progress_cb(13, "Initialising Whisper model…")
249
+
250
+ def _audio_progress(pct: int, detail: str) -> None:
251
+ # Remap audio progress: 13 % → 93 %
252
+ mapped = 13 + int(pct * 0.80)
253
+ if progress_cb:
254
+ progress_cb(mapped, detail)
255
+
256
+ transcript = _extract_audio_transcript(video_path, tmp_dir, _audio_progress)
257
+ if transcript:
258
+ parts.append(transcript)
259
+
260
+ if progress_cb:
261
+ progress_cb(95, "Extraction complete — handing off to PII models…")
262
+
263
+ if not parts:
264
+ return "[No extractable text found in this video file]"
265
+
266
+ return "\n\n".join(parts)
packages.txt CHANGED
@@ -1,4 +1,5 @@
1
  tesseract-ocr
2
  tesseract-ocr-eng
3
  libgl1-mesa-glx
4
- libglib2.0-0
 
 
1
  tesseract-ocr
2
  tesseract-ocr-eng
3
  libgl1-mesa-glx
4
+ libglib2.0-0
5
+ ffmpeg
requirements.txt CHANGED
@@ -49,3 +49,6 @@ pydifact
49
  openpyxl
50
  xlrd
51
  lxml
 
 
 
 
49
  openpyxl
50
  xlrd
51
  lxml
52
+ # ── Video PII processing ────────────────────────────────────
53
+ faster-whisper
54
+ opencv-python-headless
video_job_queue.py ADDED
@@ -0,0 +1,279 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Video Job Queue — async job infrastructure for Segmento Sense.
3
+
4
+ Pattern: in-process asyncio.Queue + ThreadPoolExecutor.
5
+ - No Redis, no Celery, no external dependencies.
6
+ - Single background asyncio Task consumes jobs one at a time.
7
+ - Job state lives in _job_store (dict); results also persisted to /tmp/
8
+ so they survive a dict reset (not a full process restart).
9
+
10
+ Public API (called from video_router.py and api.py):
11
+ setup(classifier) — inject shared classifier
12
+ create_job(...) — register a new job, return job_id
13
+ enqueue(job_id) — push job_id onto the asyncio.Queue
14
+ get_job(job_id) — read job status / result
15
+ cancel_job(job_id) — cancel if still queued
16
+ startup() — launch background worker (call from FastAPI startup)
17
+ """
18
+ from __future__ import annotations
19
+
20
+ import asyncio
21
+ import json
22
+ import os
23
+ import time
24
+ import uuid
25
+ from concurrent.futures import ThreadPoolExecutor
26
+ from typing import Any, Dict, List, Optional
27
+
28
+ from file_handlers.video_pipeline import process_video
29
+
30
+ # ── Constants ─────────────────────────────────────────────────────────────────
31
+
32
+ _JOB_DIR = "/tmp/video_jobs"
33
+ _MAX_WORKERS = 2 # max concurrent video jobs in thread pool
34
+
35
+ # ── Module-level state ────────────────────────────────────────────────────────
36
+
37
+ _job_store: Dict[str, Dict[str, Any]] = {}
38
+ _queue: asyncio.Queue = asyncio.Queue()
39
+ _executor = ThreadPoolExecutor(max_workers=_MAX_WORKERS, thread_name_prefix="video_worker")
40
+ _classifier = None # injected by setup()
41
+
42
+
43
+ # ── Dependency injection ──────────────────────────────────────────────────────
44
+
45
+ def setup(classifier_instance) -> None:
46
+ """Inject the shared RegexClassifier from api.py."""
47
+ global _classifier
48
+ _classifier = classifier_instance
49
+
50
+
51
+ # ── Disk persistence (best-effort) ────────────────────────────────────────────
52
+
53
+ def _persist(job_id: str) -> None:
54
+ """Write completed/errored job to disk. Skips large 'parsed_text' field to keep files small."""
55
+ os.makedirs(_JOB_DIR, exist_ok=True)
56
+ entry = _job_store.get(job_id)
57
+ if not entry:
58
+ return
59
+ try:
60
+ # Shallow copy — omit internal path and large text from disk store
61
+ safe = {k: v for k, v in entry.items() if k not in ("video_path",)}
62
+ if safe.get("result") and isinstance(safe["result"], dict):
63
+ # Store parsed_text only truncated to 2000 chars on disk (full lives in memory)
64
+ result_copy = dict(safe["result"])
65
+ result_copy["parsed_text"] = result_copy.get("parsed_text", "")[:2000]
66
+ safe["result"] = result_copy
67
+ path = os.path.join(_JOB_DIR, f"{job_id}.json")
68
+ with open(path, "w", encoding="utf-8") as fh:
69
+ json.dump(safe, fh, ensure_ascii=False)
70
+ except Exception:
71
+ pass # persistence is best-effort
72
+
73
+
74
+ def _load_from_disk(job_id: str) -> Optional[Dict[str, Any]]:
75
+ """Try to recover a job result from disk (for jobs not in memory)."""
76
+ path = os.path.join(_JOB_DIR, f"{job_id}.json")
77
+ if os.path.exists(path):
78
+ try:
79
+ with open(path, encoding="utf-8") as fh:
80
+ return json.load(fh)
81
+ except Exception:
82
+ pass
83
+ return None
84
+
85
+
86
+ # ── Job lifecycle ─────────────────────────────────────────────────────────────
87
+
88
+ def create_job(video_path: str, model_keys: List[str]) -> str:
89
+ """Create a job entry in _job_store, return the new job_id."""
90
+ job_id = str(uuid.uuid4())
91
+ _job_store[job_id] = {
92
+ "status": "queued",
93
+ "progress": 0,
94
+ "stage_detail": "Waiting in queue…",
95
+ "result": None,
96
+ "error": None,
97
+ "created_at": time.time(),
98
+ "video_path": video_path, # internal — stripped before sending to client
99
+ "model_keys": model_keys,
100
+ }
101
+ return job_id
102
+
103
+
104
+ def get_job(job_id: str) -> Optional[Dict[str, Any]]:
105
+ """Return job status dict, checking disk if not found in memory."""
106
+ if job_id in _job_store:
107
+ entry = _job_store[job_id]
108
+ # Return safe copy without internal file path
109
+ return {k: v for k, v in entry.items() if k != "video_path"}
110
+ return _load_from_disk(job_id)
111
+
112
+
113
+ def cancel_job(job_id: str) -> bool:
114
+ """
115
+ Cancel a queued job. Returns True on success.
116
+ Cannot cancel a job that is already extracting/transcribing/scanning.
117
+ """
118
+ entry = _job_store.get(job_id)
119
+ if entry and entry["status"] == "queued":
120
+ entry["status"] = "cancelled"
121
+ entry["stage_detail"] = "Cancelled by user."
122
+ _persist(job_id)
123
+ return True
124
+ return False
125
+
126
+
127
+ async def enqueue(job_id: str) -> None:
128
+ """Put an already-created job onto the async queue."""
129
+ await _queue.put(job_id)
130
+
131
+
132
+ # ── Core job processor ────────────────────────────────────────────────────────
133
+
134
+ def _process_job_sync(job_id: str) -> None:
135
+ """
136
+ Synchronous heavy-lifting — runs inside ThreadPoolExecutor so it never
137
+ blocks the FastAPI event loop.
138
+
139
+ Stages:
140
+ 1. Video extraction (ffmpeg + Whisper) → 0 % … 95 %
141
+ 2. NLP model scan → 90 % … 100 %
142
+ """
143
+ entry = _job_store.get(job_id)
144
+ if not entry or entry["status"] in ("cancelled", "error"):
145
+ return
146
+
147
+ video_path = entry["video_path"]
148
+ model_keys = entry["model_keys"]
149
+
150
+ # ── progress callback (thread-safe dict write) ────────────────────────────
151
+ def _progress(pct: int, detail: str) -> None:
152
+ if entry.get("status") == "cancelled":
153
+ raise InterruptedError("Job cancelled by user")
154
+ entry["progress"] = pct
155
+ entry["stage_detail"] = detail
156
+
157
+ try:
158
+ # ── Stage 1: Extract text from video ─────────────────────────────────
159
+ entry["status"] = "extracting"
160
+ _progress(1, "Starting video extraction…")
161
+
162
+ extracted_text = process_video(video_path, _progress)
163
+
164
+ if not extracted_text.strip() or extracted_text.startswith("[ERROR]"):
165
+ entry["status"] = "error"
166
+ entry["error"] = extracted_text or "No extractable text found."
167
+ _persist(job_id)
168
+ return
169
+
170
+ # ── Stage 2: Run PII models ───────────────────────────────────────────
171
+ entry["status"] = "scanning"
172
+ _progress(90, f"Running {len(model_keys)} PII model(s)…")
173
+
174
+ if _classifier is None:
175
+ raise RuntimeError("Classifier not initialised — call video_job_queue.setup() first.")
176
+
177
+ all_detections: Dict[str, List[dict]] = {}
178
+ for i, key in enumerate(model_keys):
179
+ _progress(
180
+ 90 + int((i / len(model_keys)) * 8),
181
+ f"Scanning with {key} ({i + 1}/{len(model_keys)})…",
182
+ )
183
+ dets = _classifier.analyze_text_hybrid(extracted_text[:100_000], [key])
184
+ all_detections[key] = dets
185
+
186
+ # ── Stage 3: Build showdown result ────────────────────────────────────
187
+ # Re-use the exact same logic as evaluator_api so the frontend
188
+ # gets identical shaped data — zero frontend changes needed.
189
+ from evaluator_api import _build_showdown_result
190
+
191
+ union_keys: set = set()
192
+ for dets in all_detections.values():
193
+ for m in dets:
194
+ union_keys.add((m["start"], m["end"], m["text"]))
195
+
196
+ per_model: Dict[str, dict] = {}
197
+ for key in model_keys:
198
+ per_model[key] = _build_showdown_result(
199
+ key,
200
+ all_detections[key],
201
+ union_keys,
202
+ all_detections,
203
+ )
204
+
205
+ ranked = sorted(
206
+ [
207
+ {"model_key": k, "pii_count": v["pii_count"], "accuracy": v["accuracy"]}
208
+ for k, v in per_model.items()
209
+ ],
210
+ key=lambda x: x["pii_count"],
211
+ reverse=True,
212
+ )
213
+ for i, r in enumerate(ranked):
214
+ r["rank"] = i + 1
215
+
216
+ # ── Done ──────────────────────────────────────────────────────────────
217
+ entry["status"] = "done"
218
+ entry["progress"] = 100
219
+ entry["stage_detail"] = "Complete — PII scan finished."
220
+ entry["result"] = {
221
+ "per_model": per_model,
222
+ "has_gt": False,
223
+ "elapsed": round(time.time() - entry["created_at"], 2),
224
+ "union_total": len(union_keys),
225
+ "ranked": ranked,
226
+ "parsed_text": extracted_text[:100_000],
227
+ }
228
+ _persist(job_id)
229
+
230
+ except InterruptedError:
231
+ entry["status"] = "cancelled"
232
+ entry["stage_detail"] = "Cancelled."
233
+ _persist(job_id)
234
+
235
+ except Exception as exc:
236
+ entry["status"] = "error"
237
+ entry["error"] = str(exc)
238
+ entry["stage_detail"] = f"Error: {exc}"
239
+ _persist(job_id)
240
+
241
+ finally:
242
+ # Always clean up the temp video file
243
+ try:
244
+ if video_path and os.path.exists(video_path):
245
+ os.remove(video_path)
246
+ except Exception:
247
+ pass
248
+
249
+
250
+ # ── Background asyncio worker ─────────────────────────────────────────────────
251
+
252
+ async def _worker() -> None:
253
+ """
254
+ Runs forever as a background asyncio Task.
255
+ Pulls job_ids from the queue and processes them in the thread pool.
256
+ """
257
+ loop = asyncio.get_event_loop()
258
+ while True:
259
+ job_id: str = await _queue.get()
260
+ entry = _job_store.get(job_id, {})
261
+ if entry.get("status") == "cancelled":
262
+ _queue.task_done()
263
+ continue
264
+ try:
265
+ await loop.run_in_executor(_executor, _process_job_sync, job_id)
266
+ except Exception as exc:
267
+ if job_id in _job_store:
268
+ _job_store[job_id]["status"] = "error"
269
+ _job_store[job_id]["error"] = str(exc)
270
+ finally:
271
+ _queue.task_done()
272
+
273
+
274
+ async def startup() -> None:
275
+ """
276
+ Launch the background worker task.
277
+ Must be called once from FastAPI's startup event.
278
+ """
279
+ asyncio.create_task(_worker())
video_router.py ADDED
@@ -0,0 +1,122 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Video Router — FastAPI router for async video PII processing.
3
+
4
+ Endpoints:
5
+ POST /api/video/upload — accept video, start async job, return job_id
6
+ GET /api/video/status/{job_id} — poll job status + progress + result
7
+ DELETE /api/video/cancel/{job_id} — cancel a queued job
8
+ """
9
+ from __future__ import annotations
10
+
11
+ import os
12
+ import tempfile
13
+
14
+ from fastapi import APIRouter, File, Form, HTTPException, UploadFile
15
+ from fastapi.responses import JSONResponse
16
+
17
+ import video_job_queue as _queue
18
+
19
+ router = APIRouter(prefix="/api/video", tags=["video"])
20
+
21
+ ALLOWED_EXTENSIONS = {"mp4", "mkv", "avi", "mov", "webm"}
22
+ MAX_VIDEO_BYTES = 500 * 1024 * 1024 # 500 MB hard limit
23
+ UPLOAD_DIR = "/tmp/video_uploads"
24
+
25
+
26
+ @router.post("/upload")
27
+ async def video_upload(
28
+ file: UploadFile = File(...),
29
+ model_keys: str = Form("regex,spacy,deberta"),
30
+ ):
31
+ """
32
+ Accept a video file upload.
33
+ Saves bytes to /tmp/, creates an async job, returns {job_id} immediately.
34
+ The client should poll GET /api/video/status/{job_id} every ~3 seconds.
35
+ """
36
+ # Validate extension
37
+ filename = file.filename or ""
38
+ ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
39
+ if ext not in ALLOWED_EXTENSIONS:
40
+ raise HTTPException(
41
+ status_code=400,
42
+ detail=(
43
+ f"Unsupported video format '{ext}'. "
44
+ f"Supported: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
45
+ ),
46
+ )
47
+
48
+ # Read and size-check
49
+ content = await file.read()
50
+ if len(content) > MAX_VIDEO_BYTES:
51
+ raise HTTPException(
52
+ status_code=413,
53
+ detail=f"File is {len(content) // (1024*1024)} MB — limit is 500 MB.",
54
+ )
55
+ if len(content) < 1024:
56
+ raise HTTPException(status_code=400, detail="File appears to be empty.")
57
+
58
+ # Persist to temp dir
59
+ os.makedirs(UPLOAD_DIR, exist_ok=True)
60
+ fd, tmp_path = tempfile.mkstemp(suffix=f".{ext}", dir=UPLOAD_DIR)
61
+ with os.fdopen(fd, "wb") as fh:
62
+ fh.write(content)
63
+
64
+ # Parse model keys
65
+ keys = [k.strip() for k in model_keys.split(",") if k.strip()]
66
+ if not keys:
67
+ keys = ["regex", "spacy", "deberta"]
68
+
69
+ # Create and enqueue job
70
+ job_id = _queue.create_job(tmp_path, keys)
71
+ await _queue.enqueue(job_id)
72
+
73
+ return JSONResponse(
74
+ content={
75
+ "job_id": job_id,
76
+ "status": "queued",
77
+ "file_size_mb": round(len(content) / (1024 * 1024), 2),
78
+ "model_keys": keys,
79
+ }
80
+ )
81
+
82
+
83
+ @router.get("/status/{job_id}")
84
+ async def video_status(job_id: str):
85
+ """
86
+ Return the current job status.
87
+
88
+ Response shape:
89
+ {
90
+ "status": "queued" | "extracting" | "scanning" | "done" | "error" | "cancelled",
91
+ "progress": 0-100,
92
+ "stage_detail": "Human-readable current stage",
93
+ "result": null | { per_model, ranked, union_total, elapsed, parsed_text },
94
+ "error": null | "error message string",
95
+ "created_at": unix timestamp
96
+ }
97
+ """
98
+ entry = _queue.get_job(job_id)
99
+ if entry is None:
100
+ raise HTTPException(
101
+ status_code=404,
102
+ detail=f"Job '{job_id}' not found. It may have expired.",
103
+ )
104
+ return JSONResponse(content=entry)
105
+
106
+
107
+ @router.delete("/cancel/{job_id}")
108
+ async def video_cancel(job_id: str):
109
+ """
110
+ Cancel a queued job.
111
+ Returns 409 if the job is already running or completed.
112
+ """
113
+ success = _queue.cancel_job(job_id)
114
+ if not success:
115
+ raise HTTPException(
116
+ status_code=409,
117
+ detail=(
118
+ "Cannot cancel: job is already running, completed, "
119
+ "or does not exist in queue."
120
+ ),
121
+ )
122
+ return JSONResponse(content={"cancelled": True, "job_id": job_id})