Spaces:
Running
Running
| """ | |
| Video Pipeline — 4-channel text extraction for PII detection. | |
| Channels: | |
| 1. Metadata — ffprobe JSON → flat key: value text | |
| 2. Subtitles — embedded SRT/VTT tracks demuxed via ffmpeg | |
| 3. Visual — EasyOCR (Text) + Presidio Image Analyzer (Entities) on frames sampled every 2 s | |
| 4. Audio — ffmpeg WAV + faster-whisper (base) transcription | |
| Returns merged plain text with source attribution headers so the | |
| downstream NLP models receive clearly labelled, scannable content. | |
| All ffmpeg/ffprobe subprocess calls are guarded with timeouts so they never hang forever. | |
| """ | |
| from __future__ import annotations | |
| import glob | |
| import json | |
| import os | |
| import re | |
| import subprocess | |
| import tempfile | |
| from typing import Callable, Optional | |
| try: | |
| from PIL import Image | |
| except ImportError: | |
| pass | |
| # ββ Lazy Whisper loader ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| WHISPER_MODEL_SIZE = "base" | |
| _whisper_model = None | |
| def _get_whisper(): | |
| global _whisper_model | |
| if _whisper_model is None: | |
| try: | |
| from faster_whisper import WhisperModel | |
| _whisper_model = WhisperModel( | |
| WHISPER_MODEL_SIZE, | |
| device="cpu", | |
| compute_type="int8", # quantised β runs on CPU without VRAM | |
| ) | |
| except ImportError: | |
| raise RuntimeError( | |
| "faster-whisper is not installed. " | |
| "Run: pip install faster-whisper" | |
| ) | |
| return _whisper_model | |
| # ββ Lazy Visual Loaders (EasyOCR & Presidio Image) βββββββββββββββββββββββββββ | |
| _easyocr_reader = None | |
| _presidio_image_analyzer = None | |
| def _get_easyocr(): | |
| global _easyocr_reader | |
| if _easyocr_reader is None: | |
| try: | |
| import easyocr | |
| # Load English, run on CPU for wider compatibility | |
| _easyocr_reader = easyocr.Reader(['en'], gpu=False, verbose=False) | |
| except ImportError: | |
| raise RuntimeError("easyocr is not installed.") | |
| return _easyocr_reader | |
def _get_presidio_image_analyzer():
    """Return the shared Presidio ``ImageAnalyzerEngine``, creating it lazily.

    The engine is constructed with default arguments on the first call and
    cached in the module-level ``_presidio_image_analyzer``.

    Raises:
        RuntimeError: If ``presidio_image_redactor`` cannot be imported. The
            triggering ``ImportError`` is attached as ``__cause__``.
    """
    global _presidio_image_analyzer
    if _presidio_image_analyzer is None:
        try:
            from presidio_image_redactor import ImageAnalyzerEngine
            _presidio_image_analyzer = ImageAnalyzerEngine()
        except ImportError as exc:
            # Chain explicitly so the traceback shows which import failed.
            raise RuntimeError(
                "presidio-image-redactor is not installed."
            ) from exc
    return _presidio_image_analyzer
