"""
Industrial Defect Detection with Gradio for HuggingFace Spaces.

Provides both UI and API endpoints through Gradio's native API system.

Includes:
- YOLO-based defect detection models (Data Matrix, Tire Cord, etc.)
- Zero-shot anomaly detection via AdaCLIP (no training data required)
- Zero-shot object detection via OWL-ViT (currently a placeholder result)
"""

import gradio as gr
import onnxruntime as ort
import numpy as np
import cv2
from huggingface_hub import hf_hub_download
import os
import logging
from collections import defaultdict
from datetime import datetime, timedelta
import time
import tempfile

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Rate limiting configuration
RATE_LIMIT_REQUESTS = 100  # Max requests per window
RATE_LIMIT_WINDOW = 60  # Window in seconds (1 minute)

# Maps an identifier ("ui", "api", ...) to the timestamps of its recent requests.
request_tracker = defaultdict(list)


def check_rate_limit(identifier="global"):
    """Record a request for *identifier* and report whether it is allowed.

    Implements a sliding window: at most RATE_LIMIT_REQUESTS requests are
    accepted per RATE_LIMIT_WINDOW seconds for each identifier.

    Args:
        identifier: Bucket name the request is counted against.

    Returns:
        True if the request is within the limit (and has been recorded),
        False if the limit is already reached (the request is not recorded).
    """
    current_time = time.time()
    cutoff_time = current_time - RATE_LIMIT_WINDOW

    # Drop timestamps that have fallen out of the window.
    recent = [t for t in request_tracker[identifier] if t > cutoff_time]
    request_tracker[identifier] = recent

    if len(recent) >= RATE_LIMIT_REQUESTS:
        logger.warning(f"Rate limit exceeded for {identifier}")
        return False

    recent.append(current_time)
    return True


