asamasach's picture
Add GroundingDINO and YOLO-World zero-shot models - Added GroundingDINO and YOLO-World for better zero-shot detection - Updated requirements.txt with ultralytics - Added visualization with distinct colors
80280f8
Raw
History Blame
42.6 kB
"""
Industrial Defect Detection with Gradio for HuggingFace Spaces.
Provides both UI and API endpoints through Gradio's native API system.
Includes:
- YOLO-based defect detection models (Data Matrix, Tire Cord, etc.)
- Zero-shot anomaly detection via AdaCLIP (no training data required)
"""
import gradio as gr
import onnxruntime as ort
import numpy as np
import cv2
from huggingface_hub import hf_hub_download
import os
import logging
from collections import defaultdict
from datetime import datetime, timedelta
import time
import tempfile
# Module-wide logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Rate limiting: each identifier may make RATE_LIMIT_REQUESTS calls within any
# trailing RATE_LIMIT_WINDOW seconds.
RATE_LIMIT_REQUESTS = 100  # Max requests per window
RATE_LIMIT_WINDOW = 60  # Window in seconds (1 minute)
request_tracker = defaultdict(list)  # identifier -> timestamps of accepted requests


def check_rate_limit(identifier="global"):
    """Record a request for ``identifier`` unless its window is already full.

    Timestamps older than RATE_LIMIT_WINDOW seconds are discarded first.
    Returns True (and records the request) while fewer than
    RATE_LIMIT_REQUESTS remain in the window, otherwise logs a warning and
    returns False without recording anything.
    """
    now = time.time()
    window_start = now - RATE_LIMIT_WINDOW

    # Keep only the requests that still fall inside the sliding window
    recent = [stamp for stamp in request_tracker[identifier] if stamp > window_start]
    request_tracker[identifier] = recent

    if len(recent) < RATE_LIMIT_REQUESTS:
        recent.append(now)
        return True

    logger.warning(f"Rate limit exceeded for {identifier}")
    return False
def extract_bboxes_from_heatmap(heatmap_path: str, orig_w: int, orig_h: int, threshold: float = 0.5):
    """Turn a saved heatmap image into bounding boxes in original-image pixels.

    The heatmap is read from ``heatmap_path``, reduced to grayscale, resized
    to ``orig_w`` x ``orig_h`` and scaled to [0, 1]. Pixels above
    ``threshold * 0.5`` form a binary mask; each external contour covering at
    least 0.1% of the image becomes one box whose confidence is the mean
    heatmap value inside it.

    Returns a list of dicts with keys x1, y1, x2, y2, confidence. An
    unreadable file or any processing error yields an empty list.
    """
    try:
        raw = cv2.imread(heatmap_path)
        if raw is None:
            return []

        gray = cv2.cvtColor(raw, cv2.COLOR_BGR2GRAY) if len(raw.shape) == 3 else raw
        scaled = cv2.resize(gray, (orig_w, orig_h)).astype(np.float32) / 255.0

        # The binarisation cut-off is deliberately half the requested threshold
        _, mask = cv2.threshold(scaled, threshold * 0.5, 1.0, cv2.THRESH_BINARY)
        mask = (mask * 255).astype(np.uint8)
        outlines, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        area_floor = (orig_w * orig_h) * 0.001  # ignore specks below 0.1% of the image
        found = []
        for outline in outlines:
            if cv2.contourArea(outline) < area_floor:
                continue
            left, top, width, height = cv2.boundingRect(outline)
            patch = scaled[top:top + height, left:left + width]
            found.append({
                "x1": float(left),
                "y1": float(top),
                "x2": float(left + width),
                "y2": float(top + height),
                "confidence": float(np.mean(patch)) if patch.size > 0 else 0.5
            })
        return found
    except Exception as e:
        logger.error(f"Error extracting bboxes from heatmap: {e}")
        return []
def run_clip_anomaly_inference(image_bytes: bytes, confidence: float = 0.25):
    """
    Run zero-shot anomaly detection using CLIP similarity scoring.

    The image is compared against one "normal" and one "defect" text
    description; the softmax over the two image-text logits gives the
    normal/defect probabilities.

    Args:
        image_bytes: Encoded image (any format PIL can open).
        confidence: Minimum defect probability required to report a detection.

    Returns:
        Tuple ``(detections, defect_prob)``. ``detections`` holds at most one
        dict whose box covers the whole image. On any error the result is
        ``([], 0.0)``.
    """
    try:
        from transformers import CLIPProcessor, CLIPModel
        from PIL import Image
        import torch
        import io

        # Load image
        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size

        # Initialize model and processor (cached after first load). Both are
        # loaded into locals first and published together, so a failed model
        # download cannot leave a processor-only cache that breaks every
        # subsequent call.
        if not (hasattr(run_clip_anomaly_inference, 'processor')
                and hasattr(run_clip_anomaly_inference, 'model')):
            logger.info("Loading CLIP model (first time only)...")
            loaded_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
            loaded_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
            run_clip_anomaly_inference.processor = loaded_processor
            run_clip_anomaly_inference.model = loaded_model
            logger.info("CLIP model loaded successfully")
        processor = run_clip_anomaly_inference.processor
        model = run_clip_anomaly_inference.model

        # Simpler binary comparison: normal vs defect
        text_descriptions = [
            "a high quality product without any defects or anomalies",
            "a defective product with visible defects, cracks, scratches, or damage"
        ]

        # Process inputs
        inputs = processor(
            text=text_descriptions,
            images=image,
            return_tensors="pt",
            padding=True
        )

        # Run inference
        with torch.no_grad():
            outputs = model(**inputs)
            probs = outputs.logits_per_image.softmax(dim=1)

        # Index 0 is the "normal" description, index 1 the "defect" one
        normal_prob = float(probs[0][0])
        defect_prob = float(probs[0][1])
        logger.info(f"CLIP probabilities - Normal: {normal_prob:.3f}, Defect: {defect_prob:.3f}")

        detections = []
        # CLIP gives an image-level score only, so a hit is reported as one
        # box spanning the full frame.
        if defect_prob >= confidence:
            detections.append({
                "bbox": [0, 0, orig_w, orig_h],
                "confidence": defect_prob,
                "class_id": 0,
                "class_name": "anomaly",
                "x1": 0,
                "y1": 0,
                "x2": orig_w,
                "y2": orig_h,
                "anomaly_score": defect_prob,
                "normal_score": normal_prob,
                "model_type": "clip",
                "description": f"CLIP anomaly detection (defect:{defect_prob:.2f} vs normal:{normal_prob:.2f})"
            })
        logger.info(f"CLIP result - Defect score: {defect_prob:.3f}, Detections: {len(detections)}")
        return detections, defect_prob
    except Exception as e:
        # Best-effort backend: log the full traceback and report "nothing found"
        logger.exception(f"CLIP inference error: {e}")
        return [], 0.0
