| """ |
| Industrial Defect Detection with Gradio for HuggingFace Spaces. |
| Provides both UI and API endpoints through Gradio's native API system. |
| |
| Includes: |
| - YOLO-based defect detection models (Data Matrix, Tire Cord, etc.) |
| - Zero-shot anomaly detection via AdaCLIP (no training data required) |
| """ |
|
|
| import gradio as gr |
| import onnxruntime as ort |
| import numpy as np |
| import cv2 |
| from huggingface_hub import hf_hub_download |
| import os |
| import logging |
| from collections import defaultdict |
| from datetime import datetime, timedelta |
| import time |
| import tempfile |
|
|
| |
# Configure logging once at import time; everything in this module logs
# through the module-level `logger` created below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Rate-limit settings read by check_rate_limit(): at most RATE_LIMIT_REQUESTS
# requests per identifier within any RATE_LIMIT_WINDOW-second span (timestamps
# are taken with time.time(), hence seconds).
RATE_LIMIT_REQUESTS = 100
RATE_LIMIT_WINDOW = 60
# identifier -> list of timestamps of the requests still inside the window
request_tracker = defaultdict(list)
|
|
def check_rate_limit(identifier="global"):
    """Register one request for ``identifier`` unless its quota is exhausted.

    A sliding window is used: timestamps older than RATE_LIMIT_WINDOW seconds
    are discarded, and the request is accepted only if fewer than
    RATE_LIMIT_REQUESTS timestamps remain.

    Args:
        identifier: Bucket name the request is counted against.

    Returns:
        True if the request was accepted (and recorded), False if the bucket
        is already full (a warning is logged and nothing is recorded).
    """
    now = time.time()
    window_start = now - RATE_LIMIT_WINDOW

    # Drop every timestamp that has fallen out of the window.
    recent = [stamp for stamp in request_tracker[identifier] if stamp > window_start]
    request_tracker[identifier] = recent

    if len(recent) >= RATE_LIMIT_REQUESTS:
        logger.warning(f"Rate limit exceeded for {identifier}")
        return False

    # `recent` is the list stored in the tracker, so this records the request.
    recent.append(now)
    return True
|
|
|
|
def extract_bboxes_from_heatmap(heatmap_path: str, orig_w: int, orig_h: int, threshold: float = 0.5):
    """Turn a heatmap image on disk into a list of bounding boxes.

    The heatmap is read, reduced to one channel, resized to the original
    image size and scaled to [0, 1]. Pixels above ``threshold * 0.5`` form a
    binary mask whose external contours become boxes; contours covering less
    than 0.1% of the image area are ignored.

    Args:
        heatmap_path: Path of the heatmap image file.
        orig_w: Width of the original image, in pixels.
        orig_h: Height of the original image, in pixels.
        threshold: Heatmap threshold; half of this value is used for the mask.

    Returns:
        List of dicts with ``x1``, ``y1``, ``x2``, ``y2`` (floats, pixel
        coordinates) and ``confidence`` (mean heatmap value inside the box).
        An empty list is returned if the file cannot be read or on any error.
    """
    try:
        raw = cv2.imread(heatmap_path)
        if raw is None:
            return []

        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY) if raw.ndim == 3 else raw

        # Heat values in [0, 1] at the resolution of the original image.
        heat = cv2.resize(gray, (orig_w, orig_h)).astype(np.float32) / 255.0

        _, mask = cv2.threshold(heat, threshold * 0.5, 1.0, cv2.THRESH_BINARY)
        mask = (mask * 255).astype(np.uint8)

        outlines, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        area_floor = (orig_w * orig_h) * 0.001
        found = []
        for outline in outlines:
            if cv2.contourArea(outline) >= area_floor:
                left, top, width, height = cv2.boundingRect(outline)
                patch = heat[top:top+height, left:left+width]
                found.append({
                    "x1": float(left),
                    "y1": float(top),
                    "x2": float(left + width),
                    "y2": float(top + height),
                    "confidence": float(np.mean(patch)) if patch.size > 0 else 0.5
                })

        return found
    except Exception as e:
        logger.error(f"Error extracting bboxes from heatmap: {e}")
        return []
