#!/usr/bin/env python3
"""Run the Embedl-quantized SAM-3D-Body backbone with TensorRT.

Builds (or reuses) an INT8+FP16 TensorRT engine from `embedl_sam3dbody_int8.onnx`,
runs it on a person crop, reports the feature-map statistics + GPU-compute latency,
and saves a PCA visualization of the 16x16 patch features.

    python3 -m venv venv --system-site-packages   # use system TensorRT
    source venv/bin/activate
    pip install pillow numpy pycuda
    python infer_trt.py --image sample_input.png --save-pca features_pca.png

Requires an NVIDIA GPU + TensorRT 10.x and CUDA. The first run builds the engine
(~1-2 min) and caches it at the --engine path (by default next to this script).
"""
from __future__ import annotations

import argparse
import statistics
import time
from pathlib import Path

import numpy as np
import tensorrt as trt
from PIL import Image

try:
    import pycuda.autoinit  # noqa: F401
    import pycuda.driver as cuda
except ImportError as e:  # pragma: no cover
    raise SystemExit("pip install pycuda (needed for the TensorRT host/device copies)") from e

HERE = Path(__file__).resolve().parent
ONNX = HERE / "embedl_sam3dbody_int8.onnx"
ENGINE = HERE / "embedl_sam3dbody_int8.engine"
INPUT_SIZE = 512
IMAGENET_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
IMAGENET_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)
LOG = trt.Logger(trt.Logger.WARNING)


def preprocess(path: Path) -> np.ndarray:
    """Load an image and turn it into a normalized NCHW float32 batch of one.

    The image is converted to RGB, resized (bilinear) to
    ``INPUT_SIZE x INPUT_SIZE``, scaled to [0, 1] and normalized with
    ``IMAGENET_MEAN`` / ``IMAGENET_STD``.

    Args:
        path: Image file to read.

    Returns:
        A C-contiguous float32 array of shape ``(1, 3, INPUT_SIZE, INPUT_SIZE)``.
    """
    # Use a context manager so the underlying file handle is closed promptly
    # instead of lingering until garbage collection.
    with Image.open(path) as src:
        img = src.convert("RGB").resize((INPUT_SIZE, INPUT_SIZE), Image.BILINEAR)
    arr = np.asarray(img, dtype=np.float32) / 255.0
    arr = (arr - IMAGENET_MEAN) / IMAGENET_STD
    return np.ascontiguousarray(arr.transpose(2, 0, 1)[None])  # (1, 3, H, W)


def build_engine(onnx_path: Path, engine_path: Path) -> trt.ICudaEngine:
    """Return a TensorRT engine, loading the cached plan or building a new one.

    If ``engine_path`` exists it is deserialized. If that fails (for example
    because the plan was produced by a different TensorRT version or GPU),
    the engine is rebuilt from ``onnx_path`` and the cache is overwritten.

    Args:
        onnx_path: ONNX model to parse when a build is needed.
        engine_path: Location of the serialized engine cache.

    Returns:
        The deserialized engine.

    Raises:
        RuntimeError: If ONNX parsing, the engine build, or deserialization of
            the freshly built plan fails.
    """
    # Register plugins up front: they are needed to deserialize a cached plan
    # just as much as to build one.
    trt.init_libnvinfer_plugins(LOG, "")
    runtime = trt.Runtime(LOG)

    if engine_path.exists():
        print(f"loading cached engine {engine_path.name}")
        cached = runtime.deserialize_cuda_engine(engine_path.read_bytes())
        if cached is not None:
            return cached
        # A None result means the cached plan is unusable; fall through and rebuild.
        print(f" cached engine {engine_path.name} could not be loaded; rebuilding")

    print(f"building INT8+FP16 engine from {onnx_path.name} (~1-2 min) ...")
    builder = trt.Builder(LOG)
    network = builder.create_network(1 << int(trt.NetworkDefinitionCreationFlag.EXPLICIT_BATCH))
    parser = trt.OnnxParser(network, LOG)
    if not parser.parse_from_file(str(onnx_path)):
        for i in range(parser.num_errors):
            print(" parse error:", parser.get_error(i))
        raise RuntimeError("ONNX parse failed")

    cfg = builder.create_builder_config()
    cfg.set_memory_pool_limit(trt.MemoryPoolType.WORKSPACE, 4 * 1024**3)  # 4 GiB
    cfg.builder_optimization_level = 3
    cfg.set_flag(trt.BuilderFlag.FP16)
    cfg.set_flag(trt.BuilderFlag.INT8)

    plan = builder.build_serialized_network(network, cfg)
    if plan is None:
        raise RuntimeError("engine build failed")
    plan_bytes = bytes(plan)
    engine_path.write_bytes(plan_bytes)
    print(f" saved {engine_path.name} ({engine_path.stat().st_size / 1e6:.0f} MB)")

    engine = runtime.deserialize_cuda_engine(plan_bytes)
    if engine is None:
        raise RuntimeError("engine deserialization failed")
    return engine


