File size: 3,786 Bytes
413cf21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""
Video Router — FastAPI router for async video PII processing.

Endpoints:
  POST   /api/video/upload          — accept video, start async job, return job_id
  GET    /api/video/status/{job_id} — poll job status + progress + result
  DELETE /api/video/cancel/{job_id} — cancel a queued job
"""
from __future__ import annotations

import os
import tempfile

from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from fastapi.responses import JSONResponse

import video_job_queue as _queue

router = APIRouter(tags=["video"], prefix="/api/video")

# Lower-case file extensions accepted by POST /upload (the upload's
# extension is lower-cased before it is compared against this set).
ALLOWED_EXTENSIONS = {"avi", "mkv", "mov", "mp4", "webm"}

# Hard upper bound on upload size; anything larger is rejected with HTTP 413.
MAX_VIDEO_BYTES = 500 << 20   # 500 MiB == 500 * 1024 * 1024 bytes

# Directory (created on demand) where accepted uploads are written before
# being handed to the job queue.
UPLOAD_DIR = "/tmp/video_uploads"


_UPLOAD_CHUNK_BYTES = 1024 * 1024   # streaming granularity for uploads (1 MiB)
_MIN_VIDEO_BYTES = 1024             # smaller uploads are rejected as "empty"
_DEFAULT_MODEL_KEYS = ("regex", "spacy", "deberta")


def _video_extension(filename: str) -> str:
    """Return the lower-cased text after the last '.' in *filename*, or ''."""
    return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""


def _parse_model_keys(model_keys: str) -> list[str]:
    """Split a comma-separated key list, dropping blanks; fall back to defaults."""
    keys = [k.strip() for k in model_keys.split(",") if k.strip()]
    return keys or list(_DEFAULT_MODEL_KEYS)


def _remove_quietly(path: str) -> None:
    """Best-effort removal of a temp file during error cleanup."""
    try:
        os.remove(path)
    except OSError:
        # Cleanup must never mask the error that triggered it.
        pass


async def _save_upload(file: UploadFile, ext: str) -> tuple[str, int]:
    """
    Stream *file* into a new temp file under UPLOAD_DIR, chunk by chunk.

    Returns ``(tmp_path, size_in_bytes)``.
    Raises HTTPException(413) as soon as more than MAX_VIDEO_BYTES have been
    read. On any exception (including cancellation) the partial temp file is
    deleted before the exception propagates.
    """
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(suffix=f".{ext}", dir=UPLOAD_DIR)
    size = 0
    try:
        with os.fdopen(fd, "wb") as fh:
            while True:
                # Each awaited read yields to the event loop, so a large
                # upload no longer blocks it with one giant read + write.
                chunk = await file.read(_UPLOAD_CHUNK_BYTES)
                if not chunk:
                    break
                size += len(chunk)
                if size > MAX_VIDEO_BYTES:
                    raise HTTPException(
                        status_code=413,
                        detail=(
                            f"File exceeds the "
                            f"{MAX_VIDEO_BYTES // (1024 * 1024)} MB limit."
                        ),
                    )
                fh.write(chunk)
    except BaseException:
        _remove_quietly(tmp_path)
        raise
    return tmp_path, size


@router.post("/upload")
async def video_upload(
    file: UploadFile = File(...),
    model_keys: str = Form("regex,spacy,deberta"),
):
    """
    Accept a video file upload and queue it for async processing.

    The upload is streamed to a temp file under UPLOAD_DIR in fixed-size
    chunks instead of being read into memory in one piece. Peak memory is
    therefore bounded by the chunk size, and oversize uploads are rejected as
    soon as they cross MAX_VIDEO_BYTES. A job is then created and enqueued,
    and {job_id, status, file_size_mb, model_keys} is returned immediately.
    The client should poll GET /api/video/status/{job_id} every ~3 seconds.

    Args:
        file: the uploaded video; its extension must be in ALLOWED_EXTENSIONS.
        model_keys: comma-separated model keys; blank input falls back to
            regex, spacy, deberta.

    Raises:
        HTTPException(400): unsupported extension, or fewer than 1024 bytes.
        HTTPException(413): upload larger than MAX_VIDEO_BYTES.
    """
    ext = _video_extension(file.filename or "")
    if ext not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=(
                f"Unsupported video format '{ext}'. "
                f"Supported: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
            ),
        )

    tmp_path, size = await _save_upload(file, ext)
    if size < _MIN_VIDEO_BYTES:
        _remove_quietly(tmp_path)
        raise HTTPException(status_code=400, detail="File appears to be empty.")

    keys = _parse_model_keys(model_keys)

    try:
        job_id = _queue.create_job(tmp_path, keys)
        await _queue.enqueue(job_id)
    except Exception:
        # NOTE(review): assumes a job whose create/enqueue raised is never
        # processed, so the file would otherwise just leak — confirm against
        # video_job_queue.
        _remove_quietly(tmp_path)
        raise

    return JSONResponse(
        content={
            "job_id": job_id,
            "status": "queued",
            "file_size_mb": round(size / (1024 * 1024), 2),
            "model_keys": keys,
        }
    )


@router.get("/status/{job_id}")
async def video_status(job_id: str):
    """
    Report the state of a video job as stored by the job queue.

    The job entry from `_queue.get_job` is returned verbatim as JSON. Its
    documented shape is:
    {
      "status":       "queued" | "extracting" | "scanning" | "done" | "error" | "cancelled",
      "progress":     0-100,
      "stage_detail": "Human-readable current stage",
      "result":       null | { per_model, ranked, union_total, elapsed, parsed_text },
      "error":        null | "error message string",
      "created_at":   unix timestamp
    }

    Raises:
        HTTPException(404): the queue has no entry for *job_id*.
    """
    job = _queue.get_job(job_id)
    if job is not None:
        return JSONResponse(content=job)
    raise HTTPException(
        status_code=404,
        detail=f"Job '{job_id}' not found. It may have expired.",
    )


@router.delete("/cancel/{job_id}")
async def video_cancel(job_id: str):
    """
    Cancel a queued job.

    Returns {"cancelled": True, "job_id": job_id} when `_queue.cancel_job`
    reports success.

    Raises:
        HTTPException(404): the queue has no entry for *job_id*. This is the
            same response GET /status/{job_id} gives for an unknown id.
        HTTPException(409): the job exists but `_queue.cancel_job` refused to
            cancel it, e.g. because it is already running or completed.
    """
    # Distinguish "unknown job" from "cannot cancel" so the status codes match
    # the status endpoint's behaviour for missing jobs.
    if _queue.get_job(job_id) is None:
        raise HTTPException(
            status_code=404,
            detail=f"Job '{job_id}' not found. It may have expired.",
        )

    if not _queue.cancel_job(job_id):
        # Also covers the rare race where the job expires between the two calls.
        raise HTTPException(
            status_code=409,
            detail=(
                "Cannot cancel: job is already running, completed, "
                "or does not exist in queue."
            ),
        )
    return JSONResponse(content={"cancelled": True, "job_id": job_id})