|
|
|
|
def run_clip_anomaly_inference(image_bytes: bytes, confidence: float = 0.25):
    """
    Run zero-shot anomaly detection using CLIP similarity scoring.

    The image is scored against two fixed prompts (a "normal" product and a
    "defective" product); the softmax over the two image-text logits is used
    as the normal/defect probabilities.

    Args:
        image_bytes: Encoded image bytes (anything PIL can open).
        confidence: Minimum defect probability required to report a detection.

    Returns:
        Tuple ``(detections, defect_prob)``. ``detections`` is either empty or
        holds a single dict whose box spans the whole image (this scoring does
        not localise the defect). On any failure the error is logged with its
        traceback and ``([], 0.0)`` is returned.
    """
    try:
        from transformers import CLIPProcessor, CLIPModel
        from PIL import Image
        import torch
        import io

        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size

        # Processor and model are cached on the function object. Both are
        # loaded into locals first and only then published, and the guard
        # checks both attributes: otherwise a failed model download after a
        # successful processor download would leave a half-initialised cache
        # that makes every later call fail without ever retrying the load.
        cache = run_clip_anomaly_inference
        if not (hasattr(cache, 'processor') and hasattr(cache, 'model')):
            logger.info("Loading CLIP model (first time only)...")
            loaded_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
            loaded_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
            cache.processor = loaded_processor
            cache.model = loaded_model
            logger.info("CLIP model loaded successfully")

        processor = cache.processor
        model = cache.model

        # Index 0 = "normal" prompt, index 1 = "defect" prompt.
        text_descriptions = [
            "a high quality product without any defects or anomalies",
            "a defective product with visible defects, cracks, scratches, or damage"
        ]

        inputs = processor(
            text=text_descriptions,
            images=image,
            return_tensors="pt",
            padding=True
        )

        with torch.no_grad():
            outputs = model(**inputs)
            probs = outputs.logits_per_image.softmax(dim=1)

        normal_prob = float(probs[0][0])
        defect_prob = float(probs[0][1])

        logger.info(f"CLIP probabilities - Normal: {normal_prob:.3f}, Defect: {defect_prob:.3f}")

        detections = []

        if defect_prob >= confidence:
            detections.append({
                "bbox": [0, 0, orig_w, orig_h],
                "confidence": defect_prob,
                "class_id": 0,
                "class_name": "anomaly",
                "x1": 0,
                "y1": 0,
                "x2": orig_w,
                "y2": orig_h,
                "anomaly_score": defect_prob,
                "normal_score": normal_prob,
                "model_type": "clip",
                "description": f"CLIP anomaly detection (defect:{defect_prob:.2f} vs normal:{normal_prob:.2f})"
            })

        logger.info(f"CLIP result - Defect score: {defect_prob:.3f}, Detections: {len(detections)}")
        return detections, defect_prob

    except Exception as e:
        # Best-effort API: callers expect an empty result instead of an exception.
        logger.exception(f"CLIP inference error: {e}")
        return [], 0.0
|
|
|
|
def run_florence2_inference(image_bytes: bytes, confidence: float = 0.3):
    """
    Run zero-shot region detection using Florence-2 (Microsoft).

    The model is prompted with the ``<DENSE_REGION_CAPTION>`` task and every
    returned region/label pair is converted into a detection dict.

    Args:
        image_bytes: Encoded image bytes (anything PIL can open).
        confidence: Accepted for signature parity with the other detectors but
            not applied: no per-region score is read from the model output, so
            every detection is reported with a fixed confidence of 0.9.

    Returns:
        List of detection dicts (``bbox``, ``confidence``, ``class_id``,
        ``class_name``, ``x1``..``y2``, ``model_type``). On any failure the
        error is logged with its traceback and ``[]`` is returned.
    """
    try:
        from transformers import AutoProcessor, AutoModelForCausalLM
        from PIL import Image
        import torch
        import io

        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size
        logger.info(f"Florence-2: Processing image {orig_w}x{orig_h}")

        # Processor and model are cached on the function object. Both are
        # loaded into locals first and only then published, and the guard
        # checks both attributes: otherwise a failed model download after a
        # successful processor download would leave a half-initialised cache
        # that makes every later call fail without ever retrying the load.
        cache = run_florence2_inference
        if not (hasattr(cache, 'processor') and hasattr(cache, 'model')):
            logger.info("Loading Florence-2 model (first time only - may take a moment)...")
            loaded_processor = AutoProcessor.from_pretrained(
                "microsoft/Florence-2-base",
                trust_remote_code=True
            )
            loaded_model = AutoModelForCausalLM.from_pretrained(
                "microsoft/Florence-2-base",
                trust_remote_code=True
            )
            cache.processor = loaded_processor
            cache.model = loaded_model
            logger.info("Florence-2 model loaded successfully")

        processor = cache.processor
        model = cache.model

        task_prompt = "<DENSE_REGION_CAPTION>"
        inputs = processor(text=task_prompt, images=image, return_tensors="pt")

        logger.info(f"Florence-2: Running inference with task {task_prompt}")

        with torch.no_grad():
            generated_ids = model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=1024,
                num_beams=3,
            )

        # Special tokens are kept because the post-processing step parses them.
        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
        logger.info(f"Florence-2 generated text: {generated_text[:200]}...")

        parsed_answer = processor.post_process_generation(
            generated_text,
            task=task_prompt,
            image_size=(orig_w, orig_h)
        )

        logger.info(f"Florence-2 parsed answer keys: {list(parsed_answer.keys())}")
        logger.info(f"Florence-2 parsed answer: {str(parsed_answer)[:500]}...")

        detections = []

        if task_prompt in parsed_answer:
            result = parsed_answer[task_prompt]

            if 'bboxes' in result and 'labels' in result:
                bboxes = result['bboxes']
                labels = result['labels']
                logger.info(f"Florence-2 found {len(bboxes)} regions")

                for bbox, label in zip(bboxes, labels):
                    x1, y1, x2, y2 = (float(value) for value in bbox)
                    detections.append({
                        "bbox": [x1, y1, x2, y2],
                        "confidence": 0.9,
                        "class_id": 0,
                        "class_name": str(label),
                        "x1": x1,
                        "y1": y1,
                        "x2": x2,
                        "y2": y2,
                        "model_type": "florence2"
                    })

        logger.info(f"Florence-2 detected {len(detections)} objects: {[d.get('class_name', '?') for d in detections]}")
        return detections

    except Exception as e:
        # Best-effort API: callers expect an empty result instead of an exception.
        logger.exception(f"Florence-2 inference error: {e}")
        return []