def extract_bboxes_from_heatmap(heatmap_path: str, orig_w: int, orig_h: int, threshold: float = 0.5):
    """Extract bounding boxes from an AdaCLIP heatmap image.

    The heatmap is read from disk, converted to grayscale, resized to the
    original image size, normalized to [0, 1] and binarized at
    ``threshold * 0.5``. Each external contour covering at least 0.1% of the
    image area becomes one box.

    Args:
        heatmap_path: Path of the heatmap image file.
        orig_w: Width of the original image in pixels.
        orig_h: Height of the original image in pixels.
        threshold: Confidence threshold; half of it is used as the binarization level.

    Returns:
        A list of dicts with keys ``x1``, ``y1``, ``x2``, ``y2`` (pixel
        coordinates in the original image) and ``confidence`` (mean heatmap
        value inside the box). Returns an empty list if the heatmap cannot be
        read or any error occurs.
    """
    try:
        heatmap = cv2.imread(heatmap_path)
        if heatmap is None:
            return []

        if len(heatmap.shape) == 3:
            heatmap_gray = cv2.cvtColor(heatmap, cv2.COLOR_BGR2GRAY)
        else:
            heatmap_gray = heatmap

        heatmap_resized = cv2.resize(heatmap_gray, (orig_w, orig_h))
        heatmap_norm = heatmap_resized.astype(np.float32) / 255.0

        binary_threshold = threshold * 0.5
        _, binary_mask = cv2.threshold(heatmap_norm, binary_threshold, 1.0, cv2.THRESH_BINARY)
        binary_mask = (binary_mask * 255).astype(np.uint8)

        contours, _ = cv2.findContours(binary_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        bboxes = []
        # Ignore regions smaller than 0.1% of the image to suppress noise.
        min_area = (orig_w * orig_h) * 0.001
        for contour in contours:
            if cv2.contourArea(contour) < min_area:
                continue

            x, y, w, h = cv2.boundingRect(contour)
            roi = heatmap_norm[y:y + h, x:x + w]
            confidence = float(np.mean(roi)) if roi.size > 0 else 0.5
            bboxes.append({
                "x1": float(x),
                "y1": float(y),
                "x2": float(x + w),
                "y2": float(y + h),
                "confidence": confidence,
            })
        return bboxes
    except Exception as e:
        logger.error(f"Error extracting bboxes from heatmap: {e}")
        return []


def _decode_image_size(image_bytes: bytes):
    """Return ``(width, height)`` of an encoded image, or (640, 640) if it cannot be decoded."""
    decoded = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
    if decoded is None:
        return 640, 640
    height, width = decoded.shape[:2]
    return width, height


def _remove_temp_file(path):
    """Delete *path* if it was created and still exists."""
    if path is not None and os.path.exists(path):
        os.unlink(path)


def run_adaclip_inference(image_bytes: bytes, class_name: str = None, confidence: float = 0.5):
    """Run zero-shot anomaly detection using the AdaCLIP Space.

    The image is written to a temporary file, sent to the
    ``Caoyunkang/AdaCLIP`` Space, and the returned heatmap (if any) is turned
    into bounding boxes.

    Args:
        image_bytes: Encoded image (e.g. JPEG) as bytes.
        class_name: Class name passed to AdaCLIP; defaults to ADACLIP_CLASS_NAME.
        confidence: Minimum anomaly score required to report detections; also
            forwarded to the heatmap box extraction.

    Returns:
        ``(detections, anomaly_score)``. ``detections`` is a list of dicts
        (``bbox``, ``confidence``, ``class_id``, ``class_name``, ``x1``..``y2``,
        ``anomaly_score``, ``model_type``). If the score passes the threshold
        but no box can be extracted, a single whole-image detection is
        returned. On any error the result is ``([], 0.0)``.
    """
    from gradio_client import Client, handle_file

    if class_name is None:
        class_name = ADACLIP_CLASS_NAME

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            # Remember the path first so cleanup happens even if the write fails.
            tmp_path = tmp.name
            tmp.write(image_bytes)

        orig_w, orig_h = _decode_image_size(image_bytes)

        client = Client("Caoyunkang/AdaCLIP")
        result = client.predict(
            handle_file(tmp_path),
            class_name,
            "MVTec-AD",
            api_name="/predict"
        )
        logger.info(f"AdaCLIP result: {result}")

        heatmap_path = None
        anomaly_score = 0.0
        if isinstance(result, tuple) and len(result) >= 2:
            heatmap_path = result[0] if isinstance(result[0], str) else None
            anomaly_score = float(result[1]) if result[1] is not None else 0.0
        elif isinstance(result, str):
            # Only a heatmap came back; assume a neutral score.
            heatmap_path = result
            anomaly_score = 0.5

        detections = []
        if anomaly_score >= confidence and heatmap_path:
            bboxes = extract_bboxes_from_heatmap(heatmap_path, orig_w, orig_h, confidence)
            if bboxes:
                for bbox in bboxes:
                    detections.append({
                        "bbox": [bbox["x1"], bbox["y1"], bbox["x2"], bbox["y2"]],
                        "confidence": bbox["confidence"],
                        "class_id": 0,
                        "class_name": "anomaly",
                        "x1": bbox["x1"],
                        "y1": bbox["y1"],
                        "x2": bbox["x2"],
                        "y2": bbox["y2"],
                        "anomaly_score": anomaly_score,
                        "model_type": "adaclip",
                    })
            else:
                # Anomalous, but no localized region: flag the whole image.
                detections.append({
                    "bbox": [0, 0, orig_w, orig_h],
                    "confidence": anomaly_score,
                    "class_id": 0,
                    "class_name": "anomaly",
                    "x1": 0,
                    "y1": 0,
                    "x2": orig_w,
                    "y2": orig_h,
                    "anomaly_score": anomaly_score,
                    "model_type": "adaclip",
                })
        return detections, anomaly_score
    except Exception as e:
        logger.error(f"AdaCLIP inference error: {e}")
        return [], 0.0
    finally:
        _remove_temp_file(tmp_path)


def run_owlvit_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.5):
    """Run zero-shot object detection using the OWL-ViT Space.

    NOTE: the Space response is not parsed yet. Any truthy response yields a
    single placeholder detection covering the whole image, labelled with the
    first text query and the requested confidence.

    Args:
        image_bytes: Encoded image (e.g. JPEG) as bytes.
        text_queries: Text descriptions to detect (e.g. ["defect", "crack"]).
            Defaults to a generic list of defect terms.
        confidence: Threshold forwarded to the Space and reported on the
            placeholder detection.

    Returns:
        List of detection dicts; empty on error or when the Space returns a
        falsy result.
    """
    from gradio_client import Client, handle_file

    if text_queries is None:
        text_queries = ["defect", "anomaly", "crack", "scratch", "damage"]

    tmp_path = None
    try:
        with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
            # Remember the path first so cleanup happens even if the write fails.
            tmp_path = tmp.name
            tmp.write(image_bytes)

        orig_w, orig_h = _decode_image_size(image_bytes)

        # Using OWL-ViT Space (multiple available, using a popular one)
        client = Client("adirik/OWL-ViT")

        # The Space takes the queries as one comma-separated string.
        text_query = ", ".join(text_queries)
        result = client.predict(
            handle_file(tmp_path),
            text_query,
            confidence,  # threshold
            api_name="/predict"
        )
        logger.info(f"OWL-ViT result type: {type(result)}")

        detections = []
        if result:
            # The response format depends on the Space implementation and is
            # not parsed here; emit a whole-image placeholder detection.
            detections.append({
                "bbox": [0, 0, orig_w, orig_h],
                "confidence": confidence,
                "class_id": 0,
                "class_name": text_queries[0],
                "x1": 0,
                "y1": 0,
                "x2": orig_w,
                "y2": orig_h,
                "text_query": text_query,
                "model_type": "owlvit",
            })
        return detections
    except Exception as e:
        logger.error(f"OWL-ViT inference error: {e}")
        return []
    finally:
        _remove_temp_file(tmp_path)