def run_florence2_inference(image_bytes: bytes, confidence: float = 0.3):
    """
    Run zero-shot object detection using Florence-2 (Microsoft).

    Uses the ``<DENSE_REGION_CAPTION>`` task, which returns labelled regions
    for the whole image.

    Args:
        image_bytes: Encoded image (any format PIL can open).
        confidence: Accepted for signature parity with the other backends but
            not applied: the parsed output carries no per-region score, so
            every detection reports a fixed placeholder confidence of 0.9.

    Returns:
        List of detection dicts (bbox, confidence, class_id, class_name,
        x1, y1, x2, y2, model_type). Empty on any error.
    """
    try:
        from transformers import AutoProcessor, AutoModelForCausalLM
        from PIL import Image
        import torch
        import io

        # Load image
        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size
        logger.info(f"Florence-2: Processing image {orig_w}x{orig_h}")

        # Initialize model and processor (cached after first load). Publish
        # both only after both loaded, so a failed model download cannot leave
        # a processor-only cache that breaks every subsequent call.
        if not (hasattr(run_florence2_inference, 'processor')
                and hasattr(run_florence2_inference, 'model')):
            logger.info("Loading Florence-2 model (first time only - may take a moment)...")
            loaded_processor = AutoProcessor.from_pretrained(
                "microsoft/Florence-2-base",  # Use base model - faster and more reliable
                trust_remote_code=True
            )
            loaded_model = AutoModelForCausalLM.from_pretrained(
                "microsoft/Florence-2-base",
                trust_remote_code=True
            )
            run_florence2_inference.processor = loaded_processor
            run_florence2_inference.model = loaded_model
            logger.info("Florence-2 model loaded successfully")
        processor = run_florence2_inference.processor
        model = run_florence2_inference.model

        # Dense region caption - better for finding all objects
        task_prompt = "<DENSE_REGION_CAPTION>"
        inputs = processor(text=task_prompt, images=image, return_tensors="pt")
        logger.info(f"Florence-2: Running inference with task {task_prompt}")

        # Run inference
        with torch.no_grad():
            generated_ids = model.generate(
                input_ids=inputs["input_ids"],
                pixel_values=inputs["pixel_values"],
                max_new_tokens=1024,
                num_beams=3,
            )

        # Decode results; special tokens are kept because the post-processor
        # parses them.
        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
        logger.info(f"Florence-2 generated text: {generated_text[:200]}...")
        parsed_answer = processor.post_process_generation(
            generated_text,
            task=task_prompt,
            image_size=(orig_w, orig_h)
        )
        logger.info(f"Florence-2 parsed answer keys: {list(parsed_answer.keys())}")
        logger.info(f"Florence-2 parsed answer: {str(parsed_answer)[:500]}...")

        detections = []
        if task_prompt in parsed_answer:
            result = parsed_answer[task_prompt]
            # Dense region caption format: parallel 'bboxes' and 'labels' lists
            if 'bboxes' in result and 'labels' in result:
                bboxes = result['bboxes']
                labels = result['labels']
                logger.info(f"Florence-2 found {len(bboxes)} regions")
                for bbox, label in zip(bboxes, labels):
                    x1, y1, x2, y2 = (float(coord) for coord in bbox)
                    detections.append({
                        "bbox": [x1, y1, x2, y2],
                        "confidence": 0.9,  # placeholder: no score is available
                        "class_id": 0,
                        "class_name": str(label),
                        "x1": x1,
                        "y1": y1,
                        "x2": x2,
                        "y2": y2,
                        "model_type": "florence2"
                    })
        logger.info(f"Florence-2 detected {len(detections)} objects: {[d.get('class_name', '?') for d in detections]}")
        return detections
    except Exception as e:
        # Best-effort backend: log the full traceback and report "nothing found"
        logger.exception(f"Florence-2 inference error: {e}")
        return []