class TrtRunner:
    """Execution wrapper around a static-shape TensorRT engine.

    Allocates one page-locked host buffer and one device buffer per I/O tensor,
    binds the device addresses to an execution context, and runs inference on a
    private CUDA stream. When the engine has several inputs or outputs, the
    last one of each kind is the one used by ``__call__``.
    """

    def __init__(self, engine: trt.ICudaEngine):
        """Allocate buffers for every I/O tensor of ``engine``.

        Raises:
            ValueError: If ``engine`` is None, a tensor has a dynamic (negative)
                dimension, or the engine lacks an input or an output tensor.
        """
        if engine is None:
            raise ValueError("engine is None (deserialization failed?)")
        self.engine = engine
        self.ctx = engine.create_execution_context()
        self.host, self.dev, self.shapes = {}, {}, {}
        self.in_name = self.out_name = None
        for i in range(engine.num_io_tensors):
            nm = engine.get_tensor_name(i)
            shape = tuple(engine.get_tensor_shape(nm))
            # A negative dim would make the element count negative and the
            # allocation below fail with an unhelpful message.
            if any(d < 0 for d in shape):
                raise ValueError(
                    f"tensor {nm!r} has dynamic shape {shape}; only static shapes are supported"
                )
            dt = trt.nptype(engine.get_tensor_dtype(nm))
            host = cuda.pagelocked_empty(int(np.prod(shape)), dt)
            dev = cuda.mem_alloc(host.nbytes)
            self.host[nm], self.dev[nm], self.shapes[nm] = host, dev, shape
            self.ctx.set_tensor_address(nm, int(dev))
            if engine.get_tensor_mode(nm) == trt.TensorIOMode.INPUT:
                self.in_name = nm
            else:
                self.out_name = nm
        if self.in_name is None or self.out_name is None:
            raise ValueError("engine must expose at least one input and one output tensor")
        self.stream = cuda.Stream()

    def __call__(self, x: np.ndarray) -> np.ndarray:
        """Run one inference.

        Args:
            x: Input array; its element count must match the engine's input
                tensor. It is cast to the input tensor's dtype.

        Returns:
            A new float32 array with the output tensor's shape (independent of
            the internal pinned buffer).

        Raises:
            ValueError: If ``x`` has the wrong number of elements.
        """
        host_in = self.host[self.in_name]
        host_out = self.host[self.out_name]
        x = np.asarray(x)
        # Check explicitly: np.copyto would silently broadcast a size-1 input.
        if x.size != host_in.size:
            raise ValueError(
                f"input has shape {x.shape} ({x.size} elements); "
                f"engine expects shape {self.shapes[self.in_name]}"
            )
        # casting="unsafe" matches the permissive cast of ndarray.astype.
        np.copyto(host_in, x.reshape(-1), casting="unsafe")
        cuda.memcpy_htod_async(self.dev[self.in_name], host_in, self.stream)
        self.ctx.execute_async_v3(self.stream.handle)
        cuda.memcpy_dtoh_async(host_out, self.dev[self.out_name], self.stream)
        self.stream.synchronize()
        # astype() copies by default, so the result does not alias host_out.
        return host_out.reshape(self.shapes[self.out_name]).astype(np.float32)

    def benchmark(self, n: int = 300, warmup: float = 2.0) -> tuple[float, float, float]:
        """Time ``n`` executions of the engine (no host/device copies).

        Each sample is the wall-clock time of one ``execute_async_v3`` plus
        stream synchronization, run on whatever data is currently in the
        device input buffer.

        Args:
            n: Number of timed iterations; must be at least 1.
            warmup: Seconds of untimed execution before measuring.

        Returns:
            ``(mean_ms, p95_ms, executions_per_second)``.

        Raises:
            ValueError: If ``n`` is less than 1.
        """
        if n < 1:
            raise ValueError("n must be >= 1")
        until = time.perf_counter() + warmup
        while time.perf_counter() < until:
            self.ctx.execute_async_v3(self.stream.handle)
            self.stream.synchronize()
        times = []
        for _ in range(n):
            t0 = time.perf_counter()
            self.ctx.execute_async_v3(self.stream.handle)
            self.stream.synchronize()
            times.append((time.perf_counter() - t0) * 1000)
        # int(0.95 * n) is always < n for n >= 1, so the index is in range.
        p95 = sorted(times)[int(0.95 * len(times))]
        return statistics.mean(times), p95, n / (sum(times) / 1000)