|
|
|
|
def run_groundingdino_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.3):
    """
    Run zero-shot object detection using GroundingDINO (IDEA Research).

    All queries are joined into a single dot-separated, lowercased prompt and
    the model returns the boxes it grounds to that text.

    Args:
        image_bytes: Encoded image bytes (anything PIL can open).
        text_queries: Phrases to look for; defaults to a generic list of
            defect-related words when None.
        confidence: Used as both the box threshold and the text threshold.

    Returns:
        List of detection dicts (``bbox``, ``confidence``, ``class_id``,
        ``class_name``, ``x1``..``y2``, ``model_type``). On any failure the
        error is logged with its traceback and ``[]`` is returned.
    """
    try:
        from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
        from PIL import Image
        import torch
        import io

        if text_queries is None:
            text_queries = ["defect", "anomaly", "crack", "scratch", "damage", "error", "imperfection"]

        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size
        logger.info(f"GroundingDINO: Processing image {orig_w}x{orig_h}")

        # Processor and model are cached on the function object. Both are
        # loaded into locals first and only then published, and the guard
        # checks both attributes: otherwise a failed model download after a
        # successful processor download would leave a half-initialised cache
        # that makes every later call fail without ever retrying the load.
        cache = run_groundingdino_inference
        if not (hasattr(cache, 'processor') and hasattr(cache, 'model')):
            logger.info("Loading GroundingDINO model (first time only)...")
            loaded_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny")
            loaded_model = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-tiny")
            cache.processor = loaded_processor
            cache.model = loaded_model
            logger.info("GroundingDINO model loaded successfully")

        processor = cache.processor
        model = cache.model

        # The model expects lowercase phrases, each terminated by a dot; the
        # built-in defaults already are lowercase, caller-supplied queries may
        # not be.
        text_prompt = ". ".join(str(query).lower() for query in text_queries) + "."

        inputs = processor(images=image, text=text_prompt, return_tensors="pt")

        with torch.no_grad():
            outputs = model(**inputs)

        # target_sizes is (height, width) so boxes come back in pixel units.
        results = processor.post_process_grounded_object_detection(
            outputs,
            inputs.input_ids,
            box_threshold=confidence,
            text_threshold=confidence,
            target_sizes=[(orig_h, orig_w)]
        )[0]

        detections = []

        if len(results["boxes"]) > 0:
            boxes = results["boxes"].cpu().numpy()
            scores = results["scores"].cpu().numpy()
            labels = results["labels"]

            logger.info(f"GroundingDINO found {len(boxes)} objects")

            for box, score, label in zip(boxes, scores, labels):
                x1, y1, x2, y2 = (float(value) for value in box)
                detections.append({
                    "bbox": [x1, y1, x2, y2],
                    "confidence": float(score),
                    "class_id": 0,
                    "class_name": str(label),
                    "x1": x1,
                    "y1": y1,
                    "x2": x2,
                    "y2": y2,
                    "model_type": "groundingdino"
                })

        logger.info(f"GroundingDINO detected {len(detections)} objects: {[d['class_name'] for d in detections]}")
        return detections

    except Exception as e:
        # Best-effort API: callers expect an empty result instead of an exception.
        logger.exception(f"GroundingDINO inference error: {e}")
        return []
|
|
|
|
def run_yoloworld_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.3):
    """
    Run zero-shot object detection using YOLO-World.

    The cached model's vocabulary is set to ``text_queries`` on every call and
    the image is then run through ``predict``.

    Args:
        image_bytes: Encoded image bytes (anything PIL can open).
        text_queries: Class names to detect; defaults to a generic list of
            defect-related words when None.
        confidence: Minimum confidence passed to ``predict``.

    Returns:
        List of detection dicts (``bbox``, ``confidence``, ``class_id``,
        ``class_name``, ``x1``..``y2``, ``model_type``). On any failure the
        error is logged with its traceback and ``[]`` is returned.
    """
    try:
        from ultralytics import YOLOWorld
        from PIL import Image
        import io

        if text_queries is None:
            text_queries = ["defect", "anomaly", "crack", "scratch", "damage"]

        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size
        logger.info(f"YOLO-World: Processing image {orig_w}x{orig_h}")

        # The model is loaded once and cached on the function object.
        if not hasattr(run_yoloworld_inference, 'model'):
            logger.info("Loading YOLO-World model (first time only)...")
            run_yoloworld_inference.model = YOLOWorld("yolov8s-world.pt")
            logger.info("YOLO-World model loaded successfully")

        model = run_yoloworld_inference.model

        model.set_classes(text_queries)

        # Hand the PIL image to predict() directly. Ultralytics interprets a
        # numpy array as BGR, so the previous np.array(image) (which is RGB)
        # was run through the model with red and blue swapped; PIL inputs are
        # treated as RGB and converted by the library itself.
        results = model.predict(image, conf=confidence, verbose=False)

        detections = []

        if len(results) > 0 and results[0].boxes is not None:
            boxes = results[0].boxes
            logger.info(f"YOLO-World found {len(boxes)} objects")

            for box in boxes:
                x1, y1, x2, y2 = (float(value) for value in box.xyxy[0].cpu().numpy())
                conf = float(box.conf[0].cpu().numpy())
                cls = int(box.cls[0].cpu().numpy())
                # Class ids index into the vocabulary set above.
                class_name = text_queries[cls] if cls < len(text_queries) else "object"

                detections.append({
                    "bbox": [x1, y1, x2, y2],
                    "confidence": conf,
                    "class_id": cls,
                    "class_name": class_name,
                    "x1": x1,
                    "y1": y1,
                    "x2": x2,
                    "y2": y2,
                    "model_type": "yoloworld"
                })

        logger.info(f"YOLO-World detected {len(detections)} objects: {[d['class_name'] for d in detections]}")
        return detections

    except Exception as e:
        # Best-effort API: callers expect an empty result instead of an exception.
        logger.exception(f"YOLO-World inference error: {e}")
        return []
