"""
Evaluator API — FastAPI router for the Model Lab.

Endpoints:
  GET  /api/evaluator/models      — model catalogue
  GET  /api/evaluator/file-types  — category → file type options for UI
  POST /api/evaluator/parse       — parse uploaded file → extracted text
  POST /api/evaluator/scan        — run selected models on text, return per-model results
  POST /api/evaluator/batch       — batch evaluation (stub)

Drive Scan (Model Lab):
  POST /api/evaluator/drive/browse          — list files inside a Drive folder (recursive)
  POST /api/evaluator/drive/scan            — download + scan selected Drive files with chosen models
  POST /api/evaluator/drive/tag             — write pii_detected to Drive file appProperties
  POST /api/evaluator/drive/content-chunks  — extract a Drive file's text and return it in chunks
"""

import asyncio
import datetime
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional, Dict, Any

from fastapi import APIRouter, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

from file_handlers.universal_parser import parse_file, get_all_categories
from file_handlers.ocr_engine import OcrEngine
from connectors.drive_handler import DriveHandler

# Shared, module-wide helper instances used by the Drive routes.
_drive_handler = DriveHandler()
_ocr_engine = OcrEngine()

router = APIRouter(prefix="/api/evaluator", tags=["evaluator"])

# Module-level reference to the shared RegexClassifier instance.
# Populated by setup() called from api.py after the classifier is created.
_classifier = None
_executor = ThreadPoolExecutor(max_workers=4)  # shared pool for blocking parse / model / Drive calls

# Upper bound on extracted text handed to the models (keeps them fast, avoids OOM).
_MAX_TEXT_CHARS = 100_000


def setup(classifier_instance):
    """Called from api.py to inject the shared classifier."""
    global _classifier
    _classifier = classifier_instance


# ─────────────────────────────────────────────
# Model Catalogue
# ─────────────────────────────────────────────

MODEL_CATALOGUE = [
    {
        "key": "ensemble",
        "label": "👑 Ensemble (God Algorithm)",
        "hf_id": "hybrid",
        "type": "God Algorithm",
        "params": "IoU",
        "f1_benchmark": 0.99,
        "lazy": False,
        "description": "Aggregates all models using sliding windows and weighted IoU deduplication.",
    },
    {
        "key": "regex",
        "label": "🛠️ Regex Engine",
        "hf_id": "deterministic",
        "type": "Rule-based",
        "params": "—",
        "f1_benchmark": 1.0,
        "lazy": False,
        "description": "Deterministic regex patterns — emails, phones, SSNs, IPs.",
    },
    {
        "key": "nltk",
        "label": "🧠 NLTK Chunker",
        "hf_id": "nltk",
        "type": "Statistical",
        "params": "—",
        "f1_benchmark": 0.0,
        "lazy": False,
        "description": "NLTK ne_chunk — names, locations.",
    },
    {
        "key": "spacy",
        "label": "🤖 SpaCy LG",
        "hf_id": "en_core_web_lg",
        "type": "Statistical",
        "params": "685M",
        "f1_benchmark": 0.0,
        "lazy": False,
        "description": "Industrial-strength NER (en_core_web_lg).",
    },
    {
        "key": "presidio",
        "label": "🛡️ MS Presidio",
        "hf_id": "microsoft/presidio-analyzer",
        "type": "Rule+ML",
        "params": "—",
        "f1_benchmark": 0.0,
        "lazy": False,
        "description": "Microsoft Presidio enterprise PII analyser.",
    },
    {
        "key": "gliner",
        "label": "🦅 GLiNER Small",
        "hf_id": "urchade/gliner_small-v2.1",
        "type": "GLiNER",
        "params": "small",
        "f1_benchmark": 0.85,
        "lazy": False,
        "description": "Zero-shot GLiNER small-v2.1.",
    },
    {
        "key": "deberta",
        "label": "🚀 DeBERTa PII",
        "hf_id": "lakshyakh93/deberta-large-finetuned-pii",
        "type": "NER",
        "params": "86M",
        "f1_benchmark": 0.92,
        "lazy": False,
        "description": "Kaggle-winning DeBERTa V3 fine-tuned for PII.",
    },
    {
        "key": "pasteproof",
        "label": "📋 Pasteproof v2",
        "hf_id": "joneauxedgar/pasteproof-pii-detector-v2",
        "type": "NER",
        "params": "149M",
        "f1_benchmark": 0.97,
        "lazy": True,
        "description": "ModernBERT 149M — broad PII detection.",
    },
    {
        "key": "piiranha",
        "label": "🐟 Piiranha v1",
        "hf_id": "iiiorg/piiranha-v1-detect-personal-information",
        "type": "NER",
        "params": "86M",
        "f1_benchmark": 0.931,
        "lazy": True,
        "description": "DeBERTa personal information specialist.",
    },
    {
        "key": "nvidia_gliner",
        "label": "⚡ NVIDIA-GLiNER",
        "hf_id": "nvidia/gliner-PII-0.1",
        "type": "GLiNER",
        "params": "570M",
        "f1_benchmark": 0.87,
        "lazy": True,
        "description": "NVIDIA GLiNER enterprise zero-shot NER.",
    },
    {
        "key": "mmbert",
        "label": "🌐 mmbert32k",
        "hf_id": "llm-semantic-router/mmbert32k-pii-detector-merged",
        "type": "NER",
        "params": "307M",
        "f1_benchmark": 0.969,
        "lazy": True,
        "description": "ModernBERT 32k-context document scanner.",
    },
]