# Available models
MODELS = {
    "dental-implant": {"name": "Dental Implant", "repo": "smartfalcon-ai/Dental-Implant-Defect-Detection", "type": "yolo"},
    "data-matrix": {"name": "Data Matrix", "repo": "smartfalcon-ai/Data-Matrix-Defect-Detection", "type": "yolo"},
    "ball-pen": {"name": "Ball Pen", "repo": "smartfalcon-ai/Ball-Pen-Defect-Detection", "type": "yolo"},
    "knit-up": {"name": "Knit Up", "repo": "smartfalcon-ai/Knit-Up-Defect-Detection", "type": "yolo"},
    "knit-back": {"name": "Knit Back", "repo": "smartfalcon-ai/Knit-Back-Defect-Detection", "type": "yolo"},
    "jean-back": {"name": "Jean Back", "repo": "smartfalcon-ai/Jean-Back-Defect-Detection", "type": "yolo"},
    "jean-up": {"name": "Jean Up", "repo": "smartfalcon-ai/Jean-Up-Defect-Detection", "type": "yolo"},
    "tire-cord": {"name": "Tire Cord", "repo": "smartfalcon-ai/Tire-Cord-Defect-Detection", "type": "yolo"},
    # Zero-shot models (no training data required)
    "zero-shot-adaclip": {
        "name": "Zero Shot (AdaCLIP)",
        "type": "adaclip",
        "description": "Zero-shot anomaly detection using AdaCLIP - works on any product without training"
    },
    "zero-shot-owlvit": {
        "name": "Zero Shot (OWL-ViT)",
        "type": "owlvit",
        "description": "Zero-shot object detection using Google's OWL-ViT - detects objects based on text descriptions"
    },
}

# AdaCLIP configuration
ADACLIP_CLASS_NAME = os.environ.get("ADACLIP_CLASS_NAME", "object")

