# sense-backend / video_router.py
# SHAFI
# added video feature
# 413cf21
"""
Video Router — FastAPI router for async video PII processing.
Endpoints:
POST /api/video/upload — accept video, start async job, return job_id
GET /api/video/status/{job_id} — poll job status + progress + result
DELETE /api/video/cancel/{job_id} — cancel a queued job
"""
from __future__ import annotations
import os
import tempfile
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from fastapi.responses import JSONResponse
import video_job_queue as _queue
# Router collecting the video endpoints; every route is served under the
# "/api/video" prefix and tagged "video".
router = APIRouter(prefix="/api/video", tags=["video"])
# Lower-case file extensions (without the leading dot) accepted for upload.
ALLOWED_EXTENSIONS = {"avi", "mkv", "mov", "mp4", "webm"}

# Hard size limit for an uploaded video: 500 MB, expressed in bytes.
MAX_VIDEO_BYTES = 500 * 1024 ** 2

# Directory in which uploaded videos are stored as temporary files.
UPLOAD_DIR = "/tmp/video_uploads"
@router.post("/upload")
async def video_upload(
    file: UploadFile = File(...),
    model_keys: str = Form("regex,spacy,deberta"),
):
    """
    Accept a video upload and start an asynchronous processing job.

    The upload is validated (extension first, then size), copied in bounded
    chunks to a temporary file under UPLOAD_DIR, registered with the job
    queue and enqueued. The response is returned immediately; the client
    should poll GET /api/video/status/{job_id} every ~3 seconds.

    Args:
        file: The uploaded video. Its filename must end in one of
            ALLOWED_EXTENSIONS (case-insensitive).
        model_keys: Comma-separated model identifiers. Blank entries are
            dropped; if nothing remains, "regex", "spacy" and "deberta"
            are used.

    Returns:
        JSONResponse with "job_id", "status" ("queued"), "file_size_mb"
        (rounded to two decimals) and the parsed "model_keys" list.

    Raises:
        HTTPException: 400 for an unsupported extension, 413 when the file
            is larger than MAX_VIDEO_BYTES, 400 when it is smaller than
            1024 bytes.
    """
    one_mb = 1024 * 1024
    chunk_size = one_mb  # upper bound on memory held per read

    # Validate extension
    filename = file.filename or ""
    ext = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Unsupported video format '{ext}'. "
                f"Supported: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
            ),
        )

    # Size-check without loading the upload into memory: seek the underlying
    # file object to its end, read the offset, then rewind for the copy below.
    upload = file.file
    upload.seek(0, os.SEEK_END)
    size = upload.tell()
    upload.seek(0)

    if size > MAX_VIDEO_BYTES:
        # The limit is derived from the constant so the message cannot drift
        # away from the value that is actually enforced.
        raise HTTPException(
            status_code=413,
            detail=(
                f"File is {size // one_mb} MB — "
                f"limit is {MAX_VIDEO_BYTES // one_mb} MB."
            ),
        )
    if size < 1024:
        raise HTTPException(status_code=400, detail="File appears to be empty.")

    # Parse model keys
    keys = [k.strip() for k in model_keys.split(",") if k.strip()]
    if not keys:
        keys = ["regex", "spacy", "deberta"]

    # Persist to temp dir, one chunk at a time
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(suffix=f".{ext}", dir=UPLOAD_DIR)
    try:
        with os.fdopen(fd, "wb") as fh:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                fh.write(chunk)
        job_id = _queue.create_job(tmp_path, keys)
    except BaseException:
        # No job references the temp file yet, so remove it rather than
        # leaving an orphan behind. Removal is best-effort: the original
        # error is the one worth reporting.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

    # NOTE(review): if enqueue fails, the job created above still refers to
    # tmp_path; cleanup for that case presumably belongs to the queue module.
    await _queue.enqueue(job_id)

    return JSONResponse(
        content={
            "job_id": job_id,
            "status": "queued",
            "file_size_mb": round(size / one_mb, 2),
            "model_keys": keys,
        }
    )
@router.get("/status/{job_id}")
async def video_status(job_id: str):
    """
    Report the current state of a video job.

    The job entry is fetched from the queue module and returned as JSON.
    Documented response shape:
        {
            "status": "queued" | "extracting" | "scanning" | "done" | "error" | "cancelled",
            "progress": 0-100,
            "stage_detail": "Human-readable current stage",
            "result": null | { per_model, ranked, union_total, elapsed, parsed_text },
            "error": null | "error message string",
            "created_at": unix timestamp
        }

    Raises:
        HTTPException: 404 when the queue has no entry for job_id.
    """
    job = _queue.get_job(job_id)

    # Success path first: hand the entry back exactly as the queue stores it.
    if job is not None:
        return JSONResponse(content=job)

    not_found = f"Job '{job_id}' not found. It may have expired."
    raise HTTPException(status_code=404, detail=not_found)
@router.delete("/cancel/{job_id}")
async def video_cancel(job_id: str):
    """
    Request cancellation of a queued job.

    Delegates to the queue module's cancel_job. A truthy result yields a
    confirmation payload; anything else is reported as a 409 whose message
    covers jobs that are already running, completed, or not in the queue.

    Raises:
        HTTPException: 409 when the queue reports the job was not cancelled.
    """
    if _queue.cancel_job(job_id):
        return JSONResponse(content={"cancelled": True, "job_id": job_id})

    reason = (
        "Cannot cancel: job is already running, completed, "
        "or does not exist in queue."
    )
    raise HTTPException(status_code=409, detail=reason)