@router.get("/models")
async def get_evaluator_models():
    """Return the full model catalogue."""
    return JSONResponse(content={"models": MODEL_CATALOGUE})


# ─────────────────────────────────────────────
# File Type Options for UI
# ─────────────────────────────────────────────

@router.get("/file-types")
async def get_file_types():
    """Return category → file type options for the two-step UI picker."""
    return JSONResponse(content=get_all_categories())


# ─────────────────────────────────────────────
# Parse Endpoint
# ─────────────────────────────────────────────

@router.post("/parse")
async def evaluator_parse(
    file: UploadFile = File(...),
    file_type: str = Form("auto"),
    category: str = Form("unstructured"),
    doc_index: int = Form(0),
    format: str = Form("auto"),          # legacy compat
    schema: Optional[str] = Form(None),  # legacy compat
):
    """
    Parse an uploaded file and return extracted text.

    ``file_type`` selects the parser; when it is "auto" or empty, the type is
    taken from the uploaded filename's extension (defaulting to "txt").
    ``category``, ``doc_index``, ``format`` and ``schema`` are accepted but not
    used by this handler.

    Raises:
        HTTPException(500): any failure while reading or parsing the upload.
    """
    try:
        content = await file.read()

        # Resolve file_type: prefer explicit param, else infer from filename
        resolved_type = file_type
        if resolved_type in ("auto", "", None):
            filename = file.filename or ""
            resolved_type = filename.rsplit(".", 1)[-1].lower() if "." in filename else "txt"

        # Run blocking parser in thread pool to avoid blocking the event loop
        loop = asyncio.get_running_loop()
        text = await loop.run_in_executor(_executor, parse_file, content, resolved_type)

        truncated = text[:_MAX_TEXT_CHARS]

        return JSONResponse(content={
            "text": truncated,
            "gt_spans": [],   # detection-only — no ground truth
            "has_gt": False,
            "format_detected": resolved_type,
            "doc_count": 1,
            "char_count": len(truncated),
        })
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Parse error: {str(e)}") from e


# ─────────────────────────────────────────────
# Scan Endpoint
# ─────────────────────────────────────────────

class ScanRequest(BaseModel):
    """Request body for POST /scan."""

    text: str
    gt_spans: List[Any] = []
    model_keys: List[str] = ["deberta"]
    conf_threshold: float = 0.5
    entropy_threshold: float = 4.5