def run_groundingdino_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.3):
    """
    Run zero-shot object detection using GroundingDINO (IDEA Research).

    Args:
        image_bytes: Encoded image (any format PIL can open).
        text_queries: Phrases to look for; defaults to a generic defect
            vocabulary. They are lower-cased and joined into one
            period-separated prompt.
        confidence: Used as both the box threshold and the text threshold.

    Returns:
        List of detection dicts (bbox, confidence, class_id, class_name,
        x1, y1, x2, y2, model_type). Empty on any error.
    """
    try:
        from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
        from PIL import Image
        import torch
        import io

        if text_queries is None:
            text_queries = ["defect", "anomaly", "crack", "scratch", "damage", "error", "imperfection"]

        # Load image
        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size
        logger.info(f"GroundingDINO: Processing image {orig_w}x{orig_h}")

        # Initialize model and processor (cached after first load). Publish
        # both only after both loaded, so a failed model download cannot leave
        # a processor-only cache that breaks every subsequent call.
        if not (hasattr(run_groundingdino_inference, 'processor')
                and hasattr(run_groundingdino_inference, 'model')):
            logger.info("Loading GroundingDINO model (first time only)...")
            loaded_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny")
            loaded_model = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-tiny")
            run_groundingdino_inference.processor = loaded_processor
            run_groundingdino_inference.model = loaded_model
            logger.info("GroundingDINO model loaded successfully")
        processor = run_groundingdino_inference.processor
        model = run_groundingdino_inference.model

        # Create text prompt: lower-cased phrases, each terminated by a period.
        # Caller-supplied phrases are normalised so stray capitals or trailing
        # dots do not produce a malformed prompt.
        phrases = [str(query).strip().rstrip(".").lower() for query in text_queries]
        text_prompt = ". ".join(phrases) + "."

        # Prepare inputs
        inputs = processor(images=image, text=text_prompt, return_tensors="pt")

        # Run inference
        with torch.no_grad():
            outputs = model(**inputs)

        # Post-process results; target size is (height, width) of the original
        results = processor.post_process_grounded_object_detection(
            outputs,
            inputs.input_ids,
            box_threshold=confidence,
            text_threshold=confidence,
            target_sizes=[(orig_h, orig_w)]
        )[0]

        detections = []
        if len(results["boxes"]) > 0:
            boxes = results["boxes"].cpu().numpy()
            scores = results["scores"].cpu().numpy()
            labels = results["labels"]
            logger.info(f"GroundingDINO found {len(boxes)} objects")
            for box, score, label in zip(boxes, scores, labels):
                x1, y1, x2, y2 = (float(coord) for coord in box)
                detections.append({
                    "bbox": [x1, y1, x2, y2],
                    "confidence": float(score),
                    "class_id": 0,
                    "class_name": str(label),
                    "x1": x1,
                    "y1": y1,
                    "x2": x2,
                    "y2": y2,
                    "model_type": "groundingdino"
                })
        logger.info(f"GroundingDINO detected {len(detections)} objects: {[d['class_name'] for d in detections]}")
        return detections
    except Exception as e:
        # Best-effort backend: log the full traceback and report "nothing found"
        logger.exception(f"GroundingDINO inference error: {e}")
        return []
def run_yoloworld_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.3):
    """
    Run zero-shot object detection using YOLO-World.

    Args:
        image_bytes: Encoded image (any format PIL can open).
        text_queries: Class names to detect; defaults to a generic defect
            vocabulary.
        confidence: Minimum detection confidence passed to ``predict``.

    Returns:
        List of detection dicts (bbox, confidence, class_id, class_name,
        x1, y1, x2, y2, model_type). Empty on any error.
    """
    try:
        from ultralytics import YOLOWorld
        from PIL import Image
        import io

        if text_queries is None:
            text_queries = ["defect", "anomaly", "crack", "scratch", "damage"]

        # Load image
        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size
        logger.info(f"YOLO-World: Processing image {orig_w}x{orig_h}")

        # Initialize model (cached after first load)
        if not hasattr(run_yoloworld_inference, 'model'):
            logger.info("Loading YOLO-World model (first time only)...")
            run_yoloworld_inference.model = YOLOWorld("yolov8s-world.pt")  # Small model
            logger.info("YOLO-World model loaded successfully")
        model = run_yoloworld_inference.model

        # Set custom classes only when the vocabulary changes, instead of
        # repeating the call on every request.
        wanted_classes = list(text_queries)
        if getattr(run_yoloworld_inference, 'active_classes', None) != wanted_classes:
            model.set_classes(wanted_classes)
            run_yoloworld_inference.active_classes = wanted_classes

        # Pass the PIL image itself. Ultralytics treats numpy input as BGR
        # (OpenCV order) but PIL input as RGB, so np.array(image) would feed
        # the model channel-swapped pixels.
        results = model.predict(image, conf=confidence, verbose=False)

        detections = []
        if len(results) > 0 and results[0].boxes is not None:
            boxes = results[0].boxes
            logger.info(f"YOLO-World found {len(boxes)} objects")
            for box in boxes:
                x1, y1, x2, y2 = (float(coord) for coord in box.xyxy[0].cpu().numpy())
                conf = float(box.conf[0].cpu().numpy())
                cls = int(box.cls[0].cpu().numpy())
                class_name = text_queries[cls] if cls < len(text_queries) else "object"
                detections.append({
                    "bbox": [x1, y1, x2, y2],
                    "confidence": conf,
                    "class_id": cls,
                    "class_name": class_name,
                    "x1": x1,
                    "y1": y1,
                    "x2": x2,
                    "y2": y2,
                    "model_type": "yoloworld"
                })
        logger.info(f"YOLO-World detected {len(detections)} objects: {[d['class_name'] for d in detections]}")
        return detections
    except Exception as e:
        # Best-effort backend: log the full traceback and report "nothing found"
        logger.exception(f"YOLO-World inference error: {e}")
        return []