# Example images for Gradio: three per YOLO model, named
# "examples/<model-key>-<n>.jpg", each run at a 0.25 confidence threshold.
EXAMPLES = [
    [f"examples/{model_key}-{index}.jpg", model_config["name"], 0.25]
    for model_key, model_config in MODELS.items()
    if model_config.get("type") == "yolo"
    for index in (1, 2, 3)
]

# Model sessions cache
sessions = {}

# Default model
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "data-matrix")

# Inference parameters
IMG_SIZE = 640
IOU_THRESHOLD = 0.45


def get_session(model_key: str):
    """Get or create the ONNX inference session for a YOLO model.

    Sessions are cached in ``sessions``; the first call for a model downloads
    ``best.onnx`` from its HuggingFace repo (using HUGGINGFACE_TOKEN if set).

    Args:
        model_key: Key into MODELS.

    Returns:
        The cached ``onnxruntime.InferenceSession``, or None for non-YOLO models.

    Raises:
        ValueError: If *model_key* is not in MODELS.
        Exception: Any download or session-creation error is logged and re-raised.
    """
    if model_key not in sessions:
        if model_key not in MODELS:
            raise ValueError(f"Model '{model_key}' not found. Available: {list(MODELS.keys())}")

        model_config = MODELS[model_key]

        # Skip ONNX loading for non-YOLO models (like AdaCLIP)
        if model_config.get("type") != "yolo":
            return None

        try:
            hf_token = os.environ.get("HUGGINGFACE_TOKEN", None)
            repo_id = model_config["repo"]
            logger.info(f"Downloading model: {repo_id}")
            model_path = hf_hub_download(
                repo_id=repo_id,
                filename="best.onnx",
                token=hf_token
            )
            sessions[model_key] = ort.InferenceSession(
                model_path,
                providers=["CPUExecutionProvider"]
            )
            logger.info(f"Model '{model_key}' loaded successfully")
        except Exception as e:
            logger.error(f"Failed to load model '{model_key}': {e}")
            raise

    return sessions.get(model_key)


def preprocess(img):
    """Preprocess an image for the ONNX model.

    Resizes to IMG_SIZE x IMG_SIZE (no letterboxing), scales to [0, 1] float32
    and reorders to a (1, C, H, W) batch.

    Args:
        img: H x W x C image array.

    Returns:
        ``(blob, orig_w, orig_h)``: the network input and the original size.
    """
    # NOTE(review): callers pass BGR images and no channel swap happens here,
    # so the model receives BGR. Confirm this matches how the models were exported.
    h, w = img.shape[:2]
    blob = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    blob = blob.astype(np.float32) / 255.0
    blob = blob.transpose(2, 0, 1)
    blob = np.expand_dims(blob, 0)
    return blob, w, h


def xywh2xyxy(x):
    """Convert boxes from (center_x, center_y, width, height) to (x1, y1, x2, y2).

    Args:
        x: Array whose first four columns are cx, cy, w, h.

    Returns:
        A copy of *x* with the first four columns replaced by corner coordinates.
    """
    y = np.copy(x)
    y[:, 0] = x[:, 0] - x[:, 2] / 2
    y[:, 1] = x[:, 1] - x[:, 3] / 2
    y[:, 2] = x[:, 0] + x[:, 2] / 2
    y[:, 3] = x[:, 1] + x[:, 3] / 2
    return y