def _run_single_model(model_key: str, text: str) -> List[dict]:
    """Run one model synchronously. Called from thread pool."""
    if _classifier is None:
        return []
    return _classifier.analyze_text_hybrid(text, [model_key])


def _detection_key(det: dict) -> tuple:
    """Identity of a detection for de-duplication: (start, end, text)."""
    return (det["start"], det["end"], det["text"])


def _build_showdown_result(
    model_key: str,
    model_detections: List[dict],
    union_keys: set,
    all_model_detections: Dict[str, List[dict]],
) -> dict:
    """
    Build the per-model result dict for detection-only mode.

    Definitions (no ground truth):
      TP (Consensus) = spans also found by ≥1 other model
      FP (Unique)    = spans found ONLY by this model
      FN (Missed)    = spans from union NOT found by this model

    Args:
        model_key: key of the model being summarised.
        model_detections: this model's detections; each needs "start", "end"
            and "text", while "label", "score" and "source" are optional.
        union_keys: (start, end, text) keys found by any model; only its size
            is used here, as the denominator of ``accuracy``.
        all_model_detections: detections of every model, keyed by model key.

    Returns:
        Dict with "predictions", "comparison", "metrics", "coverage",
        "failures" and the dashboard counters ("pii_count", "accuracy", ...).
    """
    this_keys = {_detection_key(m): m for m in model_detections}

    # PII type breakdown
    type_counts: Dict[str, int] = {}
    for m in model_detections:
        label = m.get("label", "UNKNOWN")
        type_counts[label] = type_counts.get(label, 0) + 1

    # One pass over every model: remember the first detection seen for each
    # span (used to describe misses) and collect the spans of the OTHER models.
    all_union: Dict[tuple, dict] = {}
    other_keys: set = set()
    for key, dets in all_model_detections.items():
        for m in dets:
            k = _detection_key(m)
            all_union.setdefault(k, m)
            if key != model_key:
                other_keys.add(k)

    found_keys = set(this_keys)
    missed_keys = other_keys - found_keys
    unique_keys = found_keys - other_keys
    consensus_keys = found_keys & other_keys

    # Accuracy = found by this model ÷ union total
    union_total = len(union_keys) if union_keys else 1
    found_count = len(this_keys)
    accuracy = round(found_count / union_total, 4)

    def _fmt(keys, source_dict, limit=30):
        # Capped for response size; .get keeps label-less detections from raising.
        return [
            {"text": source_dict[k].get("text", ""), "label": source_dict[k].get("label", "")}
            for k in list(keys)[:limit]
        ]

    # Build full predictions list (EvaluatorPrediction-compatible)
    predictions = [
        {
            "text": m["text"],
            "label": m.get("label", ""),
            "canonical": m.get("label", ""),
            "start": m["start"],
            "end": m["end"],
            "score": m.get("score", 1.0),
            "source": m.get("source", model_key),
            "result": "TP" if k in consensus_keys else "FP",
        }
        for k, m in this_keys.items()
    ]

    # FN predictions (from union — not found by this model)
    for k in missed_keys:
        m = all_union.get(k, {})
        predictions.append({
            "text": m.get("text", ""),
            "label": m.get("label", ""),
            "canonical": m.get("label", ""),
            "start": m.get("start", 0),
            "end": m.get("end", 0),
            "score": 0.0,
            "source": "missed",
            "result": "FN",
        })

    # Metrics row (OVERALL only — no per-entity breakdown without GT)
    metrics = [{
        "entity_type": "OVERALL",
        "tp": len(consensus_keys),
        "fp": len(unique_keys),
        "fn": len(missed_keys),
        "precision": round(len(consensus_keys) / max(len(this_keys), 1), 4),
        "recall": round(len(consensus_keys) / max(len(other_keys), 1), 4) if other_keys else 1.0,
        "f1": accuracy,   # use union-accuracy as the primary score
    }]

    # Per entity-type rows: counts only, scores fixed at 1.0 (no ground truth)
    for label, cnt in type_counts.items():
        metrics.append({
            "entity_type": label,
            "tp": cnt,
            "fp": 0,
            "fn": 0,
            "precision": 1.0,
            "recall": 1.0,
            "f1": 1.0,
        })

    return {
        "predictions": predictions,
        "comparison": {
            "TP": _fmt(consensus_keys, this_keys),
            "FP": _fmt(unique_keys, this_keys),
            "FN": _fmt(missed_keys, all_union),
        },
        "metrics": metrics,
        "coverage": {
            "in_scope": list(type_counts.keys()),
            "out_of_scope": [],
        },
        "failures": {
            "missed": [
                {
                    "entity_type": all_union[k].get("label", ""),
                    "value": all_union[k].get("text", ""),
                    "context": "",
                    "reason": "Not detected by this model",
                }
                for k in list(missed_keys)[:20]
            ],
            "false_positives": [
                {
                    "entity_type": this_keys[k].get("label", ""),
                    "value": this_keys[k].get("text", ""),
                    "confidence": this_keys[k].get("score", 1.0),
                }
                for k in list(unique_keys)[:20]
            ],
        },
        # Extra fields for the Format Scan dashboard
        "pii_count": found_count,
        "accuracy": accuracy,
        "type_counts": type_counts,
        "unique_count": len(unique_keys),
        "missed_count": len(missed_keys),
        "consensus_count": len(consensus_keys),
    }