def run_owlvit_inference(image_bytes: bytes, text_queries: list = None, confidence: float = 0.1):
    """
    Run zero-shot object detection using OWL-ViT (Open World Localization - Vision Transformer).

    The checkpoint loaded is the OWLv2 ensemble model
    (``google/owlv2-base-patch16-ensemble``).

    Args:
        image_bytes: Image as bytes
        text_queries: List of text descriptions to detect (e.g., ["defect", "crack", "scratch"])
        confidence: Confidence threshold for detections

    Returns:
        List of detections with bounding boxes in original-image pixels.
        Empty on any error.
    """
    try:
        from transformers import Owlv2Processor, Owlv2ForObjectDetection
        from PIL import Image
        import torch
        import io

        if text_queries is None:
            text_queries = ["a defect", "an anomaly", "a crack", "a scratch", "damage"]

        # Load image
        image = Image.open(io.BytesIO(image_bytes))
        orig_w, orig_h = image.size

        # Initialize model and processor (cached after first load). Publish
        # both only after both loaded, so a failed model download cannot leave
        # a processor-only cache that breaks every subsequent call.
        if not (hasattr(run_owlvit_inference, 'processor')
                and hasattr(run_owlvit_inference, 'model')):
            logger.info("Loading OWL-ViT model (first time only)...")
            loaded_processor = Owlv2Processor.from_pretrained("google/owlv2-base-patch16-ensemble")
            loaded_model = Owlv2ForObjectDetection.from_pretrained("google/owlv2-base-patch16-ensemble")
            run_owlvit_inference.processor = loaded_processor
            run_owlvit_inference.model = loaded_model
            logger.info("OWL-ViT model loaded successfully")
        processor = run_owlvit_inference.processor
        model = run_owlvit_inference.model

        # Prepare inputs
        inputs = processor(text=text_queries, images=image, return_tensors="pt")

        # Run inference
        with torch.no_grad():
            outputs = model(**inputs)

        # OWLv2 pads the image to a square (bottom/right) before resizing, so
        # the predicted normalised boxes are relative to a square whose side
        # is the longer image edge. Scaling with (height, width) would squash
        # boxes along the shorter edge of non-square images.
        square_side = max(orig_w, orig_h)
        target_sizes = torch.Tensor([[square_side, square_side]])  # (height, width)
        results = processor.post_process_object_detection(
            outputs=outputs,
            threshold=confidence,
            target_sizes=target_sizes
        )[0]

        detections = []
        boxes = results["boxes"].cpu().numpy()
        scores = results["scores"].cpu().numpy()
        labels = results["labels"].cpu().numpy()
        for box, score, label in zip(boxes, scores, labels):
            x1, y1, x2, y2 = (float(coord) for coord in box)
            query_index = int(label)
            # The label is an index into text_queries
            query_text = text_queries[query_index] if query_index < len(text_queries) else "object"
            detections.append({
                "bbox": [x1, y1, x2, y2],
                "confidence": float(score),
                "class_id": query_index,
                "class_name": query_text,
                "x1": x1,
                "y1": y1,
                "x2": x2,
                "y2": y2,
                "text_query": query_text,
                "model_type": "owlvit"
            })
        logger.info(f"OWL-ViT detected {len(detections)} objects")
        return detections
    except Exception as e:
        # Best-effort backend: log the full traceback and report "nothing found"
        logger.exception(f"OWL-ViT inference error: {e}")
        return []
