from fastapi import FastAPI, File, UploadFile, HTTPException, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from pydantic import BaseModel, Field, EmailStr from typing import Optional, List, Dict, Any import pandas as pd import io import json import time import asyncio from backend import RegexClassifier from email_service import send_welcome_email import evaluator_api import video_router import video_job_queue # Initialize FastAPI app app = FastAPI( title="Segmento Sense API", description="AI-powered PII Detection and Data Classification Platform", version="1.0.0" ) # CORS Configuration # Keep all known origins: production domains + local dev ports app.add_middleware( CORSMiddleware, allow_origins=[ # ── Production ────────────────────────────────────── "https://segmento.in", "https://www.segmento.in", "https://segmento-sense.vercel.app", # ── Local development ──────────────────────────────── "http://localhost:3000", "http://localhost:3001", "http://localhost:3002", ], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Initialize the classifier classifier = RegexClassifier() # Wire evaluator API — must happen after classifier is ready evaluator_api.setup(classifier) app.include_router(evaluator_api.router) # Wire video API — async job queue (startup worker launched below) video_job_queue.setup(classifier) app.include_router(video_router.router) @app.on_event("startup") async def _start_video_worker(): """Launch the background asyncio video processing worker.""" await video_job_queue.startup() # Maximum file size (1GB) MAX_FILE_SIZE = 1024 * 1024 * 1024 # 1GB in bytes # ==================== PYDANTIC MODELS ==================== class TextAnalysisRequest(BaseModel): text: str = Field(..., description="Text to analyze for PII") class PatternAddRequest(BaseModel): name: str = Field(..., description="Pattern name") regex: str = Field(..., description="Regex pattern") class DatabaseConnectionRequest(BaseModel): host: str port: str database: str user: str password: str table: str = Field(None, description="Table name (or collection for MongoDB)") class S3ConnectionRequest(BaseModel): access_key: str secret_key: str region: str bucket: str = Field(None, description="Bucket name") file_key: str = Field(None, description="File key/path") class AzureConnectionRequest(BaseModel): connection_string: str container: str = Field(None, description="Container name") blob: str = Field(None, description="Blob name") class GCSConnectionRequest(BaseModel): credentials: Dict[str, Any] bucket: str = Field(None, description="Bucket name") file_name: str = Field(None, description="File name") class GoogleDriveRequest(BaseModel): credentials: Dict[str, Any] file_id: str = Field(None, description="Drive file ID") mime_type: str = Field(None, description="File MIME type") class SlackRequest(BaseModel): token: str channel_id: str class ConfluenceRequest(BaseModel): url: str username: str token: str page_id: str class PDFPageRequest(BaseModel): page_number: int = 0 class WelcomeEmailRequest(BaseModel): name: str = Field(..., description="User's name") email: EmailStr = Field(..., description="User's email address") # ==================== HELPER FUNCTIONS ==================== def validate_file_size(file: UploadFile): """Validate uploaded file size""" file.file.seek(0, 2) # Seek to end size = file.file.tell() # Get position (file size) file.file.seek(0) # Reset to beginning if size > MAX_FILE_SIZE: raise HTTPException( status_code=413, detail=f"File size ({size} bytes) exceeds maximum allowed size (1GB)" ) return size def format_pii_response(df: pd.DataFrame, source_df: pd.DataFrame = None, text: str = None, selected_models: list = None) -> Dict: """Format PII analysis response""" count_df = classifier.get_pii_counts_dataframe(df, selected_models) if source_df is not None else classifier.get_pii_counts(text, selected_models) response = { "pii_counts": count_df.fillna("").to_dict(orient="records") if not count_df.empty else [], "total_pii_found": int(count_df["Count"].sum()) if not count_df.empty else 0 } # Add schema if source dataframe provided if source_df is not None and not source_df.empty: schema_df = classifier.get_data_schema(source_df) response["schema"] = schema_df.fillna("").to_dict(orient="records") # Add inspector results if text provided if text: inspector_df = classifier.run_full_inspection(text, selected_models) if not inspector_df.empty: response["inspector"] = inspector_df.fillna("").to_dict(orient="records") return response # ==================== MODEL CATALOGUE ENDPOINT ==================== @app.get("/api/models") async def get_available_models(): """ Returns the full list of available PII detection models, separated into always-on and lazy-loaded categories. """ return JSONResponse(content={ "always_on": [ {"key": "regex", "label": "🛠️ Regex", "description": "Fast rule-based pattern matching (emails, phones, SSNs)"}, {"key": "nltk", "label": "🧠 NLTK", "description": "Statistical NLP chunker for names and locations"}, {"key": "spacy", "label": "🤖 SpaCy", "description": "Industrial-strength NER (en_core_web_lg)"}, {"key": "presidio", "label": "🛡️ Presidio", "description": "Microsoft Presidio — enterprise PII analyser"}, {"key": "gliner", "label": "🦅 GLiNER", "description": "Zero-shot entity extraction (urchade/gliner_small-v2.1)"}, {"key": "deberta", "label": "🚀 DeBERTa", "description": "Kaggle-winning DeBERTa V3 fine-tuned for PII"}, ], "lazy_loaded": [ {"key": "pasteproof", "label": "📋 Pasteproof", "description": "joneauxedgar/pasteproof-pii-detector-v2 — broad PII detection"}, {"key": "piiranha", "label": "🐟 Piiranha", "description": "iiiorg/piiranha-v1 — personal information specialist"}, {"key": "nvidia_gliner", "label": "⚡ NVIDIA-GLiNER", "description": "nvidia/gliner-PII — enterprise-grade zero-shot NER"}, {"key": "mmbert", "label": "🌐 mmbert32k", "description": "llm-semantic-router/mmbert32k — 32k-context document scanner"}, ] }) # ==================== FILE UPLOAD ENDPOINTS ==================== @app.post("/api/upload/csv") async def upload_csv(file: UploadFile = File(...), mask: bool = Form(False), selected_models: str = Form("")): """Upload and analyze CSV file""" try: validate_file_size(file) content = await file.read() df = pd.read_csv(io.BytesIO(content)) models = [m.strip() for m in selected_models.split(",") if m.strip()] or None text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample, models) if mask: masked_df = classifier.mask_dataframe(df.head(50), models) response["data"] = masked_df.fillna("").to_dict(orient="records") else: highlighted_df = classifier.scan_dataframe_with_html(df.head(50), models) response["data"] = highlighted_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/upload/json") async def upload_json(file: UploadFile = File(...), mask: bool = Form(False), selected_models: str = Form("")): """Upload and analyze JSON file""" try: validate_file_size(file) df = classifier.get_json_data(file.file) models = [m.strip() for m in selected_models.split(",") if m.strip()] or None text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample, models) if mask: masked_df = classifier.mask_dataframe(df.head(50), models) response["data"] = masked_df.fillna("").to_dict(orient="records") else: highlighted_df = classifier.scan_dataframe_with_html(df.head(50), models) response["data"] = highlighted_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/upload/parquet") async def upload_parquet(file: UploadFile = File(...), mask: bool = Form(False), selected_models: str = Form("")): """Upload and analyze Parquet file""" try: validate_file_size(file) content = await file.read() df = classifier.get_parquet_data(content) models = [m.strip() for m in selected_models.split(",") if m.strip()] or None text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample, models) if mask: masked_df = classifier.mask_dataframe(df.head(50), models) response["data"] = masked_df.fillna("").to_dict(orient="records") else: highlighted_df = classifier.scan_dataframe_with_html(df.head(50), models) response["data"] = highlighted_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/upload/avro") async def upload_avro(file: UploadFile = File(...), mask: bool = Form(False), selected_models: str = Form("")): """Upload and analyze Apache Avro file""" try: validate_file_size(file) content = await file.read() df = classifier.get_avro_data(content) models = [m.strip() for m in selected_models.split(",") if m.strip()] or None text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample, models) if mask: masked_df = classifier.mask_dataframe(df.head(50), models) response["data"] = masked_df.fillna("").to_dict(orient="records") else: highlighted_df = classifier.scan_dataframe_with_html(df.head(50), models) response["data"] = highlighted_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/upload/pdf") async def upload_pdf(file: UploadFile = File(...), page_number: int = Form(0)): """Upload and analyze PDF file (with pagination)""" try: validate_file_size(file) content = await file.read() # Get total pages and extract text from specific page total_pages = classifier.get_pdf_total_pages(content) text = classifier.get_pdf_page_text(content, page_number) # Format PII response response = format_pii_response(None, None, text) response["total_pages"] = total_pages response["current_page"] = page_number # Get labeled PDF image img = classifier.get_labeled_pdf_image(content, page_number) if img: import base64 from PIL import Image # Check if img is already bytes or a PIL Image if isinstance(img, bytes): # Already bytes, just encode img_str = base64.b64encode(img).decode() elif isinstance(img, Image.Image): # PIL Image, need to convert to bytes buffered = io.BytesIO() img.save(buffered, format="PNG") img_str = base64.b64encode(buffered.getvalue()).decode() else: # Unknown type, skip image img_str = None if img_str: response["image"] = f"data:image/png;base64,{img_str}" return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/upload/image") async def upload_image(file: UploadFile = File(...), mask: bool = Form(False)): """Upload and analyze image with OCR""" try: validate_file_size(file) content = await file.read() # Extract text via OCR text = classifier.get_ocr_text_from_image(content) if not text: raise HTTPException(status_code=400, detail="No text could be extracted from the image") df = pd.DataFrame({"Content": [text]}) response = format_pii_response(df, df, text) if mask: masked_df = classifier.mask_dataframe(df) response["data"] = masked_df.fillna("").to_dict(orient="records") else: highlighted_df = classifier.scan_dataframe_with_html(df) response["data"] = highlighted_df.fillna("").to_dict(orient="records") # Return original image as base64 import base64 img_str = base64.b64encode(content).decode() response["original_image"] = f"data:image/png;base64,{img_str}" return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ==================== ANALYSIS ENDPOINTS ==================== @app.post("/api/analyze/text") async def analyze_text(request: TextAnalysisRequest): """Analyze plain text for PII""" try: matches = classifier.analyze_text_hybrid(request.text) count_df = classifier.get_pii_counts(request.text) return JSONResponse(content={ "matches": matches, "pii_counts": count_df.fillna("").to_dict(orient="records") if not count_df.empty else [], "total_pii_found": len(matches) }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/inspect") async def inspect_text(request: TextAnalysisRequest): """Run full model inspection on text""" try: inspector_df = classifier.run_full_inspection(request.text) if inspector_df.empty: return JSONResponse(content={ "inspector": [], "message": "No PII detected by any model" }) return JSONResponse(content={ "inspector": inspector_df.fillna("").to_dict(orient="records") }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/mask") async def mask_text(request: TextAnalysisRequest): """Mask PII in text""" try: df = pd.DataFrame({"Content": [request.text]}) masked_df = classifier.mask_dataframe(df) return JSONResponse(content={ "original": request.text, "masked": masked_df.iloc[0]["Content"] }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ==================== PATTERN MANAGEMENT ==================== @app.get("/api/patterns") async def get_patterns(): """Get all regex patterns""" try: patterns = classifier.list_patterns() return JSONResponse(content={ "patterns": [{"name": k, "regex": v} for k, v in patterns.items()] }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/api/patterns") async def add_pattern(request: PatternAddRequest): """Add a new regex pattern""" try: classifier.add_pattern(request.name, request.regex) return JSONResponse(content={ "message": f"Pattern '{request.name}' added successfully", "pattern": {"name": request.name, "regex": request.regex} }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.delete("/api/patterns/{pattern_name}") async def delete_pattern(pattern_name: str): """Remove a regex pattern""" try: classifier.remove_pattern(pattern_name) return JSONResponse(content={ "message": f"Pattern '{pattern_name}' removed successfully" }) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) # ==================== DATABASE CONNECTORS ==================== @app.post("/api/connect/postgresql") async def connect_postgresql(request: DatabaseConnectionRequest): """Connect to PostgreSQL and scan table""" try: df = classifier.get_postgres_data( request.host, request.port, request.database, request.user, request.password, request.table ) text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df.head(50)) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"PostgreSQL connection failed: {str(e)}") @app.post("/api/connect/mysql") async def connect_mysql(request: DatabaseConnectionRequest): """Connect to MySQL and scan table""" try: df = classifier.get_mysql_data( request.host, request.port, request.database, request.user, request.password, request.table ) text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df.head(50)) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"MySQL connection failed: {str(e)}") @app.post("/api/connect/mongodb") async def connect_mongodb(request: DatabaseConnectionRequest): """Connect to MongoDB and scan collection""" try: df = classifier.get_mongodb_data( request.host, request.port, request.database, request.user, request.password, request.table ) text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df.head(50)) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"MongoDB connection failed: {str(e)}") # ==================== CLOUD STORAGE - AWS S3 ==================== @app.post("/api/cloud/s3/list-buckets") async def list_s3_buckets(request: S3ConnectionRequest): """List S3 buckets""" try: buckets = classifier.get_s3_buckets(request.access_key, request.secret_key, request.region) return JSONResponse(content={"buckets": buckets}) except Exception as e: raise HTTPException(status_code=500, detail=f"S3 connection failed: {str(e)}") @app.post("/api/cloud/s3/list-files") async def list_s3_files(request: S3ConnectionRequest): """List files in S3 bucket""" try: if not request.bucket: raise HTTPException(status_code=400, detail="Bucket name is required") files = classifier.get_s3_files( request.access_key, request.secret_key, request.region, request.bucket ) return JSONResponse(content={"files": files}) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to list S3 files: {str(e)}") @app.post("/api/cloud/s3/scan") async def scan_s3_file(request: S3ConnectionRequest): """Download and scan S3 file""" try: if not request.bucket or not request.file_key: raise HTTPException(status_code=400, detail="Bucket and file_key are required") content = classifier.download_s3_file( request.access_key, request.secret_key, request.region, request.bucket, request.file_key ) # Assume CSV for now df = pd.read_csv(io.BytesIO(content)) text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df.head(50)) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"S3 scan failed: {str(e)}") # ==================== CLOUD STORAGE - AZURE ==================== @app.post("/api/cloud/azure/list-containers") async def list_azure_containers(request: AzureConnectionRequest): """List Azure containers""" try: containers = classifier.get_azure_containers(request.connection_string) return JSONResponse(content={"containers": containers}) except Exception as e: raise HTTPException(status_code=500, detail=f"Azure connection failed: {str(e)}") @app.post("/api/cloud/azure/list-blobs") async def list_azure_blobs(request: AzureConnectionRequest): """List blobs in Azure container""" try: if not request.container: raise HTTPException(status_code=400, detail="Container name is required") blobs = classifier.get_azure_blobs(request.connection_string, request.container) return JSONResponse(content={"blobs": blobs}) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to list blobs: {str(e)}") @app.post("/api/cloud/azure/scan") async def scan_azure_blob(request: AzureConnectionRequest): """Download and scan Azure blob""" try: if not request.container or not request.blob: raise HTTPException(status_code=400, detail="Container and blob are required") content = classifier.download_azure_blob( request.connection_string, request.container, request.blob ) df = pd.read_csv(io.BytesIO(content)) text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df.head(50)) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"Azure scan failed: {str(e)}") # ==================== CLOUD STORAGE - GCS ==================== @app.post("/api/cloud/gcs/list-buckets") async def list_gcs_buckets(request: GCSConnectionRequest): """List GCS buckets""" try: buckets = classifier.get_gcs_buckets(request.credentials) return JSONResponse(content={"buckets": buckets}) except Exception as e: raise HTTPException(status_code=500, detail=f"GCS connection failed: {str(e)}") @app.post("/api/cloud/gcs/list-files") async def list_gcs_files(request: GCSConnectionRequest): """List files in GCS bucket""" try: if not request.bucket: raise HTTPException(status_code=400, detail="Bucket name is required") files = classifier.get_gcs_files(request.credentials, request.bucket) return JSONResponse(content={"files": files}) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to list GCS files: {str(e)}") @app.post("/api/cloud/gcs/scan") async def scan_gcs_file(request: GCSConnectionRequest): """Download and scan GCS file""" try: if not request.bucket or not request.file_name: raise HTTPException(status_code=400, detail="Bucket and file_name are required") content = classifier.download_gcs_file( request.credentials, request.bucket, request.file_name ) df = pd.read_csv(io.BytesIO(content)) text_sample = df.head(10).to_string() response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df.head(50)) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"GCS scan failed: {str(e)}") # ==================== CLOUD STORAGE - GOOGLE DRIVE ==================== @app.post("/api/cloud/drive/list-files") async def list_drive_files(request: GoogleDriveRequest): """List Google Drive files""" try: files = classifier.get_google_drive_files(request.credentials) return JSONResponse(content={"files": files}) except Exception as e: raise HTTPException(status_code=500, detail=f"Google Drive connection failed: {str(e)}") @app.post("/api/cloud/drive/scan") async def scan_drive_file(request: GoogleDriveRequest): """Download and scan Google Drive file""" try: if not request.file_id or not request.mime_type: raise HTTPException(status_code=400, detail="file_id and mime_type are required") content = classifier.download_drive_file( request.file_id, request.mime_type, request.credentials ) if isinstance(content, bytes): try: text = content.decode('utf-8') df = pd.DataFrame({"Content": [text]}) response = format_pii_response(df, df, text) highlighted_df = classifier.scan_dataframe_with_html(df) response["data"] = highlighted_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except: raise HTTPException(status_code=400, detail="Binary file cannot be processed") except Exception as e: raise HTTPException(status_code=500, detail=f"Google Drive scan failed: {str(e)}") # ==================== ENTERPRISE CONNECTORS ==================== @app.post("/api/enterprise/gmail") async def scan_gmail(file: UploadFile = File(...), num_emails: int = Form(10)): """Scan Gmail messages""" try: df = classifier.get_gmail_data(file.file, num_emails) if df.empty: raise HTTPException(status_code=400, detail="No emails fetched") text_sample = df.iloc[0]['Content'] response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"Gmail scan failed: {str(e)}") @app.post("/api/enterprise/slack") async def scan_slack(request: SlackRequest): """Scan Slack messages""" try: df = classifier.get_slack_messages(request.token, request.channel_id) if df.empty: raise HTTPException(status_code=400, detail="No messages found or authentication failed") text_sample = df.iloc[0]['Content'] response = format_pii_response(df, df, text_sample) masked_df = classifier.mask_dataframe(df) response["data"] = masked_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"Slack scan failed: {str(e)}") @app.post("/api/enterprise/confluence") async def scan_confluence(request: ConfluenceRequest): """Scan Confluence page""" try: df = classifier.get_confluence_page( request.url, request.username, request.token, request.page_id ) if df.empty: raise HTTPException(status_code=400, detail="Failed to fetch page") text_sample = df.iloc[0]['Content'] response = format_pii_response(df, df, text_sample) highlighted_df = classifier.scan_dataframe_with_html(df) response["data"] = highlighted_df.fillna("").to_dict(orient="records") return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=f"Confluence scan failed: {str(e)}") # ==================== EMAIL FUNCTIONALITY ==================== @app.post("/api/send-welcome") async def send_welcome(request: WelcomeEmailRequest): """ Send a welcome email to a new user. This endpoint is called by the frontend after a user submits the contact form. """ try: # Send the welcome email success = send_welcome_email(request.name, request.email) if success: return JSONResponse(content={ "success": True, "message": f"Welcome email sent to {request.email}" }) else: raise HTTPException( status_code=500, detail="Failed to send welcome email. SMTP configuration may be missing." ) except Exception as e: raise HTTPException( status_code=500, detail=f"Email sending failed: {str(e)}" ) # ==================== EVALUATOR ENDPOINTS ==================== # Full model catalogue served to the MODEL LAB frontend EVALUATOR_MODEL_CATALOGUE = [ {"key": "regex", "label": "Regex Engine", "hf_id": "deterministic", "type": "Rule-based", "params": "—", "f1_benchmark": 1.0, "lazy": False, "description": "Deterministic regex patterns for EMAIL, PHONE, SSN, CC, IP, URL, MAC."}, {"key": "nltk", "label": "NLTK Chunker", "hf_id": "nltk", "type": "Statistical", "params": "—", "f1_benchmark": 0.0, "lazy": False, "description": "NLTK ne_chunk: PERSON → FIRST_NAME, GPE → LOCATION."}, {"key": "spacy", "label": "SpaCy LG", "hf_id": "en_core_web_lg", "type": "Statistical", "params": "685M", "f1_benchmark": 0.0, "lazy": False, "description": "SpaCy en_core_web_lg NER model."}, {"key": "presidio", "label": "MS Presidio", "hf_id": "microsoft/presidio-analyzer", "type": "Rule+ML", "params": "—", "f1_benchmark": 0.0, "lazy": False, "description": "Microsoft Presidio enterprise PII analyzer."}, {"key": "gliner", "label": "GLiNER Small", "hf_id": "urchade/gliner_small-v2.1", "type": "GLiNER", "params": "small", "f1_benchmark": 0.850, "lazy": False, "description": "Zero-shot GLiNER small model."}, {"key": "deberta", "label": "DeBERTa PII", "hf_id": "lakshyakh93/deberta-large-finetuned-pii", "type": "NER", "params": "86M", "f1_benchmark": 0.920, "lazy": False, "description": "Kaggle-winning DeBERTa V3 fine-tuned for PII."}, {"key": "pasteproof", "label": "Pasteproof v2", "hf_id": "joneauxedgar/pasteproof-pii-detector-v2", "type": "NER", "params": "149M", "f1_benchmark": 0.970, "lazy": True, "description": "ModernBERT 149M fine-tuned PII detector."}, {"key": "piiranha", "label": "Piiranha v1", "hf_id": "iiiorg/piiranha-v1-detect-personal-information", "type": "NER", "params": "86M", "f1_benchmark": 0.931, "lazy": True, "description": "DeBERTa-based PII detector."}, {"key": "nvidia_gliner","label": "NVIDIA GLiNER", "hf_id": "nvidia/gliner-PII-0.1", "type": "GLiNER", "params": "570M", "f1_benchmark": 0.870, "lazy": True, "description": "NVIDIA GLiNER with 37-label PII vocabulary."}, {"key": "mmbert", "label": "mmbert32k", "hf_id": "llm-semantic-router/mmbert32k-pii-detector-merged", "type": "NER", "params": "307M", "f1_benchmark": 0.969, "lazy": True, "description": "ModernBERT 32k-context PII detector."}, {"key": "nerguard", "label": "NerGuard-0.3B", "hf_id": "exdsgift/NerGuard-0.3B", "type": "NER", "params": "300M", "f1_benchmark": 0.996, "lazy": True, "description": "mDeBERTa 300M — highest F1 in the registry."}, {"key": "gliner_large", "label": "GLiNER PII Large", "hf_id": "knowledgator/gliner-pii-large-v1.0", "type": "GLiNER", "params": "large","f1_benchmark": 0.833, "lazy": True, "description": "GLiNER large architecture fine-tuned for PII."}, ] class EvaluatorScanRequest(BaseModel): text: str = Field(..., description="Document text to scan") gt_spans: List[Dict[str, Any]] = Field(default=[], description="Ground truth spans from /api/evaluator/parse") model_keys: List[str] = Field(default=["regex", "spacy", "deberta"], description="Model keys to run") conf_threshold: float = Field(default=0.5, ge=0.0, le=1.0) entropy_threshold: float = Field(default=4.5, ge=3.0, le=6.0) @app.get("/api/evaluator/models") async def evaluator_models(): """Return the full model catalogue for the MODEL LAB page.""" return JSONResponse(content={"models": EVALUATOR_MODEL_CATALOGUE}) @app.post("/api/evaluator/parse") async def evaluator_parse( file: UploadFile = File(...), format: str = Form("auto"), doc_index: int = Form(0), schema: str = Form(""), ): """ Parse a labeled (or unlabeled) dataset file. Returns the document text, ground-truth spans, format detected, and doc count. format: 'auto' | 'bigcode' | 'nemotron' | 'csv_spans' | 'json_spans' | 'unlabeled' schema: JSON string e.g. '{"text_col":"text","spans_col":"spans"}' """ try: content = await file.read() schema_dict = json.loads(schema) if schema.strip() else None text, gt_spans, has_gt, doc_count, fmt_detected = detect_and_parse( content, file.filename or "", doc_index, schema_dict, format ) return JSONResponse(content={ "text": text, "gt_spans": gt_spans, "has_gt": has_gt, "format_detected": fmt_detected, "doc_count": doc_count, "char_count": len(text), }) except Exception as e: raise HTTPException(status_code=500, detail=f"Parse error: {str(e)}") @app.post("/api/evaluator/scan") async def evaluator_scan(request: EvaluatorScanRequest): """ Run selected models on text in parallel, compare against GT spans. Returns partial results — if a model times out or errors, other models still return results. Each model entry includes timed_out + error fields. """ try: t0 = time.time() has_gt = bool(request.gt_spans) # Parallel scan — new shape: {key: {detections, error, timed_out}} raw_results = await asyncio.get_event_loop().run_in_executor( None, lambda: classifier.scan_with_models(request.text, request.model_keys), ) per_model: Dict[str, Any] = {} for model_key, payload in raw_results.items(): predictions = payload["detections"] timed_out = payload["timed_out"] model_error = payload["error"] # Normalise canonical label for p in predictions: p["canonical"] = norm_model_out(p.get("label", "")) if has_gt and not timed_out: comparison = compare_spans(predictions, request.gt_spans, model_key) metrics = compute_metrics(comparison) coverage = get_label_coverage(request.gt_spans, model_key) failures = analyse_failures(comparison, request.text) else: comparison = {"TP": [], "FP": [], "FN": []} metrics = [] coverage = {"in_scope": [], "out_of_scope": []} failures = {"missed": [], "false_positives": []} per_model[model_key] = { "predictions": predictions, "comparison": comparison, "metrics": metrics, "coverage": coverage, "failures": failures, "timed_out": timed_out, "error": model_error, } elapsed = round(time.time() - t0, 2) any_timeout = any(v["timed_out"] for v in per_model.values()) return JSONResponse(content={ "per_model": per_model, "has_gt": has_gt, "elapsed": elapsed, "any_timeout": any_timeout, }) except Exception as e: raise HTTPException(status_code=500, detail=f"Evaluator scan error: {str(e)}") @app.post("/api/evaluator/scan/stream") async def evaluator_scan_stream(request: EvaluatorScanRequest): """ SSE streaming version of /api/evaluator/scan. Emits one Server-Sent Event per model as results arrive, so the frontend can show live per-model progress rather than waiting for all models. Event format (JSON): data: {"model_key": ..., "done": false, "status": "running"} data: {"model_key": ..., "done": true, "payload": {...}, "elapsed": ...} data: {"done": true, "all_complete": true} """ import concurrent.futures has_gt = bool(request.gt_spans) loop = asyncio.get_event_loop() async def event_generator(): t0 = time.time() always_on = {"regex", "nltk", "spacy", "presidio", "gliner", "deberta"} def _run_one_key(key: str): payload = classifier.scan_with_models(request.text, [key]) return key, payload.get(key, {"detections": [], "error": "No result", "timed_out": False}) with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool: future_map = {pool.submit(_run_one_key, key): key for key in request.model_keys} # Yield start event for each model for key in request.model_keys: yield f'data: {{"model_key": "{key}", "done": false, "status": "running"}}\n\n' for f in concurrent.futures.as_completed(future_map, timeout=110): try: key, raw = await loop.run_in_executor(None, f.result) predictions = raw["detections"] for p in predictions: p["canonical"] = norm_model_out(p.get("label", "")) if has_gt and not raw["timed_out"]: comparison = compare_spans(predictions, request.gt_spans, key) metrics = compute_metrics(comparison) coverage = get_label_coverage(request.gt_spans, key) failures = analyse_failures(comparison, request.text) else: comparison = {"TP": [], "FP": [], "FN": []} metrics = []; coverage = {"in_scope": [], "out_of_scope": []}; failures = {"missed": [], "false_positives": []} result_payload = { "predictions": predictions, "comparison": comparison, "metrics": metrics, "coverage": coverage, "failures": failures, "timed_out": raw["timed_out"], "error": raw["error"], } event = json.dumps({"model_key": key, "done": True, "payload": result_payload, "elapsed": round(time.time() - t0, 2)}) yield f'data: {event}\n\n' except Exception as e: err_event = json.dumps({"model_key": future_map[f], "done": True, "error": str(e), "timed_out": True}) yield f'data: {err_event}\n\n' yield f'data: {{"done": true, "all_complete": true, "total_elapsed": {round(time.time() - t0, 2)}}}\n\n' return StreamingResponse( event_generator(), media_type="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) @app.post("/api/evaluator/batch") async def evaluator_batch( file: UploadFile = File(...), format: str = Form("nemotron"), n_docs: int = Form(50), model_keys: str = Form("deberta"), conf_threshold: float = Form(0.5), entropy_threshold: float = Form(4.5), ): """ Batch evaluation over N documents from a labeled file. Returns aggregate metrics + per-doc breakdown for each selected model. model_keys: comma-separated string e.g. 'deberta,piiranha' """ try: content = await file.read() keys = [k.strip() for k in model_keys.split(",") if k.strip()] n_docs = max(1, min(n_docs, 1000)) agg: Dict[str, Dict[str, int]] = {k: {"TP": 0, "FP": 0, "FN": 0} for k in keys} per_doc: List[Dict[str, Any]] = [] for i in range(n_docs): try: text, gt_spans, has_gt, doc_count, _ = detect_and_parse( content, file.filename or "", i, None, format ) if i >= doc_count: break if not has_gt: continue model_results = classifier.scan_with_models(text, keys) doc_entry: Dict[str, Any] = {"doc_index": i} for key, predictions in model_results.items(): for p in predictions: p["canonical"] = norm_model_out(p.get("label", "")) comp = compare_spans(predictions, gt_spans, key) tp, fp, fn = len(comp["TP"]), len(comp["FP"]), len(comp["FN"]) agg[key]["TP"] += tp agg[key]["FP"] += fp agg[key]["FN"] += fn prec = tp / (tp + fp) if (tp + fp) else 0.0 rec = tp / (tp + fn) if (tp + fn) else 0.0 f1 = 2 * prec * rec / (prec + rec) if (prec + rec) else 0.0 doc_entry[key] = { "f1": round(f1, 4), "precision": round(prec, 4), "recall": round(rec, 4), "tp": tp, "fp": fp, "fn": fn, } per_doc.append(doc_entry) except Exception as doc_err: per_doc.append({"doc_index": i, "error": str(doc_err)}) # Aggregate metrics per model aggregate: Dict[str, Any] = {} for key, c in agg.items(): tp, fp, fn = c["TP"], c["FP"], c["FN"] p = tp / (tp + fp) if (tp + fp) else 0.0 r = tp / (tp + fn) if (tp + fn) else 0.0 f = 2 * p * r / (p + r) if (p + r) else 0.0 aggregate[key] = { "f1": round(f, 4), "precision": round(p, 4), "recall": round(r, 4), "tp": tp, "fp": fp, "fn": fn, } return JSONResponse(content={ "aggregate": aggregate, "per_doc": per_doc, "n_docs_evaluated": len(per_doc), }) except Exception as e: raise HTTPException(status_code=500, detail=f"Batch eval error: {str(e)}") # ==================== HEALTH CHECK ==================== @app.get("/") async def root(): """API health check""" return { "message": "Segmento Sense API", "status": "operational", "version": "1.0.0" } @app.get("/health") async def health_check(): """Detailed health check""" return { "status": "healthy", "classifiers": { "regex": True, "nltk": True, "spacy": True, "presidio": True, "gliner": True, "deberta": True } } # ==================== CONNECTOR CATALOG ENDPOINTS ==================== @app.get("/api/connector/file-catalog") async def get_file_catalog_endpoint(connector_type: str, uid: str = "default_uid"): try: from db.supabase_client import get_file_catalog, get_scan_sessions files = get_file_catalog(uid, connector_type) sessions = get_scan_sessions(uid, connector_type) completed_sessions = [s for s in sessions if s.get("status") == "completed"] last_session = completed_sessions[0] if completed_sessions else None return {"files": files, "last_session": last_session, "sessions": sessions} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/api/connector/scan-sessions") async def get_scan_sessions_endpoint(connector_type: str, uid: str = "default_uid"): try: from db.supabase_client import get_scan_sessions sessions = get_scan_sessions(uid, connector_type) return {"sessions": sessions} except Exception as e: raise HTTPException(status_code=500, detail=str(e)) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860) # HuggingFace Spaces default port