def non_max_suppression(preds, conf_thres=0.25, iou_thres=0.45):
    """Apply confidence filtering and NMS to raw model predictions.

    Rows are read as ``[cx, cy, w, h, objectness, class scores...]``.
    # NOTE(review): this layout is assumed from the indexing below — confirm
    # against the exported models' output format.

    Args:
        preds: Batched prediction array; only the first batch item is used.
        conf_thres: Minimum objectness, and minimum final score for NMS.
        iou_thres: IoU above which overlapping boxes are suppressed.

    Returns:
        List of dicts with ``bbox`` ([x1, y1, x2, y2]), ``confidence``
        (objectness * best class score), ``class_id`` and ``x1``..``y2``,
        in the coordinate space of the network input.
    """
    preds = preds[0]
    preds = preds[preds[:, 4] > conf_thres]
    if preds.shape[0] == 0:
        return []

    boxes = xywh2xyxy(preds[:, :4])
    class_scores = preds[:, 5:]
    cls_ids = np.argmax(class_scores, axis=1)
    final_scores = preds[:, 4] * class_scores.max(axis=1)

    # cv2.dnn.NMSBoxes interprets each box as (x, y, width, height) with
    # (x, y) the top-left corner, so corner-format boxes must not be passed
    # directly or the overlap computation is wrong.
    nms_boxes = np.column_stack((boxes[:, 0], boxes[:, 1], preds[:, 2], preds[:, 3]))

    indices = cv2.dnn.NMSBoxes(
        bboxes=nms_boxes.tolist(),
        scores=final_scores.tolist(),
        score_threshold=conf_thres,
        nms_threshold=iou_thres
    )
    if len(indices) == 0:
        return []

    output = []
    # Depending on the OpenCV version the indices are flat or nested.
    for idx in np.asarray(indices).flatten():
        x1, y1, x2, y2 = (float(v) for v in boxes[idx])
        output.append({
            "bbox": [x1, y1, x2, y2],
            "confidence": float(final_scores[idx]),
            "class_id": int(cls_ids[idx]),
            "x1": x1,
            "y1": y1,
            "x2": x2,
            "y2": y2,
        })
    return output


# Box/label colors (BGR) used when drawing, per model type.
_BOX_COLORS = {
    "adaclip": (0, 0, 255),
    "owlvit": (255, 0, 0),  # Blue for OWL-ViT
    "yolo": (0, 255, 0),
}


def _find_model_key(model_display_name):
    """Return the MODELS key whose display name matches, or None."""
    for key, config in MODELS.items():
        if config["name"] == model_display_name:
            return key
    return None


def _encode_jpeg(img_bgr):
    """Encode a BGR image as JPEG bytes for the remote zero-shot Spaces."""
    _, encoded = cv2.imencode('.jpg', img_bgr)
    return encoded.tobytes()


def _detect(img_bgr, model_key, conf_threshold):
    """Run the selected model and return detections in original-image pixels."""
    model_type = MODELS[model_key].get("type", "yolo")

    # Handle AdaCLIP (zero-shot anomaly detection)
    if model_type == "adaclip":
        detections, _ = run_adaclip_inference(_encode_jpeg(img_bgr), confidence=conf_threshold)
        return detections

    # Handle OWL-ViT (zero-shot object detection)
    if model_type == "owlvit":
        return run_owlvit_inference(_encode_jpeg(img_bgr), confidence=conf_threshold)

    # Handle YOLO models (default)
    session = get_session(model_key)
    if session is None:
        return []

    orig_h, orig_w = img_bgr.shape[:2]
    blob, _, _ = preprocess(img_bgr)
    preds = session.run(None, {"images": blob})[0]
    detections = non_max_suppression(preds, conf_threshold, IOU_THRESHOLD)

    # Scale bboxes from network-input size back to original image size.
    for det in detections:
        x1, y1, x2, y2 = det["bbox"]
        det["bbox"] = [
            x1 / IMG_SIZE * orig_w,
            y1 / IMG_SIZE * orig_h,
            x2 / IMG_SIZE * orig_w,
            y2 / IMG_SIZE * orig_h,
        ]
        det["x1"], det["y1"], det["x2"], det["y2"] = det["bbox"]
    return detections


def _draw_detections(img_bgr, detections, model_type):
    """Draw boxes and "<label>:<score>" captions on *img_bgr* in place."""
    color = _BOX_COLORS.get(model_type, _BOX_COLORS["yolo"])
    for det in detections:
        x1, y1 = int(det["x1"]), int(det["y1"])
        x2, y2 = int(det["x2"]), int(det["y2"])
        score = det["confidence"]

        if model_type == "adaclip":
            label = f"anomaly:{score:.2f}"
        elif model_type == "owlvit":
            class_name = det.get("class_name", "object")
            label = f"{class_name}:{score:.2f}"
        else:
            cls_id = det["class_id"]
            label = f"{cls_id}:{score:.2f}"

        cv2.rectangle(img_bgr, (x1, y1), (x2, y2), color, 2)
        cv2.putText(img_bgr, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)