def _compare_models(
    model_keys: List[str],
    all_model_detections: Dict[str, List[dict]],
) -> tuple:
    """
    Cross-compare the detections of several models.

    Returns ``(per_model, ranked, union_keys)``: the showdown result per model
    key, a summary list sorted by pii_count descending with 1-based "rank",
    and the set of distinct (start, end, text) spans found by any model.
    """
    union_keys = {
        _detection_key(m)
        for dets in all_model_detections.values()
        for m in dets
    }

    per_model: Dict[str, dict] = {
        key: _build_showdown_result(
            key, all_model_detections[key], union_keys, all_model_detections
        )
        for key in model_keys
    }

    ranked = sorted(
        (
            {"model_key": k, "pii_count": v["pii_count"], "accuracy": v["accuracy"]}
            for k, v in per_model.items()
        ),
        key=lambda row: row["pii_count"],
        reverse=True,
    )
    for position, row in enumerate(ranked, start=1):
        row["rank"] = position

    return per_model, ranked, union_keys


@router.post("/scan")
async def evaluator_scan(request: ScanRequest):
    """
    Run each selected model independently on the provided text.
    Returns per-model results + ranked summary.

    Raises:
        HTTPException(503): the classifier has not been injected via setup().
        HTTPException(400): empty text or no models selected.
    """
    if _classifier is None:
        raise HTTPException(status_code=503, detail="Classifier not initialised.")
    if not request.text.strip():
        raise HTTPException(status_code=400, detail="Empty text provided.")
    if not request.model_keys:
        raise HTTPException(status_code=400, detail="No models selected.")

    # Results are keyed by model, so a repeated key would only repeat the work.
    model_keys = list(dict.fromkeys(request.model_keys))

    start_time = time.perf_counter()

    # Run the models concurrently on the shared thread pool
    loop = asyncio.get_running_loop()
    results_list = await asyncio.gather(*(
        loop.run_in_executor(_executor, _run_single_model, key, request.text)
        for key in model_keys
    ))

    all_model_detections: Dict[str, List[dict]] = dict(zip(model_keys, results_list))
    per_model, ranked, union_keys = _compare_models(model_keys, all_model_detections)

    elapsed = round(time.perf_counter() - start_time, 3)

    return JSONResponse(content={
        "per_model": per_model,
        "has_gt": False,
        "elapsed": elapsed,
        "union_total": len(union_keys),
        "ranked": ranked,
    })


# ─────────────────────────────────────────────
# Batch Endpoint (stub — keeps frontend compat)
# ─────────────────────────────────────────────

