""" Video Router — FastAPI router for async video PII processing. Endpoints: POST /api/video/upload — accept video, start async job, return job_id GET /api/video/status/{job_id} — poll job status + progress + result DELETE /api/video/cancel/{job_id} — cancel a queued job """ from __future__ import annotations import os import tempfile from fastapi import APIRouter, File, Form, HTTPException, UploadFile from fastapi.responses import JSONResponse import video_job_queue as _queue router = APIRouter(prefix="/api/video", tags=["video"]) ALLOWED_EXTENSIONS = {"mp4", "mkv", "avi", "mov", "webm"} MAX_VIDEO_BYTES = 500 * 1024 * 1024 # 500 MB hard limit UPLOAD_DIR = "/tmp/video_uploads" @router.post("/upload") async def video_upload( file: UploadFile = File(...), model_keys: str = Form("regex,spacy,deberta"), ): """ Accept a video file upload. Saves bytes to /tmp/, creates an async job, returns {job_id} immediately. The client should poll GET /api/video/status/{job_id} every ~3 seconds. """ # Validate extension filename = file.filename or "" ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else "" if ext not in ALLOWED_EXTENSIONS: raise HTTPException( status_code=400, detail=( f"Unsupported video format '{ext}'. " f"Supported: {', '.join(sorted(ALLOWED_EXTENSIONS))}" ), ) # Read and size-check content = await file.read() if len(content) > MAX_VIDEO_BYTES: raise HTTPException( status_code=413, detail=f"File is {len(content) // (1024*1024)} MB — limit is 500 MB.", ) if len(content) < 1024: raise HTTPException(status_code=400, detail="File appears to be empty.") # Persist to temp dir os.makedirs(UPLOAD_DIR, exist_ok=True) fd, tmp_path = tempfile.mkstemp(suffix=f".{ext}", dir=UPLOAD_DIR) with os.fdopen(fd, "wb") as fh: fh.write(content) # Parse model keys keys = [k.strip() for k in model_keys.split(",") if k.strip()] if not keys: keys = ["regex", "spacy", "deberta"] # Create and enqueue job job_id = _queue.create_job(tmp_path, keys) await _queue.enqueue(job_id) return JSONResponse( content={ "job_id": job_id, "status": "queued", "file_size_mb": round(len(content) / (1024 * 1024), 2), "model_keys": keys, } ) @router.get("/status/{job_id}") async def video_status(job_id: str): """ Return the current job status. Response shape: { "status": "queued" | "extracting" | "scanning" | "done" | "error" | "cancelled", "progress": 0-100, "stage_detail": "Human-readable current stage", "result": null | { per_model, ranked, union_total, elapsed, parsed_text }, "error": null | "error message string", "created_at": unix timestamp } """ entry = _queue.get_job(job_id) if entry is None: raise HTTPException( status_code=404, detail=f"Job '{job_id}' not found. It may have expired.", ) return JSONResponse(content=entry) @router.delete("/cancel/{job_id}") async def video_cancel(job_id: str): """ Cancel a queued job. Returns 409 if the job is already running or completed. """ success = _queue.cancel_job(job_id) if not success: raise HTTPException( status_code=409, detail=( "Cannot cancel: job is already running, completed, " "or does not exist in queue." ), ) return JSONResponse(content={"cancelled": True, "job_id": job_id})