# Available models, keyed by slug. Insertion order is the order shown in the UI.
# Trained YOLO models all follow one naming scheme: the slug is the display
# name lower-cased with dashes, and the Hub repo is
# "smartfalcon-ai/<Display-Name>-Defect-Detection".
MODELS = {
    display.lower().replace(" ", "-"): {
        "name": display,
        "repo": f"smartfalcon-ai/{display.replace(' ', '-')}-Defect-Detection",
        "type": "yolo",
    }
    for display in (
        "Dental Implant",
        "Data Matrix",
        "Ball Pen",
        "Knit Up",
        "Knit Back",
        "Jean Back",
        "Jean Up",
        "Tire Cord",
    )
}
# Zero-shot models (no training data required - run locally); these have no
# "repo" entry because their weights are fetched by the inference functions.
MODELS.update({
    "zero-shot-florence2": {
        "name": "Zero Shot (Florence-2)",
        "type": "florence2",
        "description": "Microsoft's multimodal vision-language model - detects and labels objects automatically"
    },
    "zero-shot-clip": {
        "name": "Zero Shot (CLIP)",
        "type": "clip",
        "description": "Zero-shot anomaly detection using CLIP - fast and reliable"
    },
    "zero-shot-owlvit": {
        "name": "Zero Shot (OWL-ViT)",
        "type": "owlvit",
        "description": "Zero-shot object detection using Google's OWL-ViT - detects objects based on text descriptions"
    },
    "zero-shot-groundingdino": {
        "name": "Zero Shot (GroundingDINO)",
        "type": "groundingdino",
        "description": "IDEA Research's open-set object detection - better than OWL-ViT for text-guided detection"
    },
    "zero-shot-yoloworld": {
        "name": "Zero Shot (YOLO-World)",
        "type": "yoloworld",
        "description": "Fast open-vocabulary detection using YOLO architecture - combines speed with zero-shot capability"
    },
})
# AdaCLIP configuration: class name, overridable through the
# ADACLIP_CLASS_NAME environment variable (defaults to "object").
# NOTE(review): nothing in the visible part of this file reads this value -
# confirm it is still needed.
ADACLIP_CLASS_NAME = os.environ.get("ADACLIP_CLASS_NAME", "object")
# Example images for Gradio: three numbered images per trained YOLO model,
# each row being [image path, model display name, confidence threshold].
EXAMPLES = [
    [f"examples/{display.lower().replace(' ', '-')}-{index}.jpg", display, 0.25]
    for display in (
        "Dental Implant",
        "Data Matrix",
        "Ball Pen",
        "Knit Up",
        "Knit Back",
        "Jean Back",
        "Jean Up",
        "Tire Cord",
    )
    for index in (1, 2, 3)
]
# Model sessions cache: model key -> loaded ONNX InferenceSession
# (filled lazily by get_session)
sessions = {}
# Default model key, overridable through the DEFAULT_MODEL environment variable
DEFAULT_MODEL = os.environ.get("DEFAULT_MODEL", "data-matrix")
# Inference parameters
IMG_SIZE = 640  # square side the image is resized to in preprocess(); also used to rescale boxes
IOU_THRESHOLD = 0.45  # overlap threshold handed to non_max_suppression()
def get_session(model_key: str):
    """Return the cached ONNX session for ``model_key``, loading it on first use.

    Non-YOLO entries (the zero-shot models) have no ONNX file, so ``None`` is
    returned for them. For YOLO entries ``best.onnx`` is downloaded from the
    model's Hub repo (using HUGGINGFACE_TOKEN if set) and opened on the CPU
    execution provider.

    Raises:
        ValueError: if ``model_key`` is not a key of MODELS.
        Exception: any download/load failure is logged and re-raised.
    """
    if model_key in sessions:
        return sessions[model_key]

    if model_key not in MODELS:
        raise ValueError(f"Model '{model_key}' not found. Available: {list(MODELS.keys())}")

    config = MODELS[model_key]
    # Skip ONNX loading for non-YOLO models
    if config.get("type") != "yolo":
        return None

    try:
        repo_id = config["repo"]
        logger.info(f"Downloading model: {repo_id}")
        onnx_path = hf_hub_download(
            repo_id=repo_id,
            filename="best.onnx",
            token=os.environ.get("HUGGINGFACE_TOKEN", None)
        )
        session = ort.InferenceSession(
            onnx_path,
            providers=["CPUExecutionProvider"]
        )
        sessions[model_key] = session
        logger.info(f"Model '{model_key}' loaded successfully")
    except Exception as e:
        logger.error(f"Failed to load model '{model_key}': {e}")
        raise
    return session
def preprocess(img):
    """Convert an HxWxC image into a 1xCxHxW float32 tensor for the ONNX model.

    The image is stretched (no letterboxing) to IMG_SIZE x IMG_SIZE and scaled
    to [0, 1]. Returns ``(tensor, original_width, original_height)``.

    NOTE(review): channel order is left untouched and the callers in this file
    pass BGR arrays - confirm the exported models expect BGR rather than RGB.
    """
    src_h, src_w = img.shape[:2]
    scaled = cv2.resize(img, (IMG_SIZE, IMG_SIZE)).astype(np.float32) / 255.0
    # HWC -> CHW, then add the leading batch axis
    tensor = np.expand_dims(scaled.transpose(2, 0, 1), 0)
    return tensor, src_w, src_h
def xywh2xyxy(x):
    """Convert rows of (center_x, center_y, width, height) to (x1, y1, x2, y2).

    ``x`` is an (N, >=4) array; a converted copy is returned and the input is
    left unmodified.
    """
    corners = np.copy(x)
    center_x, center_y = x[:, 0], x[:, 1]
    half_w = x[:, 2] / 2
    half_h = x[:, 3] / 2
    corners[:, 0] = center_x - half_w
    corners[:, 1] = center_y - half_h
    corners[:, 2] = center_x + half_w
    corners[:, 3] = center_y + half_h
    return corners
