Spaces:
Running
Running
| """ | |
| Video Router — FastAPI router for async video PII processing. | |
| Endpoints: | |
| POST /api/video/upload — accept video, start async job, return job_id | |
| GET /api/video/status/{job_id} — poll job status + progress + result | |
| DELETE /api/video/cancel/{job_id} — cancel a queued job | |
| """ | |
| from __future__ import annotations | |
| import os | |
| import tempfile | |
| from fastapi import APIRouter, File, Form, HTTPException, UploadFile | |
| from fastapi.responses import JSONResponse | |
| import video_job_queue as _queue | |
# Every path operation registered on this router is served under /api/video
# and grouped under the "video" tag.
router = APIRouter(
    tags=["video"],
    prefix="/api/video",
)
# Accepted container extensions (compared lower-cased). A frozenset so the
# module-level constant cannot be mutated at runtime.
ALLOWED_EXTENSIONS = frozenset({"mp4", "mkv", "avi", "mov", "webm"})
# Hard upper bound on upload size, in bytes (500 MiB).
MAX_VIDEO_BYTES = 500 * 1024 * 1024
# Directory where uploaded videos are persisted before a job picks them up.
UPLOAD_DIR = "/tmp/video_uploads"
# Bytes read from the upload (and written to disk) per iteration; bounds the
# amount of upload data the handler holds in memory at once.
_UPLOAD_CHUNK_BYTES = 1024 * 1024


def _discard(path: str) -> None:
    """Best-effort removal of *path*; failures to delete are ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


async def _save_upload(file: UploadFile, ext: str) -> tuple[str, int]:
    """Stream *file* into a new temp file under UPLOAD_DIR.

    Returns:
        ``(path, size_in_bytes)`` of the written file.

    Raises:
        HTTPException: 413 as soon as the stream exceeds MAX_VIDEO_BYTES;
            400 if the complete upload is smaller than 1 KiB.
        The partially written file is removed on any failure.
    """
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(suffix=f".{ext}", dir=UPLOAD_DIR)
    size = 0
    try:
        with os.fdopen(fd, "wb") as fh:
            while True:
                chunk = await file.read(_UPLOAD_CHUNK_BYTES)
                if not chunk:
                    break
                size += len(chunk)
                # Reject oversized uploads early instead of buffering them.
                if size > MAX_VIDEO_BYTES:
                    raise HTTPException(
                        status_code=413,
                        detail=(
                            "File exceeds the "
                            f"{MAX_VIDEO_BYTES // (1024 * 1024)} MB limit."
                        ),
                    )
                # NOTE: synchronous write; each chunk is small, so the event
                # loop is blocked only briefly per iteration.
                fh.write(chunk)
        if size < 1024:
            raise HTTPException(status_code=400, detail="File appears to be empty.")
    except BaseException:
        # Don't leave rejected or partial uploads behind in UPLOAD_DIR.
        _discard(tmp_path)
        raise
    return tmp_path, size


@router.post("/upload")
async def video_upload(
    file: UploadFile = File(...),
    model_keys: str = Form("regex,spacy,deberta"),
):
    """Accept a video upload and enqueue it for asynchronous processing.

    The upload is streamed in chunks to a temp file under UPLOAD_DIR, so an
    oversized file is rejected as soon as it crosses MAX_VIDEO_BYTES rather
    than after being read fully into memory. A job is then created with
    ``_queue.create_job`` and enqueued. The response returns immediately with
    the job id. The client should poll GET /api/video/status/{job_id} every
    ~3 seconds.

    Args:
        file: Uploaded video. Its extension (case-insensitive) must be in
            ALLOWED_EXTENSIONS.
        model_keys: Comma-separated model names. Blank entries are ignored.
            If none remain, the default ``regex, spacy, deberta`` is used.

    Returns:
        JSONResponse with ``job_id``, ``status`` ("queued"), ``file_size_mb``
        and the parsed ``model_keys``.

    Raises:
        HTTPException: 400 for an unsupported extension or an upload under
            1 KiB; 413 for an upload larger than MAX_VIDEO_BYTES.
    """
    # Validate the extension before reading any bytes.
    filename = file.filename or ""
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Unsupported video format '{ext}'. "
                f"Supported: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
            ),
        )

    tmp_path, size = await _save_upload(file, ext)

    keys = [k.strip() for k in model_keys.split(",") if k.strip()]
    if not keys:
        keys = ["regex", "spacy", "deberta"]

    # Until a job references the file, this handler owns it; clean it up if
    # job creation fails so it is not orphaned on disk.
    try:
        job_id = _queue.create_job(tmp_path, keys)
    except BaseException:
        _discard(tmp_path)
        raise
    await _queue.enqueue(job_id)

    return JSONResponse(
        content={
            "job_id": job_id,
            "status": "queued",
            "file_size_mb": round(size / (1024 * 1024), 2),
            "model_keys": keys,
        }
    )
@router.get("/status/{job_id}")
async def video_status(job_id: str):
    """Return the current state of a video job as stored by ``_queue``.

    The entry returned by ``_queue.get_job`` is passed through unchanged. Its
    documented shape is:

        {
          "status": "queued" | "extracting" | "scanning" | "done" | "error" | "cancelled",
          "progress": 0-100,
          "stage_detail": "Human-readable current stage",
          "result": null | { per_model, ranked, union_total, elapsed, parsed_text },
          "error": null | "error message string",
          "created_at": unix timestamp
        }

    NOTE(review): this shape is defined by video_job_queue; confirm it there.

    Raises:
        HTTPException: 404 if no job with ``job_id`` is known.
    """
    entry = _queue.get_job(job_id)
    if entry is None:
        raise HTTPException(
            status_code=404,
            detail=f"Job '{job_id}' not found. It may have expired.",
        )
    return JSONResponse(content=entry)
@router.delete("/cancel/{job_id}")
async def video_cancel(job_id: str):
    """Cancel a job that is still queued.

    Returns:
        JSONResponse ``{"cancelled": True, "job_id": job_id}`` on success.

    Raises:
        HTTPException: 404 if no job with ``job_id`` is known (matching the
            status endpoint); 409 if the job exists but ``_queue.cancel_job``
            refused to cancel it.
    """
    if _queue.cancel_job(job_id):
        return JSONResponse(content={"cancelled": True, "job_id": job_id})
    # Check existence only after the cancel attempt failed, so the 404/409
    # choice reflects the job's state at that point.
    if _queue.get_job(job_id) is None:
        raise HTTPException(
            status_code=404,
            detail=f"Job '{job_id}' not found. It may have expired.",
        )
    raise HTTPException(
        status_code=409,
        detail=(
            "Cannot cancel: job is no longer queued "
            "(it may already be running or completed)."
        ),
    )