|
|
|
|
def run_owlvit_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.1):
    """
    Run zero-shot object detection with an OWL-ViT family model.

    The checkpoint actually loaded is ``google/owlv2-base-patch16-ensemble``
    (OWLv2). Objects are detected from free-text descriptions without any
    task-specific training.

    Args:
        image_bytes: Image as bytes
        text_queries: List of text descriptions to detect (e.g., ["defect", "crack", "scratch"]);
            defaults to a generic list of defect-related phrases when None
        confidence: Confidence threshold for detections

    Returns:
        List of detections with bounding boxes. ``class_id`` is the index of
        the matching query and ``class_name``/``text_query`` its text. On any
        failure the error is logged with its traceback and ``[]`` is returned.
    """
    try:
        from transformers import Owlv2Processor, Owlv2ForObjectDetection
        from PIL import Image
        import torch
        import io

        if text_queries is None:
            text_queries = ["a defect", "an anomaly", "a crack", "a scratch", "damage"]

        image = Image.open(io.BytesIO(image_bytes))

        # Processor and model are cached on the function object. Both are
        # loaded into locals first and only then published, and the guard
        # checks both attributes: otherwise a failed model download after a
        # successful processor download would leave a half-initialised cache
        # that makes every later call fail without ever retrying the load.
        cache = run_owlvit_inference
        if not (hasattr(cache, 'processor') and hasattr(cache, 'model')):
            logger.info("Loading OWL-ViT model (first time only)...")
            loaded_processor = Owlv2Processor.from_pretrained("google/owlv2-base-patch16-ensemble")
            loaded_model = Owlv2ForObjectDetection.from_pretrained("google/owlv2-base-patch16-ensemble")
            cache.processor = loaded_processor
            cache.model = loaded_model
            logger.info("OWL-ViT model loaded successfully")

        processor = cache.processor
        model = cache.model

        inputs = processor(text=text_queries, images=image, return_tensors="pt")

        with torch.no_grad():
            outputs = model(**inputs)

        # PIL reports (width, height); post-processing wants (height, width).
        target_sizes = torch.Tensor([image.size[::-1]])
        results = processor.post_process_object_detection(
            outputs=outputs,
            threshold=confidence,
            target_sizes=target_sizes
        )[0]

        detections = []
        boxes = results["boxes"].cpu().numpy()
        scores = results["scores"].cpu().numpy()
        labels = results["labels"].cpu().numpy()

        for box, score, label in zip(boxes, scores, labels):
            x1, y1, x2, y2 = (float(value) for value in box)
            query_index = int(label)
            # Resolve the query text once; it is reported under two keys.
            query_text = text_queries[query_index] if query_index < len(text_queries) else "object"
            detections.append({
                "bbox": [x1, y1, x2, y2],
                "confidence": float(score),
                "class_id": query_index,
                "class_name": query_text,
                "x1": x1,
                "y1": y1,
                "x2": x2,
                "y2": y2,
                "text_query": query_text,
                "model_type": "owlvit"
            })

        logger.info(f"OWL-ViT detected {len(detections)} objects")
        return detections

    except Exception as e:
        # Best-effort API: callers expect an empty result instead of an exception.
        logger.exception(f"OWL-ViT inference error: {e}")
        return []
|
|
|
|
| |
# Registry of selectable models, keyed by model key. Insertion order is the
# order in which the display names are listed in the UI dropdowns.
#
# YOLO entries carry "name", "repo" and "type"; their key and HuggingFace repo
# id both follow from the display name.
MODELS = {
    display_name.lower().replace(" ", "-"): {
        "name": display_name,
        "repo": f"smartfalcon-ai/{display_name.replace(' ', '-')}-Defect-Detection",
        "type": "yolo",
    }
    for display_name in (
        "Dental Implant",
        "Data Matrix",
        "Ball Pen",
        "Knit Up",
        "Knit Back",
        "Jean Back",
        "Jean Up",
        "Tire Cord",
    )
}

# Zero-shot entries carry "name", "type" and "description" (no repo); "type"
# selects the inference function used by the entry points.
MODELS.update({
    f"zero-shot-{kind}": {
        "name": f"Zero Shot ({label})",
        "type": kind,
        "description": blurb,
    }
    for kind, label, blurb in (
        ("florence2", "Florence-2",
         "Microsoft's multimodal vision-language model - detects and labels objects automatically"),
        ("clip", "CLIP",
         "Zero-shot anomaly detection using CLIP - fast and reliable"),
        ("owlvit", "OWL-ViT",
         "Zero-shot object detection using Google's OWL-ViT - detects objects based on text descriptions"),
        ("groundingdino", "GroundingDINO",
         "IDEA Research's open-set object detection - better than OWL-ViT for text-guided detection"),
        ("yoloworld", "YOLO-World",
         "Fast open-vocabulary detection using YOLO architecture - combines speed with zero-shot capability"),
    )
})
|
|
| |
# Class name for AdaCLIP, overridable through the ADACLIP_CLASS_NAME
# environment variable. NOTE(review): not referenced in the visible part of
# this file - confirm it is still used.
ADACLIP_CLASS_NAME = os.getenv("ADACLIP_CLASS_NAME", "object")
|
|
| |
# Rows for the UI example gallery: [image path, model display name, confidence
# threshold]. Each YOLO model has three example images named
# examples/<model-key>-<1..3>.jpg.
EXAMPLES = [
    [f"examples/{slug}-{index}.jpg", display_name, 0.25]
    for slug, display_name in (
        ("dental-implant", "Dental Implant"),
        ("data-matrix", "Data Matrix"),
        ("ball-pen", "Ball Pen"),
        ("knit-up", "Knit Up"),
        ("knit-back", "Knit Back"),
        ("jean-back", "Jean Back"),
        ("jean-up", "Jean Up"),
        ("tire-cord", "Tire Cord"),
    )
    for index in (1, 2, 3)
]
|
|
| |
# Cache of loaded ONNX Runtime sessions keyed by model key; filled lazily by
# get_session().
sessions: dict = {}