| # ββ ffmpeg availability βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _check_ffmpeg() -> bool: | |
| """Return True if ffmpeg/ffprobe are available in PATH.""" | |
| try: | |
| subprocess.run( | |
| ["ffmpeg", "-version"], | |
| capture_output=True, check=True, timeout=5, | |
| ) | |
| return True | |
| except Exception: | |
| return False | |
| # ββ Channel 1: Metadata βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _extract_metadata(video_path: str) -> str: | |
| """Use ffprobe to extract all metadata tags as flat key: value text.""" | |
| try: | |
| result = subprocess.run( | |
| [ | |
| "ffprobe", "-v", "quiet", | |
| "-print_format", "json", | |
| "-show_format", "-show_streams", | |
| video_path, | |
| ], | |
| capture_output=True, text=True, timeout=30, check=True, | |
| ) | |
| data = json.loads(result.stdout) | |
| lines = ["[METADATA]"] | |
| fmt_tags = data.get("format", {}).get("tags", {}) | |
| for key, val in fmt_tags.items(): | |
| lines.append(f"{key}: {val}") | |
| for idx, stream in enumerate(data.get("streams", [])): | |
| stream_tags = stream.get("tags", {}) | |
| for key, val in stream_tags.items(): | |
| lines.append(f"stream{idx}_{key}: {val}") | |
| return "\n".join(lines) if len(lines) > 1 else "" | |
| except Exception as exc: | |
| return f"[METADATA]\n[Could not extract: {exc}]" | |
| # ββ Channel 2: Subtitles ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _TS_PATTERN = re.compile(r"(\d{2}:\d{2}:\d{2}),\d{3} --> ") | |
| _TS_CAPTURE = re.compile(r"(\d{2}:\d{2}:\d{2})") | |
| def _parse_srt(raw: str) -> list[str]: | |
| """Convert raw SRT content to clean timestamped lines.""" | |
| lines = raw.splitlines() | |
| result = [] | |
| i = 0 | |
| while i < len(lines): | |
| line = lines[i].strip() | |
| if _TS_PATTERN.match(line): | |
| ts_match = _TS_CAPTURE.match(line) | |
| label = ts_match.group(1) if ts_match else "" | |
| i += 1 | |
| parts: list[str] = [] | |
| while i < len(lines) and lines[i].strip(): | |
| parts.append(lines[i].strip()) | |
| i += 1 | |
| if parts: | |
| result.append(f"[{label}] {' '.join(parts)}") | |
| i += 1 | |
| return result | |
| def _extract_subtitles(video_path: str, tmp_dir: str) -> str: | |
| """Demux the first embedded subtitle track; parse and return clean text.""" | |
| srt_path = os.path.join(tmp_dir, "subs.srt") | |
| try: | |
| subprocess.run( | |
| ["ffmpeg", "-y", "-i", video_path, "-map", "0:s:0", srt_path], | |
| capture_output=True, timeout=60, check=True, | |
| ) | |
| except Exception: | |
| return "" # No subtitles is normal β not an error | |
| if not os.path.exists(srt_path) or os.path.getsize(srt_path) == 0: | |
| return "" | |
| with open(srt_path, encoding="utf-8", errors="replace") as fh: | |
| raw = fh.read() | |
| lines = _parse_srt(raw) | |
| if not lines: | |
| return "" | |
| return "[SUBTITLES]\n" + "\n".join(lines) | |
| # ββ Channel 3: Audio Transcript βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _extract_audio_transcript( | |
| video_path: str, | |
| tmp_dir: str, | |
| progress_cb: Optional[Callable[[int, str], None]] = None, | |
| ) -> str: | |
| """ | |
| Extract audio track with ffmpeg β transcribe with faster-whisper (base). | |
| progress_cb(percent: int, detail: str) is called throughout transcription. | |
| """ | |
| wav_path = os.path.join(tmp_dir, "audio.wav") | |
| # Step A: Extract audio as 16 kHz mono WAV (Whisper requirement) | |
| try: | |
| subprocess.run( | |
| [ | |
| "ffmpeg", "-y", "-i", video_path, | |
| "-vn", # no video | |
| "-acodec", "pcm_s16le", # 16-bit PCM | |
| "-ar", "16000", # 16 kHz sample rate | |
| "-ac", "1", # mono | |
| wav_path, | |
| ], | |
| capture_output=True, timeout=180, check=True, | |
| ) | |
| except subprocess.TimeoutExpired: | |
| return "[AUDIO TRANSCRIPT]\n[Audio extraction timed out after 3 minutes]" | |
| except Exception as exc: | |
| return f"[AUDIO TRANSCRIPT]\n[Audio extraction failed: {exc}]" | |
| if not os.path.exists(wav_path) or os.path.getsize(wav_path) < 1024: | |
| return "[AUDIO TRANSCRIPT]\n[No audio track found in this video]" | |
| # Step B: Transcribe | |
| try: | |
| model = _get_whisper() | |
| if progress_cb: | |
| progress_cb(0, "Starting Whisper transcriptionβ¦") | |
| segments_iter, info = model.transcribe( | |
| wav_path, | |
| beam_size=5, | |
| language=None, # auto-detect | |
| vad_filter=True, # skip silent sections β faster | |
| ) | |
| duration = float(info.duration) if info.duration else 1.0 | |
| transcript_lines = ["[AUDIO TRANSCRIPT]"] | |
| for seg in segments_iter: | |
| start_s = int(seg.start) | |
| end_s = int(seg.end) | |
| ts = f"{start_s // 60:02d}:{start_s % 60:02d} β {end_s // 60:02d}:{end_s % 60:02d}" | |
| transcript_lines.append(f"[{ts}] {seg.text.strip()}") | |
| if progress_cb: | |
| pct = min(int((seg.end / duration) * 100), 99) | |
| dur_fmt = f"{int(duration // 60):02d}:{int(duration % 60):02d}" | |
| progress_cb(pct, f"Transcribing audio⦠{ts} / {dur_fmt}") | |
| return "\n".join(transcript_lines) | |
| except Exception as exc: | |
| return f"[AUDIO TRANSCRIPT]\n[Transcription failed: {exc}]" | |
| # ββ Channel 4: Visual Stream (OCR & Presidio Image) ββββββββββββββββββββββββββ | |
| def _extract_visual_stream( | |
| video_path: str, | |
| tmp_dir: str, | |
| progress_cb: Optional[Callable[[int, str], None]] = None, | |
| ) -> str: | |
| """ | |
| Extract 1 frame every 2 seconds. | |
| Run EasyOCR to get text. | |
| Run Presidio Image Redactor to find visual PII entities. | |
| """ | |
| frames_dir = os.path.join(tmp_dir, "frames") | |
| os.makedirs(frames_dir, exist_ok=True) | |
| try: | |
| subprocess.run( | |
| [ | |
| "ffmpeg", "-y", "-i", video_path, | |
| "-vf", "fps=1/2", | |
| os.path.join(frames_dir, "frame_%04d.jpg") | |
| ], | |
| capture_output=True, timeout=120, check=True | |
| ) | |
| except Exception as exc: | |
| return f"[VISUAL STREAM]\n[Frame extraction failed: {exc}]" | |
| frame_files = sorted(glob.glob(os.path.join(frames_dir, "*.jpg"))) | |
| if not frame_files: | |
| return "" | |
| try: | |
| reader = _get_easyocr() | |
| image_analyzer = _get_presidio_image_analyzer() | |
| except Exception as exc: | |
| return f"[VISUAL STREAM]\n[Failed to load visual models: {exc}]" | |
| total_frames = len(frame_files) | |
| visual_lines = ["[VISUAL STREAM OCR & PRESIDIO]"] | |
| seen_text = set() | |
| for idx, frame_path in enumerate(frame_files): | |
| if progress_cb: | |
| progress_cb(min(int((idx / total_frames) * 100), 99), f"Scanning visual frame {idx+1}/{total_frames}β¦") | |
| time_s = idx * 2 | |
| ts = f"{time_s // 60:02d}:{time_s % 60:02d}" | |
| try: | |
| # 1. EasyOCR | |
| results = reader.readtext(frame_path) | |
| for (bbox, text, prob) in results: | |
| if prob > 0.5: | |
| clean_text = text.strip() | |
| if len(clean_text) > 3 and clean_text not in seen_text: | |
| seen_text.add(clean_text) | |
| visual_lines.append(f"[{ts}] [TEXT] {clean_text}") | |
| # 2. Presidio Image Redactor | |
| if 'Image' in globals(): | |
| img = Image.open(frame_path) | |
| bboxes = image_analyzer.analyze(image=img) | |
| found_entities = set([b.entity_type for b in bboxes if b.score > 0.4]) | |
| for entity in found_entities: | |
| visual_lines.append(f"[{ts}] [VISUAL PII (Presidio)] Detected {entity}") | |
| except Exception: | |
| continue | |
| if len(visual_lines) == 1: | |
| return "" | |
| return "\n".join(visual_lines) | |
# ── Public entry point ───────────────────────────────────────────────────────
def process_video(
    video_path: str,
    progress_cb: Optional[Callable[[int, str], None]] = None,
) -> str:
    """Run the full 4-channel extraction pipeline on one video file.

    Channels run in order: metadata, subtitles, visual stream, audio
    transcript. Each channel returns its own labelled block (or an empty
    string), and the non-empty blocks are joined with blank lines. All
    scratch files live in a temporary directory that is removed on exit.

    Args:
        video_path: Path of the video to process.
        progress_cb: Optional ``progress_cb(percent, detail)`` hook, called
            throughout; percent maps 0 → 95 (the final 5 % is left for the
            downstream model scanning).

    Returns:
        The merged text with source-attribution headers, an ``[ERROR]``
        message when ffmpeg is not available, or a bracketed note when no
        channel produced any text.
    """
    if not _check_ffmpeg():
        return (
            "[ERROR] ffmpeg not found in PATH.\n"
            "On HuggingFace Spaces add 'ffmpeg' to packages.txt.\n"
            "Locally: https://ffmpeg.org/download.html"
        )

    def _report(pct: int, detail: str) -> None:
        # Single place that tolerates a missing callback.
        if progress_cb:
            progress_cb(pct, detail)

    def _visual_progress(pct: int, detail: str) -> None:
        # Remap visual progress (0-99) into the 15 % → 50 % window
        _report(15 + int(pct * 0.35), detail)

    def _audio_progress(pct: int, detail: str) -> None:
        # Remap audio progress (0-99) into the 50 % → 93 % window
        _report(50 + int(pct * 0.43), detail)

    parts: list[str] = []
    with tempfile.TemporaryDirectory() as tmp_dir:
        # ── Channel 1: Metadata (instant, ~1 s) ──────────────────────────
        _report(3, "Extracting video metadata…")
        meta = _extract_metadata(video_path)
        if meta:
            parts.append(meta)

        # ── Channel 2: Subtitles (fast, ~2–5 s) ──────────────────────────
        _report(8, "Demuxing embedded subtitles…")
        subs = _extract_subtitles(video_path, tmp_dir)
        if subs:
            parts.append(subs)

        # ── Channel 3: Visual Stream (EasyOCR & Presidio) ────────────────
        _report(15, "Initialising Visual Models (EasyOCR & Presidio)…")
        visual_text = _extract_visual_stream(
            video_path, tmp_dir, _visual_progress
        )
        if visual_text:
            parts.append(visual_text)

        # ── Channel 4: Audio Transcript (slow — Whisper) ─────────────────
        _report(50, "Initialising Whisper model…")
        transcript = _extract_audio_transcript(
            video_path, tmp_dir, _audio_progress
        )
        if transcript:
            parts.append(transcript)

    _report(95, "Extraction complete — handing off to PII models…")

    if not parts:
        return "[No extractable text found in this video file]"
    return "\n\n".join(parts)