def gradio_inference(image, model_display_name, conf_threshold):
    """Inference function for the Gradio UI - returns an annotated image.

    Args:
        image: RGB image as a numpy array, or None.
        model_display_name: Display name of a model in MODELS.
        conf_threshold: Confidence threshold for detections.

    Returns:
        The RGB image with detections drawn on it. The input image is returned
        unchanged when the model name is unknown, None when no image is given,
        and an image stamped "RATE LIMIT EXCEEDED" when the rate limit is hit.
    """
    # Rate limiting
    if not check_rate_limit("ui"):
        logger.warning("Rate limit exceeded for UI")
        if image is None:
            return image
        # Return the image with a banner; draw on a copy so the caller's
        # array is never modified.
        is_color = len(image.shape) == 3
        banner = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) if is_color else image.copy()
        cv2.putText(banner, "RATE LIMIT EXCEEDED", (50, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        return cv2.cvtColor(banner, cv2.COLOR_BGR2RGB) if is_color else banner

    model_key = _find_model_key(model_display_name)
    if model_key is None:
        return image

    if image is None:
        return None

    img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    model_type = MODELS[model_key].get("type", "yolo")

    detections = _detect(img_bgr, model_key, conf_threshold)
    _draw_detections(img_bgr, detections, model_type)

    return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)


def api_inference(image, model_display_name, conf_threshold):
    """API inference function - returns JSON detections.

    This function is accessible via Gradio's API at /api/detect

    Args:
        image: RGB image as a numpy array, or None.
        model_display_name: Display name of a model in MODELS.
        conf_threshold: Confidence threshold for detections.

    Returns:
        A list of detection dicts with coordinates in original-image pixels
        (empty for an unknown model or missing image), or a dict with an
        ``error`` key when the rate limit is exceeded.
    """
    # Rate limiting for API
    if not check_rate_limit("api"):
        logger.warning("Rate limit exceeded for API")
        return {"error": "Rate limit exceeded. Max 100 requests per minute."}

    model_key = _find_model_key(model_display_name)
    if model_key is None or image is None:
        return []

    img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return _detect(img_bgr, model_key, conf_threshold)


# Create Gradio interface with both UI and API
with gr.Blocks(title="Industrial Defect Detection") as demo:
    gr.Markdown("# Industrial Defect Detection")
    gr.Markdown("""
    **Visual Testing Interface** for Industrial Defect Detection models.

    **Available Models:**
    - **YOLO Models**: Data Matrix, Tire Cord, Dental Implant, etc. (trained on specific defects)
    - **Zero Shot (AdaCLIP / OWL-ViT)**: Detects anomalies on ANY product without training data!

    - **For API Use:** This Space provides API endpoints accessible via `/api/predict`
    - **For Visual Testing:** Use the interface below to test models visually
    - **Rate Limiting:** Maximum 100 requests per minute to prevent abuse

    Upload an image, select a model, and adjust the confidence threshold.
    Note: Zero Shot may take longer (~30-60 seconds) as it calls an external model.
    """)

    with gr.Row():
        with gr.Column():
            input_image = gr.Image(type="numpy", label="Upload Image")
            model_dropdown = gr.Dropdown(
                choices=[v["name"] for v in MODELS.values()],
                label="Select Model",
                value="Data Matrix"
            )
            conf_slider = gr.Slider(
                minimum=0.0,
                maximum=1.0,
                value=0.25,
                step=0.01,
                label="Confidence Threshold"
            )
            submit_btn = gr.Button("Detect Defects", variant="primary")

        with gr.Column():
            output_image = gr.Image(type="numpy", label="Detection Results")

    submit_btn.click(
        fn=gradio_inference,
        inputs=[input_image, model_dropdown, conf_slider],
        outputs=output_image,
        api_name="predict"  # Creates /api/predict endpoint (returns image)
    )

    # Hidden interface for JSON API (for MonitaQC compatibility)
    with gr.Row(visible=False):
        json_image = gr.Image(type="numpy")
        json_model = gr.Dropdown(choices=[v["name"] for v in MODELS.values()])
        json_conf = gr.Slider(minimum=0.0, maximum=1.0, value=0.25)
        json_output = gr.JSON()
        json_btn = gr.Button("JSON Detect")

    json_btn.click(
        fn=api_inference,
        inputs=[json_image, json_model, json_conf],
        outputs=json_output,
        api_name="detect"  # Creates /api/detect endpoint (returns JSON)
    )

    gr.Markdown("### Example Images")
    gr.Examples(
        examples=EXAMPLES,
        inputs=[input_image, model_dropdown, conf_slider],
        outputs=output_image,
        fn=gradio_inference,
        cache_examples=True,
        examples_per_page=24
    )

    gr.Markdown("""
    ### API Access

    This Space provides two API endpoints:

    **1. Image API** (returns annotated image):
    - **Endpoint**: `/api/predict`
    - **Returns**: Annotated image with bounding boxes

    **2. JSON API** (returns detection data - for MonitaQC):
    - **Endpoint**: `/api/detect`
    - **Returns**: JSON array of detections with bboxes and confidence

    **Rate Limiting:**
    - Maximum 100 requests per minute per endpoint
    - Exceeding the limit returns an error response

    **Python Example (Image):**
    ```python
    from gradio_client import Client

    client = Client("smartfalcon-ai/Industrial-Defect-Detection")
    result = client.predict(
        "path/to/image.jpg",
        "Data Matrix",
        0.25,
        api_name="/predict"
    )
    ```

    **Python Example (JSON - for MonitaQC):**
    ```python
    from gradio_client import Client

    client = Client("smartfalcon-ai/Industrial-Defect-Detection")
    detections = client.predict(
        "path/to/image.jpg",
        "Data Matrix",
        0.25,
        api_name="/detect"
    )
    # Returns: [{"bbox": [x1, y1, x2, y2], "confidence": 0.85, "class_id": 0, ...}]
    ```

    **Available Models:**
    - Data Matrix
    - Dental Implant
    - Ball Pen
    - Knit Up
    - Knit Back
    - Jean Up
    - Jean Back
    - Tire Cord
    - **Zero Shot (AdaCLIP)** - Works on any product without training!
    - **Zero Shot (OWL-ViT)** - Text-prompted zero-shot detection
    """)

    # Hidden interface for models API (for MonitaQC compatibility)
    with gr.Row(visible=False):
        models_btn = gr.Button("Get Models")
        models_output = gr.JSON()
        health_btn = gr.Button("Health Check")
        health_output = gr.JSON()

    def get_models():
        """Return the display names of all available models and their count."""
        return {
            "models": [model_info["name"] for model_info in MODELS.values()],
            "count": len(MODELS)
        }

    def health_check():
        """Health check endpoint for monitoring.

        Reports how many ONNX sessions are cached and how many models are
        configured, with the current local timestamp.
        """
        return {
            "status": "healthy",
            "service": "Gradio Inference (HuggingFace)",
            "models_loaded": len(sessions),
            "available_models": len(MODELS),
            "timestamp": datetime.now().isoformat()
        }

    models_btn.click(
        fn=get_models,
        inputs=[],
        outputs=models_output,
        api_name="models"  # Creates /api/models endpoint
    )

    health_btn.click(
        fn=health_check,
        inputs=[],
        outputs=health_output,
        api_name="health"  # Creates /api/health endpoint
    )

# Launch the app
if __name__ == "__main__":
    demo.launch()