# Default model key, overridable through the DEFAULT_MODEL environment
# variable. NOTE(review): not referenced in the visible part of this file.
DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "data-matrix")


# IMG_SIZE: side length (pixels) of the square input built by preprocess().
# IOU_THRESHOLD: IoU cut-off the inference entry points pass to
# non_max_suppression().
IMG_SIZE: int = 640
IOU_THRESHOLD: float = 0.45
|
|
|
|
def get_session(model_key: str):
    """Return the ONNX inference session for a YOLO model, loading it on first use.

    Sessions are cached in the module-level ``sessions`` dict. On a cache miss
    the model's ``best.onnx`` is downloaded from its HuggingFace repo (using
    the HUGGINGFACE_TOKEN environment variable if set) and opened with the CPU
    execution provider.

    Args:
        model_key: Key into MODELS.

    Returns:
        The cached or newly created session, or None when the model is not of
        type "yolo".

    Raises:
        ValueError: If ``model_key`` is not a key of MODELS.
        Exception: Any download/load error is logged and re-raised.
    """
    if model_key in sessions:
        return sessions[model_key]

    if model_key not in MODELS:
        raise ValueError(f"Model '{model_key}' not found. Available: {list(MODELS.keys())}")

    config = MODELS[model_key]

    # Zero-shot models are not served through ONNX sessions.
    if config.get("type") != "yolo":
        return None

    try:
        token = os.environ.get("HUGGINGFACE_TOKEN", None)
        repo = config["repo"]
        logger.info(f"Downloading model: {repo}")
        onnx_path = hf_hub_download(repo_id=repo, filename="best.onnx", token=token)
        sessions[model_key] = ort.InferenceSession(onnx_path, providers=["CPUExecutionProvider"])
        logger.info(f"Model '{model_key}' loaded successfully")
    except Exception as e:
        logger.error(f"Failed to load model '{model_key}': {e}")
        raise

    return sessions[model_key]
|
|
|
|
def preprocess(img):
    """Build the ONNX model input from an image array.

    The image is resized to IMG_SIZE x IMG_SIZE (aspect ratio is not
    preserved), scaled to float32 in [0, 1], moved to channel-first layout and
    given a leading batch axis.

    Args:
        img: Image array whose first two axes are height and width, followed
            by a channel axis.

    Returns:
        Tuple ``(blob, width, height)``: the model input and the original
        image width and height.
    """
    # NOTE(review): channel order is left exactly as supplied; both callers in
    # this file pass a BGR array - confirm the exported models expect that.
    height, width = img.shape[:2]
    resized = cv2.resize(img, (IMG_SIZE, IMG_SIZE))
    scaled = resized.astype(np.float32) / 255.0
    blob = scaled.transpose(2, 0, 1)[np.newaxis, ...]
    return blob, width, height
|
|
|
|
def xywh2xyxy(x):
    """Convert boxes from (center x, center y, width, height) to corner form.

    Args:
        x: 2-D array whose first four columns are cx, cy, w, h.

    Returns:
        A copy of ``x`` whose first four columns are x1, y1, x2, y2; the input
        is not modified and any further columns are copied unchanged.
    """
    corners = np.copy(x)
    half_w = x[:, 2] / 2
    half_h = x[:, 3] / 2
    corners[:, 0] = x[:, 0] - half_w
    corners[:, 1] = x[:, 1] - half_h
    corners[:, 2] = x[:, 0] + half_w
    corners[:, 3] = x[:, 1] + half_h
    return corners