def non_max_suppression(preds, conf_thres=0.25, iou_thres=0.45):
    """Filter raw detector output and apply class-agnostic NMS.

    Args:
        preds: Array indexed as ``preds[0][i] = (cx, cy, w, h, objectness,
            class scores...)`` - only the first batch item is used.
        conf_thres: Rows need objectness above this value; the final score
            (objectness * best class score) is thresholded again during NMS.
        iou_thres: Overlap threshold handed to ``cv2.dnn.NMSBoxes``.

    Returns:
        List of dicts with bbox ([x1, y1, x2, y2]), confidence, class_id and
        x1/y1/x2/y2, in the coordinate space of the model input. Empty when
        nothing passes the thresholds.
    """
    preds = preds[0]
    preds = preds[preds[:, 4] > conf_thres]
    if preds.shape[0] == 0:
        return []

    boxes = xywh2xyxy(preds[:, :4])
    class_scores = preds[:, 5:]
    cls_ids = np.argmax(class_scores, axis=1)
    final_scores = preds[:, 4] * class_scores.max(axis=1)

    # cv2.dnn.NMSBoxes expects rectangles as (x, y, width, height) with x/y
    # the top-left corner. Passing corner-format boxes makes it read x2/y2 as
    # width/height and compute overlaps on inflated boxes.
    rects = np.column_stack((boxes[:, 0], boxes[:, 1], preds[:, 2], preds[:, 3]))
    indices = cv2.dnn.NMSBoxes(
        bboxes=rects.tolist(),
        scores=final_scores.tolist(),
        score_threshold=conf_thres,
        nms_threshold=iou_thres
    )
    if len(indices) == 0:
        return []

    # Flatten via numpy: depending on the OpenCV version the result may be
    # nested (N x 1) or already flat.
    output = []
    for idx in np.asarray(indices).reshape(-1):
        x1, y1, x2, y2 = (float(coord) for coord in boxes[idx])
        output.append({
            "bbox": [x1, y1, x2, y2],
            "confidence": float(final_scores[idx]),
            "class_id": int(cls_ids[idx]),
            "x1": x1,
            "y1": y1,
            "x2": x2,
            "y2": y2
        })
    return output
def _zero_shot_overlay(model_type, image_bytes, conf_threshold):
    """Run the zero-shot backend for ``model_type`` and describe how to draw it.

    Returns ``None`` for an unrecognised type, otherwise the tuple
    ``(detections, status_text, box_color, empty_text, empty_color, make_label)``
    where colors are BGR and ``make_label(index, detection)`` builds the
    caption for one box.
    """
    threshold_note = f"(threshold: {conf_threshold:.2f})"

    if model_type == "florence2":
        detections = run_florence2_inference(image_bytes, confidence=conf_threshold)
        purple = (128, 0, 128)
        return (
            detections, f"Florence-2: {len(detections)} objects", purple,
            "No objects found", purple,
            # No score in the caption: Florence-2 confidences are placeholders
            lambda index, det: f"#{index + 1} {det.get('class_name', 'object')}",
        )

    if model_type == "clip":
        detections, anomaly_score = run_clip_anomaly_inference(image_bytes, confidence=conf_threshold)
        return (
            detections, f"Anomaly Score: {anomaly_score:.3f}", (0, 0, 255),  # Red for anomalies
            f"No anomaly detected {threshold_note}", (0, 255, 0),
            lambda index, det: f"DEFECT:{det['confidence']:.2f} (vs normal:{det.get('normal_score', 0):.2f})",
        )

    # Backends that share the "#n name:score" caption and differ only in
    # runner, status line and color.
    scored_backends = {
        "owlvit": (run_owlvit_inference, "OWL-ViT Detections: {count}", (255, 0, 0)),  # Blue
        "groundingdino": (run_groundingdino_inference, "GroundingDINO: {count} objects", (0, 165, 255)),  # Orange
        "yoloworld": (run_yoloworld_inference, "YOLO-World: {count} objects", (255, 255, 0)),  # Cyan
    }
    if model_type not in scored_backends:
        return None
    runner, status_template, color = scored_backends[model_type]
    detections = runner(image_bytes, confidence=conf_threshold)
    return (
        detections, status_template.format(count=len(detections)), color,
        f"No objects detected {threshold_note}", color,
        lambda index, det: f"#{index + 1} {det.get('class_name', 'object')}:{det['confidence']:.2f}",
    )