@router.post("/batch")
async def evaluator_batch(
    file: UploadFile = File(...),
    format: str = Form("nemotron"),
    n_docs: int = Form(50),
    model_keys: str = Form("deberta"),
    conf_threshold: float = Form(0.5),
    entropy_threshold: float = Form(4.5),
):
    """Batch evaluation stub: accepts the form fields but always answers 501."""
    raise HTTPException(
        status_code=501,
        detail="Batch evaluation requires labeled datasets. Use the Upload & Scan tab with a Nemotron/bigcode dataset.",
    )


# ─────────────────────────────────────────────
# Drive Scan Routes — Model Lab
# ─────────────────────────────────────────────

# ── Shared helper ────────────────────────────────────────────────────────────

def _extract_folder_id(folder_id_or_url: str) -> str:
    """
    Accept either a bare folder ID or a full Drive URL and return the folder ID.
    e.g. https://drive.google.com/drive/folders/1ABC123?usp=sharing → 1ABC123

    A Drive URL without a ``/folders/<id>`` segment is returned unchanged
    (stripped), as is any other input.
    """
    s = folder_id_or_url.strip()
    if "drive.google.com" in s:
        import re
        m = re.search(r"/folders/([a-zA-Z0-9_-]+)", s)
        if m:
            return m.group(1)
    return s


# ── Request / Response models ─────────────────────────────────────────────────

class DriveAuth(BaseModel):
    """Credentials shared by every Drive request body."""

    auth_type: str = "service_account"   # "service_account" | "oauth2_token"
    credentials: Dict[str, Any]          # SA JSON dict or {access_token, ...}


class DriveBrowseRequest(DriveAuth):
    """Request body for POST /drive/browse."""

    folder_id: str                       # bare ID or full Drive URL
    uid: str = "default_uid"             # User ID


class DriveFileRef(BaseModel):
    """Reference to one Drive file (id, display name, MIME type)."""

    id: str
    name: str
    mimeType: str


class DriveScanRequest(DriveAuth):
    """Request body for POST /drive/scan."""

    file_refs: List[DriveFileRef]
    model_keys: List[str] = ["regex", "deberta"]
    uid: str = "default_uid"             # User ID


class DriveTagItem(BaseModel):
    """Scan outcome to write back to a single Drive file."""

    file_id: str
    pii_detected: bool
    pii_count: int


class DriveTagRequest(DriveAuth):
    """Request body for POST /drive/tag."""

    files_to_tag: List[DriveTagItem]
    human_readable: bool = False


# ── Route 1: Browse folder ────────────────────────────────────────────────────