|
|
|
|
def non_max_suppression(preds, conf_thres=0.25, iou_thres=0.45):
    """Filter raw model predictions by confidence and apply NMS.

    Only the first batch element is processed. Each prediction row is read as
    ``[cx, cy, w, h, objectness, class scores...]``. Rows whose objectness is
    not above ``conf_thres`` are dropped; the final score of a row is
    objectness times its best class score. NMS is class-agnostic.

    Args:
        preds: Model output with a leading batch axis.
        conf_thres: Objectness and final-score threshold.
        iou_thres: IoU above which the lower-scored of two boxes is suppressed.

    Returns:
        List of dicts with ``bbox`` ([x1, y1, x2, y2]), ``confidence``,
        ``class_id`` and ``x1``..``y2``, in the coordinate space of the model
        input. Empty list if nothing survives.
    """
    preds = preds[0]
    preds = preds[preds[:, 4] > conf_thres]
    if preds.shape[0] == 0:
        return []

    boxes = xywh2xyxy(preds[:, :4])
    class_scores = preds[:, 5:]
    cls_ids = np.argmax(class_scores, axis=1)
    final_scores = preds[:, 4] * class_scores.max(axis=1)

    # cv2.dnn.NMSBoxes interprets every box as (x, y, width, height). Passing
    # corner-format boxes makes it compute overlap on the wrong rectangles, so
    # build top-left/size rectangles for the call and keep the corner boxes
    # for the returned detections.
    rects = np.column_stack((
        boxes[:, 0],
        boxes[:, 1],
        boxes[:, 2] - boxes[:, 0],
        boxes[:, 3] - boxes[:, 1],
    ))

    indices = cv2.dnn.NMSBoxes(
        bboxes=rects.tolist(),
        scores=final_scores.tolist(),
        score_threshold=conf_thres,
        nms_threshold=iou_thres
    )

    if len(indices) == 0:
        return []

    # Depending on the OpenCV version the result is Nx1 or flat; normalise it.
    output = []
    for idx in np.asarray(indices).flatten():
        x1, y1, x2, y2 = (float(value) for value in boxes[idx])
        output.append({
            "bbox": [x1, y1, x2, y2],
            "confidence": float(final_scores[idx]),
            "class_id": int(cls_ids[idx]),
            "x1": x1,
            "y1": y1,
            "x2": x2,
            "y2": y2
        })
    return output
|
|
|
|
# Model types whose UI rendering goes through the shared zero-shot helpers.
_UI_ZERO_SHOT_TYPES = ("florence2", "clip", "owlvit", "groundingdino", "yoloworld")


def _ui_label_baseline(y1, gap, inside_offset):
    """Return the text baseline for a box label so the label stays on-canvas.

    The label normally sits ``gap`` pixels above the box top. If that leaves
    fewer than 20 rows above the baseline (the text would be cut off or drawn
    above row 0), the label is moved ``inside_offset`` pixels below the box
    top instead.
    """
    above = y1 - gap
    return above if above >= 20 else max(y1, 0) + inside_offset


def _ui_zero_shot_overlay(model_type, image_bytes, conf_threshold):
    """Run the zero-shot model for ``model_type`` and describe how to draw it.

    Returns a tuple ``(detections, status_text, colour, label_for, empty_text,
    empty_colour)`` where ``label_for(index, det)`` builds the per-box label
    and colours are BGR.
    """
    def plain_label(index, det):
        return f"#{index+1} {det.get('class_name', 'object')}"

    def scored_label(index, det):
        return f"#{index+1} {det.get('class_name', 'object')}:{det['confidence']:.2f}"

    def clip_label(index, det):
        return f"DEFECT:{det['confidence']:.2f} (vs normal:{det.get('normal_score', 0):.2f})"

    no_objects = f"No objects detected (threshold: {conf_threshold:.2f})"

    if model_type == "florence2":
        detections = run_florence2_inference(image_bytes, confidence=conf_threshold)
        colour = (128, 0, 128)
        return (detections, f"Florence-2: {len(detections)} objects", colour,
                plain_label, "No objects found", colour)

    if model_type == "clip":
        detections, anomaly_score = run_clip_anomaly_inference(image_bytes, confidence=conf_threshold)
        return (detections, f"Anomaly Score: {anomaly_score:.3f}", (0, 0, 255),
                clip_label, f"No anomaly detected (threshold: {conf_threshold:.2f})", (0, 255, 0))

    if model_type == "owlvit":
        detections = run_owlvit_inference(image_bytes, confidence=conf_threshold)
        colour = (255, 0, 0)
        return (detections, f"OWL-ViT Detections: {len(detections)}", colour,
                scored_label, no_objects, colour)

    if model_type == "groundingdino":
        detections = run_groundingdino_inference(image_bytes, confidence=conf_threshold)
        colour = (0, 165, 255)
        return (detections, f"GroundingDINO: {len(detections)} objects", colour,
                scored_label, no_objects, colour)

    # Remaining zero-shot type: "yoloworld".
    detections = run_yoloworld_inference(image_bytes, confidence=conf_threshold)
    colour = (255, 255, 0)
    return (detections, f"YOLO-World: {len(detections)} objects", colour,
            scored_label, no_objects, colour)


def _ui_draw_zero_shot(img_bgr, detections, status_text, colour, label_for, empty_text, empty_colour):
    """Draw the status line, boxes and labels of a zero-shot result onto ``img_bgr`` in place."""
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Status line: thick white pass first, then a thin coloured pass on top.
    cv2.putText(img_bgr, status_text, (10, 30), font, 0.8, (255, 255, 255), 2)
    cv2.putText(img_bgr, status_text, (10, 30), font, 0.8, colour, 1)

    for index, det in enumerate(detections):
        x1, y1, x2, y2 = (int(det[key]) for key in ("x1", "y1", "x2", "y2"))
        cv2.rectangle(img_bgr, (x1, y1), (x2, y2), colour, 3)
        # Boxes touching the top edge (CLIP's whole-image box always does)
        # would otherwise get their label drawn above the image.
        label_y = _ui_label_baseline(y1, 10, 60)
        cv2.putText(img_bgr, label_for(index, det), (x1, label_y), font, 0.7, colour, 2)

    if not detections:
        cv2.putText(img_bgr, empty_text, (10, 60), font, 0.6, empty_colour, 2)