def gradio_inference(image, model_display_name, conf_threshold):
    """Inference function for Gradio UI - returns annotated image.

    Args:
        image: RGB numpy image from the Gradio component, or None.
        model_display_name: A ``name`` value from MODELS.
        conf_threshold: Confidence threshold forwarded to the selected model.

    Returns:
        The annotated RGB image. The input is returned unchanged when the
        model name is unknown or no session is available; None when no image
        was supplied. When the "ui" rate limit is hit, the image is returned
        with a "RATE LIMIT EXCEEDED" watermark.
    """
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Rate limiting
    if not check_rate_limit("ui"):
        logger.warning("Rate limit exceeded for UI")
        if image is None:
            return image
        # Return image with watermark showing rate limit
        is_color = len(image.shape) == 3
        # Copy single-channel input so the caller's array is not drawn on
        canvas = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) if is_color else image.copy()
        cv2.putText(canvas, "RATE LIMIT EXCEEDED", (50, 50), font, 1, (0, 0, 255), 2)
        return cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB) if is_color else canvas

    # Find model key from display name
    model_key = next(
        (key for key, cfg in MODELS.items() if cfg["name"] == model_display_name),
        None,
    )
    if model_key is None:
        return image
    if image is None:
        return None

    img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    orig_h, orig_w = img_bgr.shape[:2]
    model_type = MODELS[model_key].get("type", "yolo")

    # Handle zero-shot models: they take encoded image bytes
    if model_type != "yolo":
        encoded_ok, img_encoded = cv2.imencode('.jpg', img_bgr)
        if not encoded_ok:
            logger.error("Failed to JPEG-encode image for zero-shot inference")
            return image
        overlay = _zero_shot_overlay(model_type, img_encoded.tobytes(), conf_threshold)
        if overlay is None:
            # Unknown model type: nothing to run, hand the image back
            return image
        detections, status_text, color, empty_text, empty_color, make_label = overlay

        # Status line: white outline first, colored text on top for contrast
        cv2.putText(img_bgr, status_text, (10, 30), font, 0.8, (255, 255, 255), 2)
        cv2.putText(img_bgr, status_text, (10, 30), font, 0.8, color, 1)

        for index, det in enumerate(detections):
            x1, y1, x2, y2 = (int(det[key]) for key in ("x1", "y1", "x2", "y2"))
            cv2.rectangle(img_bgr, (x1, y1), (x2, y2), color, 3)
            # Captions normally sit just above the box. For boxes near the top
            # edge (e.g. CLIP's full-frame box at y1 == 0) that spot is
            # off-image or on the status line, so draw inside the box instead.
            label_y = y1 - 10
            if label_y < 45:
                label_y = max(y1 + 25, 60)
            cv2.putText(img_bgr, make_label(index, det), (x1, label_y), font, 0.7, color, 2)

        if not detections:
            cv2.putText(img_bgr, empty_text, (10, 60), font, 0.6, empty_color, 2)
        return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    # Handle YOLO models (default)
    session = get_session(model_key)
    if session is None:
        return image
    blob, _, _ = preprocess(img_bgr)
    # Ask the session for its input name instead of assuming "images"
    input_name = session.get_inputs()[0].name
    preds = session.run(None, {input_name: blob})[0]
    detections = non_max_suppression(preds, conf_threshold, IOU_THRESHOLD)

    for det in detections:
        # Boxes are in IMG_SIZE space; map them back onto the original image
        x1 = int(det["x1"] / IMG_SIZE * orig_w)
        y1 = int(det["y1"] / IMG_SIZE * orig_h)
        x2 = int(det["x2"] / IMG_SIZE * orig_w)
        y2 = int(det["y2"] / IMG_SIZE * orig_h)
        label = f"{det['class_id']}:{det['confidence']:.2f}"
        cv2.rectangle(img_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
        # Keep the caption baseline inside the image for boxes at the top edge
        cv2.putText(img_bgr, label, (x1, max(y1 - 5, 15)), font, 0.6, (0, 255, 0), 2)
    return cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
def api_inference(image, model_display_name, conf_threshold):
    """
    API inference function - returns JSON detections.
    This function is accessible via Gradio's API at /api/detect

    Args:
        image: RGB numpy image from the Gradio component, or None.
        model_display_name: A ``name`` value from MODELS.
        conf_threshold: Confidence threshold forwarded to the selected model.

    Returns:
        List of detection dicts with boxes in original-image pixels. The list
        is empty when the model name is unknown, no image was supplied, or no
        session is available. When the "api" rate limit is hit, a dict
        ``{"error": ...}`` is returned instead.
    """
    # Rate limiting for API
    if not check_rate_limit("api"):
        logger.warning("Rate limit exceeded for API")
        # Report the configured limit rather than a hard-coded number
        return {"error": f"Rate limit exceeded. Max {RATE_LIMIT_REQUESTS} requests per minute."}

    # Find model key from display name
    model_key = next(
        (key for key, cfg in MODELS.items() if cfg["name"] == model_display_name),
        None,
    )
    if model_key is None or image is None:
        return []

    img_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    orig_h, orig_w = img_bgr.shape[:2]
    model_type = MODELS[model_key].get("type", "yolo")

    # Zero-shot backends all take (image_bytes, confidence=...) and return a
    # detection list; CLIP additionally returns its score, dropped here.
    zero_shot_runners = {
        "florence2": run_florence2_inference,
        "clip": lambda data, confidence: run_clip_anomaly_inference(data, confidence=confidence)[0],
        "owlvit": run_owlvit_inference,
        "groundingdino": run_groundingdino_inference,
        "yoloworld": run_yoloworld_inference,
    }
    runner = zero_shot_runners.get(model_type)
    if runner is not None:
        encoded_ok, img_encoded = cv2.imencode('.jpg', img_bgr)
        if not encoded_ok:
            logger.error("Failed to JPEG-encode image for zero-shot inference")
            return []
        return runner(img_encoded.tobytes(), confidence=conf_threshold)

    # Handle YOLO models (default)
    session = get_session(model_key)
    if session is None:
        return []
    blob, _, _ = preprocess(img_bgr)
    # Ask the session for its input name instead of assuming "images"
    input_name = session.get_inputs()[0].name
    preds = session.run(None, {input_name: blob})[0]
    detections = non_max_suppression(preds, conf_threshold, IOU_THRESHOLD)

    # Scale bboxes back to original image size
    for det in detections:
        x1, y1, x2, y2 = det["bbox"]
        scaled = [
            x1 / IMG_SIZE * orig_w,
            y1 / IMG_SIZE * orig_h,
            x2 / IMG_SIZE * orig_w,
            y2 / IMG_SIZE * orig_h,
        ]
        det["bbox"] = scaled
        det["x1"], det["y1"], det["x2"], det["y2"] = scaled
    return detections
# Create Gradio interface with both UI and API.
# Endpoints registered below: "predict" (annotated image), "detect" (JSON
# detections), "models" (model list) and "health" (status).
with gr.Blocks(title="Industrial Defect Detection") as demo:
    # Display names in MODELS order, shared by both dropdowns
    model_names = [v["name"] for v in MODELS.values()]
    # Honor the DEFAULT_MODEL setting; fall back to Data Matrix when it names
    # an unknown key.
    default_model_name = MODELS.get(DEFAULT_MODEL, MODELS["data-matrix"])["name"]

    gr.Markdown("# Industrial Defect Detection")
    gr.Markdown("""
    **Visual Testing Interface** for Industrial Defect Detection models.

    **Available Models:**
    - **YOLO Models**: Data Matrix, Tire Cord, Dental Implant, etc. (trained on specific defects)
    - **Zero Shot models** (Florence-2, CLIP, OWL-ViT, GroundingDINO, YOLO-World): Detect anomalies on ANY product without training data!
    - **For API Use:** This Space provides API endpoints accessible via `/api/predict`
    - **For Visual Testing:** Use the interface below to test models visually
    - **Rate Limiting:** Maximum 100 requests per minute to prevent abuse

    Upload an image, select a model, and adjust the confidence threshold.

    Note: Zero Shot models may take longer (~30-60 seconds) on first use while the model is downloaded and loaded.
    """)
    with gr.Row():
        with gr.Column():
            input_image = gr.Image(type="numpy", label="Upload Image")
            model_dropdown = gr.Dropdown(
                choices=model_names,
                label="Select Model",
                value=default_model_name
            )
            conf_slider = gr.Slider(
                minimum=0.0,
                maximum=1.0,
                value=0.25,
                step=0.01,
                label="Confidence Threshold"
            )
            submit_btn = gr.Button("Detect Defects", variant="primary")
        with gr.Column():
            output_image = gr.Image(type="numpy", label="Detection Results")
    submit_btn.click(
        fn=gradio_inference,
        inputs=[input_image, model_dropdown, conf_slider],
        outputs=output_image,
        api_name="predict"  # Creates /api/predict endpoint (returns image)
    )

    # Hidden interface for JSON API (for MonitaQC compatibility)
    with gr.Row(visible=False):
        json_image = gr.Image(type="numpy")
        json_model = gr.Dropdown(choices=model_names)
        json_conf = gr.Slider(minimum=0.0, maximum=1.0, value=0.25)
        json_output = gr.JSON()
        json_btn = gr.Button("JSON Detect")
    json_btn.click(
        fn=api_inference,
        inputs=[json_image, json_model, json_conf],
        outputs=json_output,
        api_name="detect"  # Creates /api/detect endpoint (returns JSON)
    )

    gr.Markdown("### Example Images")
    gr.Examples(
        examples=EXAMPLES,
        inputs=[input_image, model_dropdown, conf_slider],
        outputs=output_image,
        fn=gradio_inference,
        cache_examples=True,
        examples_per_page=24
    )

    gr.Markdown("""
    ### API Access
    This Space provides two API endpoints:

    **1. Image API** (returns annotated image):
    - **Endpoint**: `/api/predict`
    - **Returns**: Annotated image with bounding boxes

    **2. JSON API** (returns detection data - for MonitaQC):
    - **Endpoint**: `/api/detect`
    - **Returns**: JSON array of detections with bboxes and confidence

    **Rate Limiting:**
    - Maximum 100 requests per minute per endpoint
    - Exceeding the limit returns an error response

    **Python Example (Image):**
    ```python
    from gradio_client import Client
    client = Client("smartfalcon-ai/Industrial-Defect-Detection")
    result = client.predict(
    "path/to/image.jpg",
    "Data Matrix",
    0.25,
    api_name="/predict"
    )
    ```

    **Python Example (JSON - for MonitaQC):**
    ```python
    from gradio_client import Client
    client = Client("smartfalcon-ai/Industrial-Defect-Detection")
    detections = client.predict(
    "path/to/image.jpg",
    "Data Matrix",
    0.25,
    api_name="/detect"
    )
    # Returns: [{"bbox": [x1, y1, x2, y2], "confidence": 0.85, "class_id": 0, ...}]
    ```

    **Available Models:**
    - Data Matrix
    - Dental Implant
    - Ball Pen
    - Knit Up
    - Knit Back
    - Jean Up
    - Jean Back
    - Tire Cord
    - Zero Shot (Florence-2)
    - Zero Shot (CLIP)
    - Zero Shot (OWL-ViT)
    - Zero Shot (GroundingDINO)
    - Zero Shot (YOLO-World)

    The Zero Shot models work on any product without training!
    """)

    # Hidden interface for models API (for MonitaQC compatibility)
    with gr.Row(visible=False):
        models_btn = gr.Button("Get Models")
        models_output = gr.JSON()
        health_btn = gr.Button("Health Check")
        health_output = gr.JSON()

    def get_models():
        """Return list of available models."""
        return {
            "models": [model_info["name"] for model_info in MODELS.values()],
            "count": len(MODELS)
        }

    def health_check():
        """Health check endpoint for monitoring."""
        return {
            "status": "healthy",
            "service": "Gradio Inference (HuggingFace)",
            # Only ONNX sessions are counted; zero-shot models are cached elsewhere
            "models_loaded": len(sessions),
            "available_models": len(MODELS),
            "timestamp": datetime.now().isoformat()
        }

    models_btn.click(
        fn=get_models,
        inputs=[],
        outputs=models_output,
        api_name="models"  # Creates /api/models endpoint
    )
    health_btn.click(
        fn=health_check,
        inputs=[],
        outputs=health_output,
        api_name="health"  # Creates /api/health endpoint
    )
# Launch the app when this file is executed as a script (not on import)
if __name__ == "__main__":
    demo.launch()