def feature_pca_rgb(feats: np.ndarray, out_hw: int = 256) -> Image.Image:
    """Render a (C, H, W) feature map as an RGB image of its top-3 PCA components.

    Each spatial position is projected onto the first three principal
    directions of the mean-centered features; every component is rescaled
    between its 2nd and 98th percentile and clipped to [0, 1]. When fewer than
    three components exist, the remaining channels are zero.

    Args:
        feats: Feature map of shape ``(C, H, W)``.
        out_hw: Side length of the (nearest-neighbour upsampled) output image.

    Returns:
        An ``out_hw x out_hw`` RGB image.

    Raises:
        ValueError: If ``feats`` is not 3-dimensional.
    """
    if feats.ndim != 3:
        raise ValueError(f"expected a (C, H, W) feature map, got shape {feats.shape}")
    c, h, w = feats.shape
    x = feats.reshape(c, h * w).T  # (H*W, C): one row per spatial position
    x = x - x.mean(0, keepdims=True)
    _, _, vt = np.linalg.svd(x, full_matrices=False)
    comps = x @ vt[:3].T
    if comps.shape[1] < 3:
        # vt has only min(H*W, C) rows; pad so the RGB reshape below still works.
        comps = np.pad(comps, ((0, 0), (0, 3 - comps.shape[1])))
    lo, hi = np.percentile(comps, 2, axis=0), np.percentile(comps, 98, axis=0)
    comps = np.clip((comps - lo) / (hi - lo + 1e-8), 0, 1)
    rgb = (comps.reshape(h, w, 3) * 255).astype(np.uint8)
    return Image.fromarray(rgb).resize((out_hw, out_hw), Image.NEAREST)


def main() -> None:
    """Command-line entry point: build/load the engine, run it, report, visualize."""
    ap = argparse.ArgumentParser(
        description=__doc__,
        # Keep the docstring's line breaks so the command example stays readable.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--image", default=str(HERE / "sample_input.png"), help="input image")
    ap.add_argument("--onnx", default=str(ONNX), help="ONNX model used when building")
    ap.add_argument("--engine", default=str(ENGINE), help="serialized engine cache path")
    ap.add_argument("--save-pca", default=None, help="write a PCA feature image here")
    ap.add_argument("--no-benchmark", action="store_true", help="skip the latency benchmark")
    args = ap.parse_args()

    engine = build_engine(Path(args.onnx), Path(args.engine))
    runner = TrtRunner(engine)

    x = preprocess(Path(args.image))
    print(f"input: {x.shape} {x.dtype}")
    feats = runner(x)
    print(f"features: {feats.shape} mean={feats.mean():.4f} std={feats.std():.4f}")

    if not args.no_benchmark:
        mean_ms, p95_ms, qps = runner.benchmark()
        print(f"latency: mean={mean_ms:.2f} ms p95={p95_ms:.2f} ms throughput={qps:.1f} qps")

    if args.save_pca:
        feature_pca_rgb(feats[0]).save(args.save_pca)
        print(f"wrote PCA feature visualization -> {args.save_pca}")


if __name__ == "__main__":
    main()