def gradio_inference(image, model_display_name, conf_threshold):
    """Inference function for Gradio UI - returns annotated image.

    Args:
        image: RGB image array from the Gradio image component, or None.
        model_display_name: Display name of the model (the "name" field of a
            MODELS entry).
        conf_threshold: Confidence threshold forwarded to the selected model.

    Returns:
        The RGB image with detections drawn on it. The input image is returned
        unchanged when the model name is unknown or no ONNX session exists for
        it; None is returned when no image was supplied. When the UI rate
        limit is hit, the image is returned with a "RATE LIMIT EXCEEDED"
        banner instead of running inference.
    """
    if not check_rate_limit("ui"):
        logger.warning("Rate limit exceeded for UI")
        if image is None:
            return image
        is_color = len(image.shape) == 3
        # Draw on a copy in both cases so the caller's array is never modified.
        img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) if is_color else image.copy()
        cv2.putText(img_bgr, "RATE LIMIT EXCEEDED", (50, 50),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB) if is_color else img_bgr

    # Map the display name shown in the dropdown back to its MODELS key.
    model_key = next(
        (key for key, cfg in MODELS.items() if cfg["name"] == model_display_name),
        None,
    )
    if model_key is None:
        return image
    if image is None:
        return None

    img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    orig_h, orig_w = img_bgr.shape[:2]
    model_type = MODELS[model_key].get("type", "yolo")

    if model_type in _UI_ZERO_SHOT_TYPES:
        # The zero-shot runners take encoded image bytes.
        _, img_encoded = cv2.imencode('.jpg', img_bgr)
        overlay = _ui_zero_shot_overlay(model_type, img_encoded.tobytes(), conf_threshold)
        _ui_draw_zero_shot(img_bgr, *overlay)
        return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    # YOLO (ONNX) path.
    session = get_session(model_key)
    if session is None:
        return image

    blob, _, _ = preprocess(img_bgr)
    preds = session.run(None, {"images": blob})[0]
    detections = non_max_suppression(preds, conf_threshold, IOU_THRESHOLD)

    for det in detections:
        # Boxes are in the IMG_SIZE x IMG_SIZE model space; rescale them to
        # the original image.
        x1 = int(det["x1"] / IMG_SIZE * orig_w)
        y1 = int(det["y1"] / IMG_SIZE * orig_h)
        x2 = int(det["x2"] / IMG_SIZE * orig_w)
        y2 = int(det["y2"] / IMG_SIZE * orig_h)

        label = f"{det['class_id']}:{det['confidence']:.2f}"
        cv2.rectangle(img_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
        cv2.putText(img_bgr, label, (x1, _ui_label_baseline(y1, 5, 20)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
|
|
|
|
def api_inference(image, model_display_name, conf_threshold):
    """
    API inference function - returns JSON detections.
    This function is accessible via Gradio's API at /api/detect

    Args:
        image: RGB image array, or None.
        model_display_name: Display name of the model (the "name" field of a
            MODELS entry).
        conf_threshold: Confidence threshold forwarded to the selected model.

    Returns:
        A list of detection dicts with boxes in original-image pixel
        coordinates. The list is empty when the model name is unknown, no
        image was supplied or no ONNX session exists for the model. When the
        API rate limit is hit, a dict with an "error" message is returned
        instead.
    """
    if not check_rate_limit("api"):
        logger.warning("Rate limit exceeded for API")
        return {"error": "Rate limit exceeded. Max 100 requests per minute."}

    # Map the display name back to its MODELS key (first match wins).
    matches = [key for key, cfg in MODELS.items() if cfg["name"] == model_display_name]
    if not matches or image is None:
        return []
    model_key = matches[0]

    img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    orig_h, orig_w = img_bgr.shape[:2]
    model_type = MODELS[model_key].get("type", "yolo")

    # Zero-shot models: every runner takes encoded bytes plus a confidence.
    # The CLIP runner also returns the anomaly score, which is dropped here.
    zero_shot_runners = {
        "florence2": run_florence2_inference,
        "clip": lambda data, confidence: run_clip_anomaly_inference(data, confidence=confidence)[0],
        "owlvit": run_owlvit_inference,
        "groundingdino": run_groundingdino_inference,
        "yoloworld": run_yoloworld_inference,
    }
    runner = zero_shot_runners.get(model_type)
    if runner is not None:
        _, img_encoded = cv2.imencode('.jpg', img_bgr)
        return runner(img_encoded.tobytes(), confidence=conf_threshold)

    # YOLO (ONNX) path.
    session = get_session(model_key)
    if session is None:
        return []

    blob, _, _ = preprocess(img_bgr)
    preds = session.run(None, {"images": blob})[0]
    detections = non_max_suppression(preds, conf_threshold, IOU_THRESHOLD)

    # Rescale boxes from the IMG_SIZE x IMG_SIZE model space to the original image.
    for det in detections:
        left, top, right, bottom = det["bbox"]
        scaled = [
            left / IMG_SIZE * orig_w,
            top / IMG_SIZE * orig_h,
            right / IMG_SIZE * orig_w,
            bottom / IMG_SIZE * orig_h,
        ]
        det["bbox"] = scaled
        det["x1"], det["y1"], det["x2"], det["y2"] = scaled

    return detections
|
|
|
|
| |
# Static Markdown shown above the controls. Kept as a module-level constant so
# the layout code below stays readable.
_INTRO_MARKDOWN = """
**Visual Testing Interface** for Industrial Defect Detection models.

**Available Models:**
- **YOLO Models**: Data Matrix, Tire Cord, Dental Implant, etc. (trained on specific defects)
- **Zero Shot (Anomaly)**: Detects anomalies on ANY product without training data!

- **For API Use:** This Space provides API endpoints accessible via `/api/predict`
- **For Visual Testing:** Use the interface below to test models visually
- **Rate Limiting:** Maximum 100 requests per minute to prevent abuse

Upload an image, select a model, and adjust the confidence threshold.
Note: Zero Shot may take longer (~30-60 seconds) as it calls an external model.
"""

# Static Markdown rendered below the examples: how to call the two inference
# endpoints from a client.
_API_DOCS_MARKDOWN = """
### API Access

This Space provides two API endpoints:

**1. Image API** (returns annotated image):
- **Endpoint**: `/api/predict`
- **Returns**: Annotated image with bounding boxes

**2. JSON API** (returns detection data - for MonitaQC):
- **Endpoint**: `/api/detect`
- **Returns**: JSON array of detections with bboxes and confidence

**Rate Limiting:**
- Maximum 100 requests per minute per endpoint
- Exceeding the limit returns an error response

**Python Example (Image):**
```python
from gradio_client import Client

client = Client("smartfalcon-ai/Industrial-Defect-Detection")
result = client.predict(
"path/to/image.jpg",
"Data Matrix",
0.25,
api_name="/predict"
)
```

**Python Example (JSON - for MonitaQC):**
```python
from gradio_client import Client

client = Client("smartfalcon-ai/Industrial-Defect-Detection")
detections = client.predict(
"path/to/image.jpg",
"Data Matrix",
0.25,
api_name="/detect"
)
# Returns: [{"bbox": [x1, y1, x2, y2], "confidence": 0.85, "class_id": 0, ...}]
```

**Available Models:**
- Data Matrix
- Dental Implant
- Ball Pen
- Knit Up
- Knit Back
- Jean Up
- Jean Back
- Tire Cord
- **Zero Shot (Anomaly)** - Works on any product without training!
"""


def _model_display_names():
    """Return the "name" field of every entry in MODELS, in MODELS order.

    A fresh list is built on every call, so each caller owns its own copy.
    """
    return [spec["name"] for spec in MODELS.values()]


def get_models():
    """Return the available model names together with the model count.

    Returns:
        dict with "models" (list of display names) and "count" (len(MODELS)).
    """
    return {"models": _model_display_names(), "count": len(MODELS)}


def health_check():
    """Build the status payload returned by the "health" endpoint.

    Returns:
        dict with a fixed "status"/"service" pair, the number of entries in
        ``sessions``, the number of entries in ``MODELS``, and the current
        local time as an ISO-8601 string.
    """
    payload = {
        "status": "healthy",
        "service": "Gradio Inference (HuggingFace)",
    }
    payload["models_loaded"] = len(sessions)
    payload["available_models"] = len(MODELS)
    # datetime.now() is naive (no tzinfo), so the string carries no UTC offset.
    payload["timestamp"] = datetime.now().isoformat()
    return payload


# Build the Gradio UI and register the API endpoints.
with gr.Blocks(title="Industrial Defect Detection") as demo:
    gr.Markdown("# Industrial Defect Detection")
    gr.Markdown(_INTRO_MARKDOWN)

    # Visible testing interface: inputs on the left, annotated result on the right.
    with gr.Row():
        with gr.Column():
            input_image = gr.Image(type="numpy", label="Upload Image")
            model_dropdown = gr.Dropdown(
                choices=_model_display_names(),
                label="Select Model",
                value="Data Matrix",
            )
            conf_slider = gr.Slider(
                minimum=0.0,
                maximum=1.0,
                value=0.25,
                step=0.01,
                label="Confidence Threshold",
            )
            submit_btn = gr.Button("Detect Defects", variant="primary")

        with gr.Column():
            output_image = gr.Image(type="numpy", label="Detection Results")

    # Image endpoint: the button click is also published under api_name "predict".
    submit_btn.click(
        fn=gradio_inference,
        inputs=[input_image, model_dropdown, conf_slider],
        outputs=output_image,
        api_name="predict",
    )

    # Hidden components: never displayed; they give the JSON "detect" endpoint
    # its own inputs/outputs.
    # NOTE(review): assumes every component below belongs inside this hidden
    # row - confirm against the original indentation.
    with gr.Row(visible=False):
        json_image = gr.Image(type="numpy")
        json_model = gr.Dropdown(choices=_model_display_names())
        json_conf = gr.Slider(minimum=0.0, maximum=1.0, value=0.25)
        json_output = gr.JSON()
        json_btn = gr.Button("JSON Detect")

    json_btn.click(
        fn=api_inference,
        inputs=[json_image, json_model, json_conf],
        outputs=json_output,
        api_name="detect",
    )

    gr.Markdown("### Example Images")
    # cache_examples=True asks Gradio to pre-compute the example outputs with
    # gradio_inference instead of running it on every example click.
    gr.Examples(
        examples=EXAMPLES,
        fn=gradio_inference,
        inputs=[input_image, model_dropdown, conf_slider],
        outputs=output_image,
        cache_examples=True,
        examples_per_page=24,
    )

    gr.Markdown(_API_DOCS_MARKDOWN)

    # Hidden components backing the "models" and "health" utility endpoints.
    with gr.Row(visible=False):
        models_btn = gr.Button("Get Models")
        models_output = gr.JSON()
        health_btn = gr.Button("Health Check")
        health_output = gr.JSON()

    models_btn.click(
        fn=get_models,
        inputs=[],
        outputs=models_output,
        api_name="models",
    )

    health_btn.click(
        fn=health_check,
        inputs=[],
        outputs=health_output,
        api_name="health",
    )


# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|