""" Industrial Defect Detection with Gradio for HuggingFace Spaces. Provides both UI and API endpoints through Gradio's native API system. Includes: - YOLO-based defect detection models (Data Matrix, Tire Cord, etc.) - Zero-shot anomaly detection via AdaCLIP (no training data required) """ import gradio as gr import onnxruntime as ort import numpy as np import cv2 from huggingface_hub import hf_hub_download import os import logging from collections import defaultdict from datetime import datetime, timedelta import time import tempfile # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Rate limiting configuration RATE_LIMIT_REQUESTS = 100 # Max requests per window RATE_LIMIT_WINDOW = 60 # Window in seconds (1 minute) request_tracker = defaultdict(list) def check_rate_limit(identifier="global"): """Simple rate limiting - allows RATE_LIMIT_REQUESTS per RATE_LIMIT_WINDOW""" current_time = time.time() cutoff_time = current_time - RATE_LIMIT_WINDOW # Remove old requests request_tracker[identifier] = [ req_time for req_time in request_tracker[identifier] if req_time > cutoff_time ] # Check if over limit if len(request_tracker[identifier]) >= RATE_LIMIT_REQUESTS: logger.warning(f"Rate limit exceeded for {identifier}") return False # Add current request request_tracker[identifier].append(current_time) return True def extract_bboxes_from_heatmap(heatmap_path: str, orig_w: int, orig_h: int, threshold: float = 0.5): """Extract bounding boxes from AdaCLIP heatmap image.""" try: heatmap = cv2.imread(heatmap_path) if heatmap is None: return [] if len(heatmap.shape) == 3: heatmap_gray = cv2.cvtColor(heatmap, cv2.COLOR_BGR2GRAY) else: heatmap_gray = heatmap heatmap_resized = cv2.resize(heatmap_gray, (orig_w, orig_h)) heatmap_norm = heatmap_resized.astype(np.float32) / 255.0 binary_threshold = threshold * 0.5 _, binary_mask = cv2.threshold(heatmap_norm, binary_threshold, 1.0, cv2.THRESH_BINARY) binary_mask = (binary_mask * 255).astype(np.uint8) contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) bboxes = [] min_area = (orig_w * orig_h) * 0.001 for contour in contours: area = cv2.contourArea(contour) if area < min_area: continue x, y, w, h = cv2.boundingRect(contour) roi = heatmap_norm[y:y+h, x:x+w] confidence = float(np.mean(roi)) if roi.size > 0 else 0.5 bboxes.append({ "x1": float(x), "y1": float(y), "x2": float(x + w), "y2": float(y + h), "confidence": confidence }) return bboxes except Exception as e: logger.error(f"Error extracting bboxes from heatmap: {e}") return [] def run_clip_anomaly_inference(image_bytes: bytes, confidence: float = 0.5): """ Run zero-shot anomaly detection using CLIP similarity scoring. This uses CLIP to compare image patches against "normal" vs "defect" descriptions. Simple but effective for general anomaly detection. """ try: from transformers import CLIPProcessor, CLIPModel from PIL import Image import torch import io # Load image image = Image.open(io.BytesIO(image_bytes)) orig_w, orig_h = image.size # Initialize model and processor (cached after first load) if not hasattr(run_clip_anomaly_inference, 'processor'): logger.info("Loading CLIP model (first time only)...") run_clip_anomaly_inference.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") run_clip_anomaly_inference.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") logger.info("CLIP model loaded successfully") processor = run_clip_anomaly_inference.processor model = run_clip_anomaly_inference.model # Text descriptions for anomaly detection text_descriptions = [ "a photo of a normal product without defects", "a photo of a defective product with anomalies", "a photo with cracks or scratches", "a photo with damage or imperfections" ] # Process inputs inputs = processor( text=text_descriptions, images=image, return_tensors="pt", padding=True ) # Run inference with torch.no_grad(): outputs = model(**inputs) logits_per_image = outputs.logits_per_image probs = logits_per_image.softmax(dim=1) # Get anomaly probability (sum of defect-related classes) anomaly_prob = float(probs[0][1:].sum()) # Skip "normal" class detections = [] # If anomaly detected, create detection box if anomaly_prob >= confidence: # Create a detection for the whole image # In a real scenario, you'd segment the anomalous region detections.append({ "bbox": [0, 0, orig_w, orig_h], "confidence": anomaly_prob, "class_id": 0, "class_name": "anomaly", "x1": 0, "y1": 0, "x2": orig_w, "y2": orig_h, "anomaly_score": anomaly_prob, "model_type": "clip", "description": "CLIP-based anomaly detection" }) logger.info(f"CLIP anomaly score: {anomaly_prob:.3f}, detections: {len(detections)}") return detections, anomaly_prob except Exception as e: logger.error(f"CLIP inference error: {e}") import traceback logger.error(traceback.format_exc()) return [], 0.0 def run_owlvit_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.1): """ Run zero-shot object detection using OWL-ViT (Open World Localization - Vision Transformer). OWL-ViT is Google's zero-shot object detection model that can detect objects based on text descriptions without any training. Args: image_bytes: Image as bytes text_queries: List of text descriptions to detect (e.g., ["defect", "crack", "scratch"]) confidence: Confidence threshold for detections Returns: List of detections with bounding boxes """ try: from transformers import Owlv2Processor, Owlv2ForObjectDetection from PIL import Image import torch import io if text_queries is None: text_queries = ["a defect", "an anomaly", "a crack", "a scratch", "damage"] # Load image image = Image.open(io.BytesIO(image_bytes)) orig_w, orig_h = image.size # Initialize model and processor (cached after first load) if not hasattr(run_owlvit_inference, 'processor'): logger.info("Loading OWL-ViT model (first time only)...") run_owlvit_inference.processor = Owlv2Processor.from_pretrained("google/owlv2-base-patch16-ensemble") run_owlvit_inference.model = Owlv2ForObjectDetection.from_pretrained("google/owlv2-base-patch16-ensemble") logger.info("OWL-ViT model loaded successfully") processor = run_owlvit_inference.processor model = run_owlvit_inference.model # Prepare inputs inputs = processor(text=text_queries, images=image, return_tensors="pt") # Run inference with torch.no_grad(): outputs = model(**inputs) # Process results target_sizes = torch.Tensor([image.size[::-1]]) # (height, width) results = processor.post_process_object_detection( outputs=outputs, threshold=confidence, target_sizes=target_sizes )[0] detections = [] boxes = results["boxes"].cpu().numpy() scores = results["scores"].cpu().numpy() labels = results["labels"].cpu().numpy() for box, score, label in zip(boxes, scores, labels): x1, y1, x2, y2 = box detections.append({ "bbox": [float(x1), float(y1), float(x2), float(y2)], "confidence": float(score), "class_id": int(label), "class_name": text_queries[label] if label < len(text_queries) else "object", "x1": float(x1), "y1": float(y1), "x2": float(x2), "y2": float(y2), "text_query": text_queries[label] if label < len(text_queries) else "object", "model_type": "owlvit" }) logger.info(f"OWL-ViT detected {len(detections)} objects") return detections except Exception as e: logger.error(f"OWL-ViT inference error: {e}") import traceback logger.error(traceback.format_exc()) return [] # Available models MODELS = { "dental-implant": {"name": "Dental Implant", "repo": "smartfalcon-ai/Dental-Implant-Defect-Detection", "type": "yolo"}, "data-matrix": {"name": "Data Matrix", "repo": "smartfalcon-ai/Data-Matrix-Defect-Detection", "type": "yolo"}, "ball-pen": {"name": "Ball Pen", "repo": "smartfalcon-ai/Ball-Pen-Defect-Detection", "type": "yolo"}, "knit-up": {"name": "Knit Up", "repo": "smartfalcon-ai/Knit-Up-Defect-Detection", "type": "yolo"}, "knit-back": {"name": "Knit Back", "repo": "smartfalcon-ai/Knit-Back-Defect-Detection", "type": "yolo"}, "jean-back": {"name": "Jean Back", "repo": "smartfalcon-ai/Jean-Back-Defect-Detection", "type": "yolo"}, "jean-up": {"name": "Jean Up", "repo": "smartfalcon-ai/Jean-Up-Defect-Detection", "type": "yolo"}, "tire-cord": {"name": "Tire Cord", "repo": "smartfalcon-ai/Tire-Cord-Defect-Detection", "type": "yolo"}, # Zero-shot models (no training data required - run locally) "zero-shot-clip": { "name": "Zero Shot (CLIP)", "type": "clip", "description": "Zero-shot anomaly detection using CLIP - fast and reliable" }, "zero-shot-owlvit": { "name": "Zero Shot (OWL-ViT)", "type": "owlvit", "description": "Zero-shot object detection using Google's OWL-ViT - detects objects based on text descriptions" }, } # AdaCLIP configuration ADACLIP_CLASS_NAME = os.environ.get("ADACLIP_CLASS_NAME", "object") # Example images for Gradio EXAMPLES = [ # Dental Implant ["examples/dental-implant-1.jpg", "Dental Implant", 0.25], ["examples/dental-implant-2.jpg", "Dental Implant", 0.25], ["examples/dental-implant-3.jpg", "Dental Implant", 0.25], # Data Matrix ["examples/data-matrix-1.jpg", "Data Matrix", 0.25], ["examples/data-matrix-2.jpg", "Data Matrix", 0.25], ["examples/data-matrix-3.jpg", "Data Matrix", 0.25], # Ball Pen ["examples/ball-pen-1.jpg", "Ball Pen", 0.25], ["examples/ball-pen-2.jpg", "Ball Pen", 0.25], ["examples/ball-pen-3.jpg", "Ball Pen", 0.25], # Knit Up ["examples/knit-up-1.jpg", "Knit Up", 0.25], ["examples/knit-up-2.jpg", "Knit Up", 0.25], ["examples/knit-up-3.jpg", "Knit Up", 0.25], # Knit Back ["examples/knit-back-1.jpg", "Knit Back", 0.25], ["examples/knit-back-2.jpg", "Knit Back", 0.25], ["examples/knit-back-3.jpg", "Knit Back", 0.25], # Jean Back ["examples/jean-back-1.jpg", "Jean Back", 0.25], ["examples/jean-back-2.jpg", "Jean Back", 0.25], ["examples/jean-back-3.jpg", "Jean Back", 0.25], # Jean Up ["examples/jean-up-1.jpg", "Jean Up", 0.25], ["examples/jean-up-2.jpg", "Jean Up", 0.25], ["examples/jean-up-3.jpg", "Jean Up", 0.25], # Tire Cord ["examples/tire-cord-1.jpg", "Tire Cord", 0.25], ["examples/tire-cord-2.jpg", "Tire Cord", 0.25], ["examples/tire-cord-3.jpg", "Tire Cord", 0.25], ] # Model sessions cache sessions = {} # Default model DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "data-matrix") # Inference parameters IMG_SIZE = 640 IOU_THRESHOLD = 0.45 def get_session(model_key: str): """Get or create ONNX inference session for a YOLO model.""" if model_key not in sessions: if model_key not in MODELS: raise ValueError(f"Model '{model_key}' not found. Available: {list(MODELS.keys())}") model_config = MODELS[model_key] # Skip ONNX loading for non-YOLO models (like AdaCLIP) if model_config.get("type") != "yolo": return None try: hf_token = os.environ.get("HUGGINGFACE_TOKEN", None) repo_id = model_config["repo"] logger.info(f"Downloading model: {repo_id}") model_path = hf_hub_download( repo_id=repo_id, filename="best.onnx", token=hf_token ) sessions[model_key] = ort.InferenceSession( model_path, providers=["CPUExecutionProvider"] ) logger.info(f"Model '{model_key}' loaded successfully") except Exception as e: logger.error(f"Failed to load model '{model_key}': {e}") raise return sessions.get(model_key) def preprocess(img): """Preprocess image for ONNX model.""" h, w = img.shape[:2] img_resized = cv2.resize(img, (IMG_SIZE, IMG_SIZE)) img_resized = img_resized.astype(np.float32) / 255.0 img_resized = img_resized.transpose(2, 0, 1) img_resized = np.expand_dims(img_resized, 0) return img_resized, w, h def xywh2xyxy(x): """Convert box format from xywh to xyxy.""" y = np.copy(x) y[:, 0] = x[:, 0] - x[:, 2] / 2 y[:, 1] = x[:, 1] - x[:, 3] / 2 y[:, 2] = x[:, 0] + x[:, 2] / 2 y[:, 3] = x[:, 1] + x[:, 3] / 2 return y def non_max_suppression(preds, conf_thres=0.25, iou_thres=0.45): """Apply NMS to predictions.""" preds = preds[0] preds = preds[preds[:, 4] > conf_thres] if preds.shape[0] == 0: return [] boxes = xywh2xyxy(preds[:, :4]) scores = preds[:, 4] class_scores = preds[:, 5:] cls_ids = np.argmax(class_scores, axis=1) cls_conf = class_scores.max(axis=1) final_scores = scores * cls_conf indices = cv2.dnn.NMSBoxes( bboxes=boxes.tolist(), scores=final_scores.tolist(), score_threshold=conf_thres, nms_threshold=iou_thres ) if len(indices) == 0: return [] indices = indices.flatten() output = [] for idx in indices: x1, y1, x2, y2 = boxes[idx] output.append({ "bbox": [float(x1), float(y1), float(x2), float(y2)], "confidence": float(final_scores[idx]), "class_id": int(cls_ids[idx]), "x1": float(x1), "y1": float(y1), "x2": float(x2), "y2": float(y2) }) return output def gradio_inference(image, model_display_name, conf_threshold): """Inference function for Gradio UI - returns annotated image.""" # Rate limiting if not check_rate_limit("ui"): logger.warning("Rate limit exceeded for UI") # Return image with watermark showing rate limit if image is not None: img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) if len(image.shape) == 3 else image cv2.putText(img_bgr, "RATE LIMIT EXCEEDED", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2) return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) if len(img_bgr.shape) == 3 else img_bgr return image # Find model key from display name model_key = None for key, val in MODELS.items(): if val["name"] == model_display_name: model_key = key break if model_key is None: return image if image is None: return None img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) orig_h, orig_w = img_bgr.shape[:2] model_config = MODELS[model_key] model_type = model_config.get("type", "yolo") # Handle CLIP (zero-shot anomaly detection) if model_type == "clip": _, img_encoded = cv2.imencode('.jpg', img_bgr) image_bytes = img_encoded.tobytes() detections, anomaly_score = run_clip_anomaly_inference(image_bytes, confidence=conf_threshold) for det in detections: x1 = int(det["x1"]) y1 = int(det["y1"]) x2 = int(det["x2"]) y2 = int(det["y2"]) score = det["confidence"] label = f"anomaly:{score:.2f}" cv2.rectangle(img_bgr, (x1, y1), (x2, y2), (0, 0, 255), 2) # Red for anomalies cv2.putText(img_bgr, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2) return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) # Handle OWL-ViT (zero-shot object detection) if model_type == "owlvit": _, img_encoded = cv2.imencode('.jpg', img_bgr) image_bytes = img_encoded.tobytes() detections = run_owlvit_inference(image_bytes, confidence=conf_threshold) for det in detections: x1 = int(det["x1"]) y1 = int(det["y1"]) x2 = int(det["x2"]) y2 = int(det["y2"]) score = det["confidence"] class_name = det.get("class_name", "object") label = f"{class_name}:{score:.2f}" cv2.rectangle(img_bgr, (x1, y1), (x2, y2), (255, 0, 0), 2) # Blue for OWL-ViT cv2.putText(img_bgr, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 0, 0), 2) return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) # Handle YOLO models (default) session = get_session(model_key) if session is None: return image blob, _, _ = preprocess(img_bgr) preds = session.run(None, {"images": blob})[0] detections = non_max_suppression(preds, conf_threshold, IOU_THRESHOLD) for det in detections: x1 = int(det["x1"] / IMG_SIZE * orig_w) y1 = int(det["y1"] / IMG_SIZE * orig_h) x2 = int(det["x2"] / IMG_SIZE * orig_w) y2 = int(det["y2"] / IMG_SIZE * orig_h) score = det["confidence"] cls_id = det["class_id"] label = f"{cls_id}:{score:.2f}" cv2.rectangle(img_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.putText(img_bgr, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) def api_inference(image, model_display_name, conf_threshold): """ API inference function - returns JSON detections. This function is accessible via Gradio's API at /api/detect """ # Rate limiting for API if not check_rate_limit("api"): logger.warning("Rate limit exceeded for API") return {"error": "Rate limit exceeded. Max 100 requests per minute."} # Find model key from display name model_key = None for key, val in MODELS.items(): if val["name"] == model_display_name: model_key = key break if model_key is None: return [] if image is None: return [] img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) orig_h, orig_w = img_bgr.shape[:2] model_config = MODELS[model_key] model_type = model_config.get("type", "yolo") # Handle CLIP (zero-shot anomaly detection) if model_type == "clip": _, img_encoded = cv2.imencode('.jpg', img_bgr) image_bytes = img_encoded.tobytes() detections, anomaly_score = run_clip_anomaly_inference(image_bytes, confidence=conf_threshold) return detections # Handle OWL-ViT (zero-shot object detection) if model_type == "owlvit": _, img_encoded = cv2.imencode('.jpg', img_bgr) image_bytes = img_encoded.tobytes() detections = run_owlvit_inference(image_bytes, confidence=conf_threshold) return detections # Handle YOLO models (default) session = get_session(model_key) if session is None: return [] blob, _, _ = preprocess(img_bgr) preds = session.run(None, {"images": blob})[0] detections = non_max_suppression(preds, conf_threshold, IOU_THRESHOLD) # Scale bboxes back to original image size for det in detections: det["bbox"][0] = det["bbox"][0] / IMG_SIZE * orig_w det["bbox"][1] = det["bbox"][1] / IMG_SIZE * orig_h det["bbox"][2] = det["bbox"][2] / IMG_SIZE * orig_w det["bbox"][3] = det["bbox"][3] / IMG_SIZE * orig_h det["x1"] = det["bbox"][0] det["y1"] = det["bbox"][1] det["x2"] = det["bbox"][2] det["y2"] = det["bbox"][3] return detections # Create Gradio interface with both UI and API with gr.Blocks(title="Industrial Defect Detection") as demo: gr.Markdown("# Industrial Defect Detection") gr.Markdown(""" **Visual Testing Interface** for Industrial Defect Detection models. **Available Models:** - **YOLO Models**: Data Matrix, Tire Cord, Dental Implant, etc. (trained on specific defects) - **Zero Shot (Anomaly)**: Detects anomalies on ANY product without training data! - **For API Use:** This Space provides API endpoints accessible via `/api/predict` - **For Visual Testing:** Use the interface below to test models visually - **Rate Limiting:** Maximum 100 requests per minute to prevent abuse Upload an image, select a model, and adjust the confidence threshold. Note: Zero Shot may take longer (~30-60 seconds) as it calls an external model. """) with gr.Row(): with gr.Column(): input_image = gr.Image(type="numpy", label="Upload Image") model_dropdown = gr.Dropdown( choices=[v["name"] for v in MODELS.values()], label="Select Model", value="Data Matrix" ) conf_slider = gr.Slider( minimum=0.0, maximum=1.0, value=0.25, step=0.01, label="Confidence Threshold" ) submit_btn = gr.Button("Detect Defects", variant="primary") with gr.Column(): output_image = gr.Image(type="numpy", label="Detection Results") submit_btn.click( fn=gradio_inference, inputs=[input_image, model_dropdown, conf_slider], outputs=output_image, api_name="predict" # Creates /api/predict endpoint (returns image) ) # Hidden interface for JSON API (for MonitaQC compatibility) with gr.Row(visible=False): json_image = gr.Image(type="numpy") json_model = gr.Dropdown(choices=[v["name"] for v in MODELS.values()]) json_conf = gr.Slider(minimum=0.0, maximum=1.0, value=0.25) json_output = gr.JSON() json_btn = gr.Button("JSON Detect") json_btn.click( fn=api_inference, inputs=[json_image, json_model, json_conf], outputs=json_output, api_name="detect" # Creates /api/detect endpoint (returns JSON) ) gr.Markdown("### Example Images") gr.Examples( examples=EXAMPLES, inputs=[input_image, model_dropdown, conf_slider], outputs=output_image, fn=gradio_inference, cache_examples=True, examples_per_page=24 ) gr.Markdown(""" ### API Access This Space provides two API endpoints: **1. Image API** (returns annotated image): - **Endpoint**: `/api/predict` - **Returns**: Annotated image with bounding boxes **2. JSON API** (returns detection data - for MonitaQC): - **Endpoint**: `/api/detect` - **Returns**: JSON array of detections with bboxes and confidence **Rate Limiting:** - Maximum 100 requests per minute per endpoint - Exceeding the limit returns an error response **Python Example (Image):** ```python from gradio_client import Client client = Client("smartfalcon-ai/Industrial-Defect-Detection") result = client.predict( "path/to/image.jpg", "Data Matrix", 0.25, api_name="/predict" ) ``` **Python Example (JSON - for MonitaQC):** ```python from gradio_client import Client client = Client("smartfalcon-ai/Industrial-Defect-Detection") detections = client.predict( "path/to/image.jpg", "Data Matrix", 0.25, api_name="/detect" ) # Returns: [{"bbox": [x1, y1, x2, y2], "confidence": 0.85, "class_id": 0, ...}] ``` **Available Models:** - Data Matrix - Dental Implant - Ball Pen - Knit Up - Knit Back - Jean Up - Jean Back - Tire Cord - **Zero Shot (Anomaly)** - Works on any product without training! """) # Hidden interface for models API (for MonitaQC compatibility) with gr.Row(visible=False): models_btn = gr.Button("Get Models") models_output = gr.JSON() health_btn = gr.Button("Health Check") health_output = gr.JSON() def get_models(): """Return list of available models.""" return { "models": [model_info["name"] for model_info in MODELS.values()], "count": len(MODELS) } def health_check(): """Health check endpoint for monitoring.""" return { "status": "healthy", "service": "Gradio Inference (HuggingFace)", "models_loaded": len(sessions), "available_models": len(MODELS), "timestamp": datetime.now().isoformat() } models_btn.click( fn=get_models, inputs=[], outputs=models_output, api_name="models" # Creates /api/models endpoint ) health_btn.click( fn=health_check, inputs=[], outputs=health_output, api_name="health" # Creates /api/health endpoint ) # Launch the app if __name__ == "__main__": demo.launch()