""" Video Pipeline — 4-channel text extraction for PII detection. Channels: 1. Metadata — ffprobe JSON → flat key: value text 2. Subtitles — embedded SRT/VTT tracks demuxed via ffmpeg 3. Visual — EasyOCR (Text) + Presidio Image Analyzer (Entities) on keyframes 4. Audio — ffmpeg WAV + faster-whisper (base) transcription Returns merged plain text with source attribution headers so the downstream NLP models receive clearly labelled, scannable content. All external calls are guarded with timeouts so this never hangs forever. """ from __future__ import annotations import glob import json import os import re import subprocess import tempfile from typing import Callable, Optional try: from PIL import Image except ImportError: pass # ── Lazy Whisper loader ────────────────────────────────────────────────────── WHISPER_MODEL_SIZE = "base" _whisper_model = None def _get_whisper(): global _whisper_model if _whisper_model is None: try: from faster_whisper import WhisperModel _whisper_model = WhisperModel( WHISPER_MODEL_SIZE, device="cpu", compute_type="int8", # quantised — runs on CPU without VRAM ) except ImportError: raise RuntimeError( "faster-whisper is not installed. 
" "Run: pip install faster-whisper" ) return _whisper_model # ── Lazy Visual Loaders (EasyOCR & Presidio Image) ─────────────────────────── _easyocr_reader = None _presidio_image_analyzer = None def _get_easyocr(): global _easyocr_reader if _easyocr_reader is None: try: import easyocr # Load English, run on CPU for wider compatibility _easyocr_reader = easyocr.Reader(['en'], gpu=False, verbose=False) except ImportError: raise RuntimeError("easyocr is not installed.") return _easyocr_reader def _get_presidio_image_analyzer(): global _presidio_image_analyzer if _presidio_image_analyzer is None: try: from presidio_image_redactor import ImageAnalyzerEngine _presidio_image_analyzer = ImageAnalyzerEngine() except ImportError: raise RuntimeError("presidio-image-redactor is not installed.") return _presidio_image_analyzer # ── ffmpeg availability ─────────────────────────────────────────────────────── def _check_ffmpeg() -> bool: """Return True if ffmpeg/ffprobe are available in PATH.""" try: subprocess.run( ["ffmpeg", "-version"], capture_output=True, check=True, timeout=5, ) return True except Exception: return False # ── Channel 1: Metadata ─────────────────────────────────────────────────────── def _extract_metadata(video_path: str) -> str: """Use ffprobe to extract all metadata tags as flat key: value text.""" try: result = subprocess.run( [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", video_path, ], capture_output=True, text=True, timeout=30, check=True, ) data = json.loads(result.stdout) lines = ["[METADATA]"] fmt_tags = data.get("format", {}).get("tags", {}) for key, val in fmt_tags.items(): lines.append(f"{key}: {val}") for idx, stream in enumerate(data.get("streams", [])): stream_tags = stream.get("tags", {}) for key, val in stream_tags.items(): lines.append(f"stream{idx}_{key}: {val}") return "\n".join(lines) if len(lines) > 1 else "" except Exception as exc: return f"[METADATA]\n[Could not extract: {exc}]" # ── Channel 2: 
# ── Channel 2: Subtitles ──────────────────────────────────────────────────────

# Cue timing line, e.g. "00:01:05,250 --> 00:01:07,000". A "." millisecond
# separator (VTT style) is accepted as well as the SRT ",".
_TS_PATTERN = re.compile(r"(\d{2,}:\d{2}:\d{2})[,.]\d{3} --> ")
# Leading HH:MM:SS of a timing line; kept as a module-level name.
_TS_CAPTURE = re.compile(r"(\d{2,}:\d{2}:\d{2})")


def _parse_srt(raw: str) -> list[str]:
    """Convert raw SRT content to clean timestamped lines.

    Each cue becomes ``"[HH:MM:SS] text"`` where the timestamp is the cue's
    start time and multi-line cue text is joined with single spaces. Cue
    index lines and cues without text are dropped.
    """
    lines = raw.splitlines()
    total = len(lines)
    cues: list[str] = []
    i = 0
    while i < total:
        timing = _TS_PATTERN.match(lines[i].strip())
        i += 1
        if timing is None:
            continue
        # Everything up to the next blank line is the text of this cue.
        text_parts: list[str] = []
        while i < total and lines[i].strip():
            text_parts.append(lines[i].strip())
            i += 1
        if text_parts:
            cues.append(f"[{timing.group(1)}] {' '.join(text_parts)}")
    return cues


def _extract_subtitles(video_path: str, tmp_dir: str) -> str:
    """Demux the first embedded subtitle track; parse and return clean text.

    Args:
        video_path: path of the video handed to ffmpeg.
        tmp_dir: scratch directory in which ``subs.srt`` is written.

    Returns:
        A ``[SUBTITLES]`` header followed by the parsed cue lines, or an
        empty string when ffmpeg fails or no cues are found.
    """
    srt_path = os.path.join(tmp_dir, "subs.srt")
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", video_path, "-map", "0:s:0", srt_path],
            capture_output=True,
            timeout=60,
            check=True,
        )
    except (OSError, subprocess.SubprocessError):
        return ""  # No subtitles is normal — not an error

    if not os.path.exists(srt_path) or os.path.getsize(srt_path) == 0:
        return ""

    with open(srt_path, encoding="utf-8", errors="replace") as fh:
        cue_lines = _parse_srt(fh.read())

    if not cue_lines:
        return ""
    return "[SUBTITLES]\n" + "\n".join(cue_lines)


# ── Shared helpers ────────────────────────────────────────────────────────────

def _format_mmss(seconds: float) -> str:
    """Format a non-negative duration in seconds as zero-padded ``MM:SS``."""
    whole = int(seconds)
    return f"{whole // 60:02d}:{whole % 60:02d}"


def _scaled_progress(
    progress_cb: Optional[Callable[[int, str], None]],
    base: int,
    scale: float,
) -> Optional[Callable[[int, str], None]]:
    """Wrap *progress_cb* so a 0–100 percentage maps to ``base + pct * scale``.

    Returns None when no callback was supplied.
    """
    if not progress_cb:
        return None

    def _report(pct: int, detail: str) -> None:
        progress_cb(base + int(pct * scale), detail)

    return _report


# ── Channel 3: Visual Stream (OCR & Presidio Image) ──────────────────────────

# Seconds between sampled frames; used for both the ffmpeg fps filter and
# the timestamps attached to each frame's findings.
_FRAME_INTERVAL_SECONDS = 2


def _extract_visual_stream(
    video_path: str,
    tmp_dir: str,
    progress_cb: Optional[Callable[[int, str], None]] = None,
) -> str:
    """
    Extract 1 frame every 2 seconds.
    Run EasyOCR to get text.
    Run Presidio Image Analyzer to find visual PII entities.

    Args:
        video_path: path of the video handed to ffmpeg.
        tmp_dir: scratch directory; frames are written to ``<tmp_dir>/frames``.
        progress_cb: optional ``progress_cb(percent, detail)`` called once
            per frame with a percentage capped at 99.

    Returns:
        A ``[VISUAL STREAM OCR & PRESIDIO]`` header followed by one line per
        new OCR text and per detected entity type; an empty string when no
        frames or findings exist; or a ``[VISUAL STREAM]`` header with an
        error note when frame extraction or model loading fails.
    """
    frames_dir = os.path.join(tmp_dir, "frames")
    os.makedirs(frames_dir, exist_ok=True)

    try:
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", video_path,
                "-vf", f"fps=1/{_FRAME_INTERVAL_SECONDS}",
                os.path.join(frames_dir, "frame_%04d.jpg"),
            ],
            capture_output=True,
            timeout=120,
            check=True,
        )
    except Exception as exc:
        return f"[VISUAL STREAM]\n[Frame extraction failed: {exc}]"

    frame_files = sorted(glob.glob(os.path.join(frames_dir, "*.jpg")))
    if not frame_files:
        return ""

    try:
        reader = _get_easyocr()
        image_analyzer = _get_presidio_image_analyzer()
    except Exception as exc:
        return f"[VISUAL STREAM]\n[Failed to load visual models: {exc}]"

    # Pillow is optional; without it only the OCR pass runs.
    try:
        from PIL import Image as pil_image
    except ImportError:
        pil_image = None

    total_frames = len(frame_files)
    visual_lines = ["[VISUAL STREAM OCR & PRESIDIO]"]
    seen_text: set[str] = set()

    for idx, frame_path in enumerate(frame_files):
        if progress_cb:
            progress_cb(
                min(int((idx / total_frames) * 100), 99),
                f"Scanning visual frame {idx+1}/{total_frames}…",
            )

        ts = _format_mmss(idx * _FRAME_INTERVAL_SECONDS)

        # 1. EasyOCR — best effort: a failing frame is skipped, not fatal.
        try:
            for _bbox, text, prob in reader.readtext(frame_path):
                if prob <= 0.5:
                    continue
                clean_text = text.strip()
                # Report each distinct text once, ignoring very short strings.
                if len(clean_text) > 3 and clean_text not in seen_text:
                    seen_text.add(clean_text)
                    visual_lines.append(f"[{ts}] [TEXT] {clean_text}")
        except Exception:
            pass

        # 2. Presidio Image Analyzer — run independently of the OCR result
        #    so a failure in one detector does not suppress the other.
        if pil_image is None:
            continue
        try:
            # Context manager closes the frame's file handle after analysis.
            with pil_image.open(frame_path) as img:
                findings = image_analyzer.analyze(image=img)
            # Sorted so the output order is stable between runs.
            entity_types = sorted(
                {f.entity_type for f in findings if f.score > 0.4}
            )
            for entity in entity_types:
                visual_lines.append(
                    f"[{ts}] [VISUAL PII (Presidio)] Detected {entity}"
                )
        except Exception:
            continue

    if len(visual_lines) == 1:
        return ""
    return "\n".join(visual_lines)


# ── Channel 4: Audio Transcript ───────────────────────────────────────────────

def _extract_audio_transcript(
    video_path: str,
    tmp_dir: str,
    progress_cb: Optional[Callable[[int, str], None]] = None,
) -> str:
    """
    Extract audio track with ffmpeg → transcribe with faster-whisper (base).

    Args:
        video_path: path of the video handed to ffmpeg.
        tmp_dir: scratch directory in which ``audio.wav`` is written.
        progress_cb: optional ``progress_cb(percent, detail)`` called
            throughout transcription with a percentage capped at 99.

    Returns:
        An ``[AUDIO TRANSCRIPT]`` header followed by one
        ``[MM:SS → MM:SS] text`` line per segment; the header with a
        bracketed note when extraction or transcription fails or no audio
        is found; or an empty string when no segments were transcribed.
    """
    wav_path = os.path.join(tmp_dir, "audio.wav")

    # Step A: Extract audio as 16 kHz mono WAV (Whisper requirement)
    try:
        subprocess.run(
            [
                "ffmpeg", "-y",
                "-i", video_path,
                "-vn",                   # no video
                "-acodec", "pcm_s16le",  # 16-bit PCM
                "-ar", "16000",          # 16 kHz sample rate
                "-ac", "1",              # mono
                wav_path,
            ],
            capture_output=True,
            timeout=180,
            check=True,
        )
    except subprocess.TimeoutExpired:
        return "[AUDIO TRANSCRIPT]\n[Audio extraction timed out after 3 minutes]"
    except Exception as exc:
        return f"[AUDIO TRANSCRIPT]\n[Audio extraction failed: {exc}]"

    if not os.path.exists(wav_path) or os.path.getsize(wav_path) < 1024:
        return "[AUDIO TRANSCRIPT]\n[No audio track found in this video]"

    # Step B: Transcribe
    try:
        model = _get_whisper()

        if progress_cb:
            progress_cb(0, "Starting Whisper transcription…")

        segments_iter, info = model.transcribe(
            wav_path,
            beam_size=5,
            language=None,    # auto-detect
            vad_filter=True,  # skip silent sections — faster
        )

        # Fall back to 1.0 so the percentage division below cannot be by zero.
        duration = float(info.duration) if info.duration else 1.0
        dur_fmt = _format_mmss(duration)
        transcript_lines = ["[AUDIO TRANSCRIPT]"]

        for seg in segments_iter:
            ts = f"{_format_mmss(seg.start)} → {_format_mmss(seg.end)}"
            transcript_lines.append(f"[{ts}] {seg.text.strip()}")

            if progress_cb:
                pct = min(int((seg.end / duration) * 100), 99)
                progress_cb(pct, f"Transcribing audio… {ts} / {dur_fmt}")

        # Like the other channels, contribute nothing when only the header
        # would be emitted.
        if len(transcript_lines) == 1:
            return ""
        return "\n".join(transcript_lines)

    except Exception as exc:
        return f"[AUDIO TRANSCRIPT]\n[Transcription failed: {exc}]"


# ── Public entry point ────────────────────────────────────────────────────────

def process_video(
    video_path: str,
    progress_cb: Optional[Callable[[int, str], None]] = None,
) -> str:
    """
    Full 4-channel extraction pipeline.
    Returns merged text with source-attribution headers.

    Args:
        video_path: path of the video to process.
        progress_cb: optional ``progress_cb(percent, detail)`` — called
            throughout; percent maps 0 → 95 (final 5% is model scanning).

    Returns:
        The non-empty channel outputs joined by blank lines, an ``[ERROR]``
        message when ffmpeg is unavailable, or a placeholder message when
        no channel produced any text.
    """
    if not _check_ffmpeg():
        return (
            "[ERROR] ffmpeg not found in PATH.\n"
            "On HuggingFace Spaces add 'ffmpeg' to packages.txt.\n"
            "Locally: https://ffmpeg.org/download.html"
        )

    parts: list[str] = []

    with tempfile.TemporaryDirectory() as tmp_dir:

        # ── Channel 1: Metadata (instant, ~1 s) ─────────────────────────────
        if progress_cb:
            progress_cb(3, "Extracting video metadata…")
        meta = _extract_metadata(video_path)
        if meta:
            parts.append(meta)

        # ── Channel 2: Subtitles (fast, ~2–5 s) ─────────────────────────────
        if progress_cb:
            progress_cb(8, "Demuxing embedded subtitles…")
        subs = _extract_subtitles(video_path, tmp_dir)
        if subs:
            parts.append(subs)

        # ── Channel 3: Visual Stream (EasyOCR & Presidio) ───────────────────
        if progress_cb:
            progress_cb(15, "Initialising Visual Models (EasyOCR & Presidio)…")
        # Remap visual progress: 15 % → 50 %
        visual_text = _extract_visual_stream(
            video_path, tmp_dir, _scaled_progress(progress_cb, 15, 0.35)
        )
        if visual_text:
            parts.append(visual_text)

        # ── Channel 4: Audio Transcript (slow — Whisper) ────────────────────
        if progress_cb:
            progress_cb(50, "Initialising Whisper model…")
        # Remap audio progress: 50 % → 93 %
        transcript = _extract_audio_transcript(
            video_path, tmp_dir, _scaled_progress(progress_cb, 50, 0.43)
        )
        if transcript:
            parts.append(transcript)

    if progress_cb:
        progress_cb(95, "Extraction complete — handing off to PII models…")

    if not parts:
        return "[No extractable text found in this video file]"

    return "\n\n".join(parts)