""" UnifIDE AI Detector — v2 (4-class UniXcoder) Deploys to HuggingFace Spaces (Docker SDK) or any server with enough RAM. Model: microsoft/unixcoder-base fine-tuned on 4 classes: human | ai_pure | ai_assisted | ai_adversarial AI% = sum of (ai_pure + ai_assisted + ai_adversarial) probabilities × 100 Response format is backward-compatible with the old binary detector. """ import os import json import pickle import numpy as np import torch from torch import nn from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from transformers import AutoTokenizer, AutoModel from huggingface_hub import hf_hub_download # ── Config ──────────────────────────────────────────────────────────────────── MODEL_REPO = os.getenv("AI_MODEL_REPO", "ss123qwd/unifide-ai-detector-v2") HF_TOKEN = os.getenv("HF_TOKEN", None) OPERATE_THRESHOLD = float(os.getenv("SUBMIT_THRESHOLD", "50.0")) # 50% = more likely AI than human MIN_CODE_LEN = 150 DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"[AI-DETECTOR] Device: {DEVICE}") print(f"[AI-DETECTOR] Loading model from: {MODEL_REPO}") # ── Model architecture (must match training notebook exactly) ───────────────── class UniXcoderVariantClassifier(nn.Module): def __init__(self, model_name, num_classes, dropout=0.1): super().__init__() self.encoder = AutoModel.from_pretrained(model_name) self.dropout = nn.Dropout(dropout) self.classifier = nn.Linear( self.encoder.config.hidden_size, num_classes ) def forward(self, input_ids, attention_mask): out = self.encoder(input_ids=input_ids, attention_mask=attention_mask) cls_emb = out.last_hidden_state[:, 0, :] return self.classifier(self.dropout(cls_emb)) # ── Load supporting files from HuggingFace repo ─────────────────────────────── def download(filename): return hf_hub_download( repo_id=MODEL_REPO, filename=filename, token=HF_TOKEN, ) print("[AI-DETECTOR] Downloading variant encoder...") enc_path = download("variant_enc.pkl") with open(enc_path, "rb") as f: variant_enc = pickle.load(f) VARIANT_CLASSES = list(variant_enc.classes_) NUM_CLASSES = len(VARIANT_CLASSES) AI_INDICES = [i for i, c in enumerate(VARIANT_CLASSES) if c != "human"] print(f"[AI-DETECTOR] Classes: {VARIANT_CLASSES}") print(f"[AI-DETECTOR] AI indices: {AI_INDICES}") # Load threshold (but use env var override if set) try: threshold_path = download("threshold.json") with open(threshold_path) as f: saved = json.load(f) TUNED_THRESHOLD = float(saved.get("submit_threshold", 75.3)) print(f"[AI-DETECTOR] Tuned threshold from file: {TUNED_THRESHOLD}%") print(f"[AI-DETECTOR] Operating threshold (used): {OPERATE_THRESHOLD}%") except Exception as e: print(f"[AI-DETECTOR] WARN: could not load threshold.json: {e}") TUNED_THRESHOLD = 75.3 # ── Load tokenizer ──────────────────────────────────────────────────────────── print("[AI-DETECTOR] Loading tokenizer...") tokenizer = AutoTokenizer.from_pretrained(MODEL_REPO, token=HF_TOKEN) print("[AI-DETECTOR] Tokenizer loaded.") # ── Load model weights ──────────────────────────────────────────────────────── print("[AI-DETECTOR] Loading model weights (this takes ~30s)...") model = UniXcoderVariantClassifier( "microsoft/unixcoder-base", NUM_CLASSES, dropout=0.1 ).to(DEVICE) weights_path = download("best_model.pt") model.load_state_dict( torch.load(weights_path, map_location=DEVICE) ) model.eval() print("[AI-DETECTOR] Model ready.") # ── Inference ───────────────────────────────────────────────────────────────── def compute_ai_pct(proba: np.ndarray) -> float: return float(np.float64(sum(float(proba[i]) for i in AI_INDICES)) * 100) @torch.no_grad() def predict(code: str, language: str) -> dict: """ Run 4-class inference and return a result dict. Confidence gate: HIGH confidence + AI% > threshold → flag_ai MEDIUM/LOW confidence + AI% > threshold → pending_review AI% <= threshold → pass """ if len(code.strip()) < MIN_CODE_LEN: return { "ai_probability": 0.0, "human_probability": 1.0, "ai_percent": 0.0, "label": "Too Short", "action": "pending_review", "confidence": None, "variant": None, "breakdown": None, "warning": f"Code too short (< {MIN_CODE_LEN} chars). Manual review recommended.", } enc = tokenizer( code, max_length=512, truncation=True, padding=True, return_tensors="pt", ) logits = model( enc["input_ids"].to(DEVICE), enc["attention_mask"].to(DEVICE), ) proba = torch.softmax(logits, dim=-1).cpu().float().numpy()[0] ai_pct = compute_ai_pct(proba) variant = variant_enc.inverse_transform([proba.argmax()])[0] # Confidence band max_prob = float(proba.max()) if max_prob >= 0.70: confidence = "high" elif max_prob >= 0.45: confidence = "medium" else: confidence = "low" # Confidence gate above_threshold = ai_pct > OPERATE_THRESHOLD if not above_threshold: action = "pass" elif confidence == "high": action = "flag_ai" else: action = "pending_review" human_prob = float(proba[VARIANT_CLASSES.index("human")]) ai_prob = 1.0 - human_prob return { # ── Backward-compatible fields (same as old binary detector) ────── "ai_probability": round(ai_prob, 4), "human_probability": round(human_prob, 4), "ai_percent": round(ai_pct, 2), "label": "AI-Generated" if action == "flag_ai" else "Human-Written", # ── New fields for richer UI ────────────────────────────────────── "action": action, # pass | flag_ai | pending_review "confidence": confidence, # high | medium | low "variant": variant, # ai_pure | ai_assisted | ai_adversarial | human "breakdown": { c: round(float(p) * 100, 2) for c, p in zip(VARIANT_CLASSES, proba) }, "threshold_used": OPERATE_THRESHOLD, "tuned_threshold": TUNED_THRESHOLD, } # ── FastAPI app ──────────────────────────────────────────────────────────────── app = FastAPI(title="UnifIDE AI Detector v2") CORS_ORIGINS = os.getenv("CORS_ORIGINS", "http://localhost:5173").split(",") app.add_middleware( CORSMiddleware, allow_origins=[o.strip() for o in CORS_ORIGINS], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) class DetectRequest(BaseModel): code: str language: str @app.get("/health") def health(): return { "ok": True, "model": MODEL_REPO, "classes": VARIANT_CLASSES, "threshold": OPERATE_THRESHOLD, } @app.post("/detect-code") def detect_code(payload: DetectRequest): code = payload.code.strip() if not code: raise HTTPException(status_code=400, detail="Code is empty.") try: result = predict(code, payload.language) except Exception as e: print("[AI-DETECTOR] Prediction error:", e) raise HTTPException(status_code=500, detail="Model prediction failed.") return result