@router.post("/drive/browse")
async def evaluator_drive_browse(request: DriveBrowseRequest):
    """
    Recursively list all files inside the given Drive folder.
    Returns a flat list so the frontend can build its own tree.

    Raises:
        HTTPException(400): missing folder id, or any error raised while browsing.
    """
    folder_id = _extract_folder_id(request.folder_id)
    if not folder_id:
        raise HTTPException(status_code=400, detail="folder_id is required.")

    loop = asyncio.get_running_loop()
    try:
        items = await loop.run_in_executor(
            _executor,
            _drive_handler.browse_folder,
            folder_id,
            request.credentials,
            request.auth_type,
            request.uid,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    return JSONResponse(content={"items": items, "folder_id": folder_id})


# ── Route 2: Scan selected files ─────────────────────────────────────────────

def _write_temp_file(data: bytes, suffix: str) -> str:
    """Write ``data`` to a new temp file and return its path; the caller deletes it.

    If the write fails the partially written file is removed before re-raising.
    """
    import os
    import tempfile

    tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    try:
        with tmp:
            tmp.write(data)
    except Exception:
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
    return tmp.name


def _easyocr_extract(image_bytes: bytes) -> str:
    """OCR an image with EasyOCR, keeping fragments whose confidence exceeds 0.4."""
    import os
    import easyocr  # optional dependency — imported lazily, before any temp file exists

    tmp_path = _write_temp_file(image_bytes, ".png")
    try:
        reader = easyocr.Reader(["en"], gpu=False, verbose=False)
        results = reader.readtext(tmp_path)
        return " ".join(t for (_, t, prob) in results if prob > 0.4)
    finally:
        try:
            os.unlink(tmp_path)
        except OSError:
            pass


def _extract_drive_file_text(
    file_ref: DriveFileRef,
    credentials: dict,
    auth_type: str,
) -> tuple[str, str, int]:
    """
    Download and extract text from one Drive file.

    Images go through the OCR engine (with an EasyOCR fallback when that yields
    no text), video/audio through ``process_video``, everything else through
    ``parse_file``. The text is truncated to ``_MAX_TEXT_CHARS``.

    Returns:
        (text, error_msg, char_count). On failure ``text`` is "" and
        ``error_msg`` says why; on success ``error_msg`` is "". A failed OCR
        fallback is reported as an error rather than returned as document text,
        so the failure message is never scanned or displayed as file content.
    """
    import os
    from connectors.drive_handler import _ext_for, _EXPORT_MIME, _VIDEO_EXTS, _IMAGE_EXT, _AUDIO_EXT

    # Download bytes into RAM only
    raw_bytes = _drive_handler.download_file_bytes(
        file_ref.id, file_ref.mimeType, credentials, auth_type
    )
    if not raw_bytes:
        return "", "Download failed or empty file.", 0

    # Resolve ext: use mapped ext from MIME, fall back to filename ext
    ext = _ext_for(file_ref.mimeType)

    # Google Workspace files are exported — update ext to match export target
    export_mime = _EXPORT_MIME.get(file_ref.mimeType)
    if export_mime:
        ext = _ext_for(export_mime)

    if not ext:
        # Best-effort: try filename extension
        name_parts = file_ref.name.rsplit(".", 1)
        ext = name_parts[-1].lower() if len(name_parts) > 1 else "txt"

    # ── Route to the correct extractor based on media type ────────────────────
    text: str = ""

    if ext == _IMAGE_EXT:
        # ── Image: primary OCR engine, then EasyOCR if nothing was read ───────
        text = _ocr_engine.extract_text(raw_bytes)
        if not text or not text.strip():
            try:
                text = _easyocr_extract(raw_bytes)
            except Exception as ocr_err:
                return "", f"Image OCR fallback failed: {ocr_err}", 0
        del raw_bytes

    elif ext in _VIDEO_EXTS or ext == _AUDIO_EXT:
        # ── Video / Audio: write to temp file, run video_pipeline ─────────────
        from file_handlers.video_pipeline import process_video

        # The temp file's suffix tells the pipeline which container to expect
        suffix = f".{ext}" if ext in _VIDEO_EXTS else ".mp3"
        tmp_path = _write_temp_file(raw_bytes, suffix)
        del raw_bytes

        try:
            text = process_video(tmp_path)
        except Exception as vid_err:
            return "", f"Video/Audio extraction error: {str(vid_err)}", 0
        finally:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    else:
        # ── Standard document: universal_parser ──────────────────────────────
        try:
            text = parse_file(raw_bytes, ext)
            del raw_bytes   # release immediately
        except Exception as e:
            return "", f"Parse error: {str(e)}", 0

    if not text or not text.strip():
        return "", "No text could be extracted from this file.", 0

    # Truncate (same guard as /evaluator/parse)
    text = text[:_MAX_TEXT_CHARS]
    return text, "", len(text)


def _scan_one_drive_file(
    file_ref: DriveFileRef,
    credentials: dict,
    auth_type: str,
    model_keys: List[str],
) -> dict:
    """
    Download one Drive file into RAM, parse it, run models.
    Returns per-file result dict compatible with /api/evaluator/scan shape.
    Runs synchronously — called from thread pool.

    Never raises for a single bad file: download, extraction and model failures
    are all reported through the result's "error" field.
    """
    base = {
        "file_id": file_ref.id,
        "file_name": file_ref.name,
        "mime_type": file_ref.mimeType,
        "pii_detected": False,
        "pii_count": 0,
        "scan_data": None,
        "error": None,
        "char_count": 0,
    }

    try:
        text, err, char_count = _extract_drive_file_text(file_ref, credentials, auth_type)
    except Exception as e:
        # One unreadable file must not abort the rest of the batch.
        base["error"] = f"Extraction error: {str(e)}"
        return base

    if err:
        base["error"] = err
        return base

    base["char_count"] = char_count

    # Run models
    if _classifier is None:
        base["error"] = "Classifier not initialised."
        return base

    try:
        unique_model_keys = list(dict.fromkeys(model_keys))
        all_model_detections: Dict[str, List[dict]] = {
            key: _classifier.analyze_text_hybrid(text, [key])
            for key in unique_model_keys
        }
        per_model, ranked, union_keys = _compare_models(unique_model_keys, all_model_detections)

        total_pii = len(union_keys)
        base["pii_detected"] = total_pii > 0
        base["pii_count"] = total_pii
        base["scan_data"] = {
            "per_model": per_model,
            "ranked": ranked,
            "union_total": total_pii,
            "has_gt": False,
        }
    except Exception as e:
        base["error"] = f"Scan error: {str(e)}"

    return base


@router.post("/drive/scan")
async def evaluator_drive_scan(request: DriveScanRequest):
    """
    Download and scan each selected Drive file sequentially.
    Returns per-file results in the same shape as /evaluator/scan.

    When the optional ``db.supabase_client`` module is importable, a scan
    session is created and each file's state is upserted; failures there are
    printed as warnings and do not affect the response.

    Raises:
        HTTPException(503): the classifier has not been injected via setup().
        HTTPException(400): no files or no models selected.
    """
    if _classifier is None:
        raise HTTPException(status_code=503, detail="Classifier not initialised.")
    if not request.file_refs:
        raise HTTPException(status_code=400, detail="No files selected.")
    if not request.model_keys:
        raise HTTPException(status_code=400, detail="No models selected.")

    try:
        from db.supabase_client import detect_scan_type, create_scan_session, upsert_scan_state, finish_scan_session
    except ImportError:
        detect_scan_type = create_scan_session = upsert_scan_state = finish_scan_session = None

    scan_type = "external"
    session_id = None
    if detect_scan_type and create_scan_session:
        try:
            scan_type = detect_scan_type(request.uid, "google_drive")
            session_id = create_scan_session(request.uid, "google_drive", scan_type)
        except Exception as e:
            print(f"[WARN] Failed to create scan session: {e}")

    start_time = time.perf_counter()

    # Sequential per-file scan — RAM safe on 16GB HF Spaces
    loop = asyncio.get_running_loop()
    results = []
    total_pii_found = 0
    for file_ref in request.file_refs:
        result = await loop.run_in_executor(
            _executor,
            _scan_one_drive_file,
            file_ref,
            request.credentials,
            request.auth_type,
            request.model_keys,
        )
        results.append(result)
        total_pii_found += result.get("pii_count", 0)

        if upsert_scan_state and session_id is not None:
            # A file that errored counts as unscanned, regardless of pii_detected.
            if result.get("error"):
                scan_status = "unscanned"
            else:
                scan_status = "pii_found" if result["pii_detected"] else "clean"

            try:
                upsert_scan_state(
                    uid=request.uid,
                    connector_type="google_drive",
                    file_id=file_ref.id,
                    scan_status=scan_status,
                    scan_type=scan_type,
                    pii_count=result.get("pii_count", 0),
                    session_id=session_id
                )
            except Exception as e:
                print(f"[WARN] Failed to upsert scan state for {file_ref.id}: {e}")

    elapsed = round(time.perf_counter() - start_time, 3)
    total_pii_files = sum(1 for r in results if r["pii_detected"])

    if finish_scan_session and session_id is not None:
        try:
            finish_scan_session(session_id, len(results), total_pii_found)
        except Exception as e:
            print(f"[WARN] Failed to finish scan session: {e}")

    return JSONResponse(content={
        "results": results,
        "total_files": len(results),
        "total_pii_files": total_pii_files,
        "elapsed": elapsed,
    })


# ── Route 3: Tag files ────────────────────────────────────────────────────────

@router.post("/drive/tag")
async def evaluator_drive_tag(request: DriveTagRequest):
    """
    Write pii_detected to each file's appProperties in Google Drive.
    Non-fatal: if a file cannot be tagged, error is returned per-file.

    Raises:
        HTTPException(400): ``files_to_tag`` is empty.
    """
    if not request.files_to_tag:
        raise HTTPException(status_code=400, detail="No files to tag.")

    # Same "<naive UTC ISO timestamp>Z" string as before, without the deprecated utcnow()
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    scan_ts = now_utc.replace(tzinfo=None).isoformat() + "Z"

    loop = asyncio.get_running_loop()
    tagged = []

    for item in request.files_to_tag:
        try:
            result = await loop.run_in_executor(
                _executor,
                _drive_handler.tag_file_pii,
                item.file_id,
                item.pii_detected,
                item.pii_count,
                scan_ts,
                request.credentials,
                request.auth_type,
                request.human_readable,
            )
            entry = {
                "file_id": item.file_id,
                "success": result["success"],
                "error": result["error"],
            }
        except Exception as e:
            # Honour the per-file, non-fatal contract even if the handler raises.
            entry = {"file_id": item.file_id, "success": False, "error": str(e)}
        tagged.append(entry)

    return JSONResponse(content={"tagged": tagged, "scan_ts": scan_ts})


# ── Route 4: Fetch Content Chunks ─────────────────────────────────────────────

class DriveContentRequest(DriveAuth):
    """Request body for POST /drive/content-chunks."""

    file_ref: DriveFileRef
    chunk_size: int = 10000


def _chunk_text(text: str, chunk_size: int) -> List[dict]:
    """
    Split ``text`` into consecutive chunks of at most ``chunk_size`` characters.

    A chunk is cut just after the last newline inside its window when that
    newline lies in the window's second half; otherwise it is cut at the
    window edge. Each chunk is ``{"text", "start_idx", "end_idx"}`` with
    ``end_idx`` exclusive. ``chunk_size`` must be >= 1, otherwise the cursor
    would never advance.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be a positive integer.")

    chunks = []
    total = len(text)
    current_idx = 0
    while current_idx < total:
        end_idx = min(current_idx + chunk_size, total)

        # If not at the end, try to find a newline to break gracefully
        if end_idx < total:
            last_newline = text.rfind('\n', current_idx, end_idx)
            if last_newline != -1 and last_newline > current_idx + chunk_size // 2:
                end_idx = last_newline + 1

        chunks.append({
            "text": text[current_idx:end_idx],
            "start_idx": current_idx,
            "end_idx": end_idx
        })
        current_idx = end_idx
    return chunks


@router.post("/drive/content-chunks")
async def evaluator_drive_content_chunks(request: DriveContentRequest):
    """
    Fetch a file, extract its text, and split it into manageable chunks.
    Used by the Document Viewer Modal to render paginated text.

    Raises:
        HTTPException(400): ``chunk_size`` is not positive, or the file's text
            could not be extracted.
    """
    # A non-positive chunk_size would make the chunking loop spin forever.
    if request.chunk_size < 1:
        raise HTTPException(status_code=400, detail="chunk_size must be a positive integer.")

    def _fetch_and_chunk():
        text, err, _char_count = _extract_drive_file_text(
            request.file_ref, request.credentials, request.auth_type
        )
        if err:
            return {"error": err, "chunks": []}
        if not text:
            return {"error": None, "chunks": [], "char_count": 0}
        return {
            "error": None,
            "chunks": _chunk_text(text, request.chunk_size),
            "char_count": len(text),
        }

    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(_executor, _fetch_and_chunk)
    if result.get("error"):
        raise HTTPException(status_code=400, detail=result["error"])

    return JSONResponse(content=result)