NVIDIA-Nemotron-Parse-v1.2 / test_golden.py
nvidia-oliver-holworthy's picture
Normalize Nemotron Parse images in the processor
f200396 unverified
raw
history blame
22.8 kB
"""
Golden reference tests for NVIDIA-Nemotron-Parse-v1.2.
Captures reference outputs from the pinned dependency set, then verifies the
same outputs after dependency changes.
WORKFLOW
--------
Step 1 — capture (run once against pinned deps, e.g. transformers>=5.6.1):
python test_golden.py --capture [--model-path /path/to/model]
This writes golden_outputs.json next to this file.
Step 2 — verify (run against new deps):
pytest test_golden.py -v
Tests that compare against golden data skip automatically if golden_outputs.json is missing; the remaining tests still run.
TEST LAYERS
-----------
1. Image preprocessing — pixel value stats + first-N raw values (no GPU needed)
2. Encoder output — hidden state shape, mean, std, and a fixed-position slice
3. Decoder forward pass — top-k logit indices and values at a fixed decoder step
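4. Generation — exact token ID sequence for 50 greedy-decoded tokens
5. Processor — post-processing, decode/batch_decode, and image edge cases (no model weights or GPU needed)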
"""
import json
import os
import sys
import pytest
import numpy as np
import torch
from pathlib import Path
# ---------------------------------------------------------------------------
# Paths / constants
# ---------------------------------------------------------------------------
# Directory containing this file; used as the default model directory.
MODEL_PATH = str(Path(__file__).parent)
# The golden reference data lives next to this file.
GOLDEN_FILE = Path(__file__).parent / "golden_outputs.json"
# Prompt text handed to the processor for every capture/verify run.
TASK_PROMPT = "</s><s><predict_bbox><predict_classes><output_markdown><predict_no_text_in_pic>"
MAX_NEW_TOKENS_GOLDEN = 50 # short enough to be fast, long enough to be meaningful
TOP_K = 10 # number of top logit predictions to capture
# ---------------------------------------------------------------------------
# Deterministic test image (no external files required)
# ---------------------------------------------------------------------------
def make_test_image():
    """Build the synthetic page image used as input for the golden runs.

    Everything is drawn from fixed coordinates and colours, so the result is
    the same on every call and no fixture file is needed: a white 400x600 RGB
    canvas with a grey title bar, a ruled body area, and a green grid near
    the bottom.

    Returns:
        The drawn ``PIL.Image.Image``.
    """
    from PIL import Image, ImageDraw

    canvas = Image.new("RGB", (400, 600), color=(255, 255, 255))
    pen = ImageDraw.Draw(canvas)

    # Title bar.
    pen.rectangle([20, 20, 380, 80], fill=(210, 210, 210))

    # Body text area with evenly spaced horizontal rules.
    rule_colour = (170, 170, 170)
    pen.rectangle([20, 100, 380, 480], fill=(245, 245, 245))
    for rule_y in range(120, 470, 18):
        pen.line([(40, rule_y), (360, rule_y)], fill=rule_colour, width=1)

    # Table-like grid at the bottom: vertical separators, then horizontal ones.
    grid_colour = (100, 140, 100)
    pen.rectangle([20, 500, 380, 580], fill=(200, 220, 200))
    for col_x in range(80, 380, 80):
        pen.line([(col_x, 500), (col_x, 580)], fill=grid_colour, width=1)
    for row_y in range(520, 580, 20):
        pen.line([(20, row_y), (380, row_y)], fill=grid_colour, width=1)

    return canvas
# ---------------------------------------------------------------------------
# Golden file helpers
# ---------------------------------------------------------------------------
def load_golden():
    """Read the golden reference data from ``GOLDEN_FILE``.

    Returns:
        The parsed JSON content, or ``None`` when the file does not exist.
    """
    if not GOLDEN_FILE.exists():
        return None
    with open(GOLDEN_FILE) as handle:
        return json.load(handle)
def save_golden(data: dict):
    """Write ``data`` to ``GOLDEN_FILE`` as JSON indented by two spaces.

    Any existing file at that path is overwritten.
    """
    with open(GOLDEN_FILE, "w") as handle:
        json.dump(data, handle, indent=2)
def _pixel_values_for_golden(processor, pixel_values: torch.Tensor) -> torch.Tensor:
"""Return the canonical raw pixel values used by the original golden file."""
pv = pixel_values.float()
image_processor = getattr(processor, "image_processor", None)
if not getattr(image_processor, "do_normalize", False):
return pv
mean = pv.new_tensor(image_processor.image_mean).view(1, -1, 1, 1)
std = pv.new_tensor(image_processor.image_std).view(1, -1, 1, 1)
return pv * std + mean
# ---------------------------------------------------------------------------
# Pytest fixtures (session-scoped so the model is loaded only once)
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def env():
    """Session fixture: load the model, tokenizer, and processor once.

    Everything is loaded from ``MODEL_PATH`` with ``trust_remote_code=True``.
    The model runs on ``cuda:0`` in bfloat16 when CUDA is available and on the
    CPU in float32 otherwise, and is put into eval mode.

    Returns:
        A dict with the keys ``model``, ``tokenizer``, ``processor``,
        ``device``, and ``dtype``.
    """
    import torch
    from transformers import AutoModel, AutoProcessor, AutoTokenizer

    if torch.cuda.is_available():
        device, dtype = "cuda:0", torch.bfloat16
    else:
        device, dtype = "cpu", torch.float32
    print(f"\nLoading model from {MODEL_PATH} on {device} ({dtype})…")

    loaded = AutoModel.from_pretrained(
        MODEL_PATH,
        trust_remote_code=True,
        torch_dtype=dtype,
    )
    model = loaded.to(device).eval()
    tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True)
    processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True)

    return {
        "model": model,
        "tokenizer": tokenizer,
        "processor": processor,
        "device": device,
        "dtype": dtype,
    }
@pytest.fixture(scope="session")
def processed_inputs(env):
    """Session fixture: run the processor on the synthetic test image once.

    Returns:
        A ``(inputs, image)`` tuple: the processor output moved to
        ``env["device"]`` and the PIL image it was computed from.
    """
    page = make_test_image()
    processor = env["processor"]
    batch = processor(
        images=[page],
        text=TASK_PROMPT,
        return_tensors="pt",
        add_special_tokens=False,
    )
    return batch.to(env["device"]), page
@pytest.fixture(scope="session")
def golden():
    """Session fixture: the golden reference data.

    Any test requesting this fixture is skipped when the golden file is
    absent.
    """
    reference = load_golden()
    if reference is not None:
        return reference
    pytest.skip("golden_outputs.json not found — run: python test_golden.py --capture")
# ---------------------------------------------------------------------------
# Layer 1: Image preprocessing
# (No GPU needed — but these tests use `processed_inputs`, which depends on `env`, so the model is still loaded.)
# ---------------------------------------------------------------------------
class TestImageProcessing:
    """Layer 1: checks on the ``pixel_values`` tensor produced by the processor."""

    def test_pixel_values_shape(self, processed_inputs):
        """The processor emits one 3-channel 2048x1664 image tensor."""
        inputs, _ = processed_inputs
        pv = inputs["pixel_values"]
        # Model expects 2048×1664 images
        assert list(pv.shape) == [1, 3, 2048, 1664], f"Unexpected shape: {pv.shape}"

    def test_pixel_values_dtype(self, processed_inputs):
        """Pixel values come out of the processor as float32."""
        inputs, _ = processed_inputs
        # Image preprocessing keeps float32; the model casts internally.
        assert inputs["pixel_values"].dtype == torch.float32

    def test_pixel_value_range(self, processed_inputs):
        """Values should be CLIP-normalized after image preprocessing."""
        pv = processed_inputs[0]["pixel_values"].float()
        assert pv.min() >= -2.0, f"Pixel values unexpectedly low: {pv.min()}"
        assert pv.max() <= 2.5, f"Pixel values unexpectedly high: {pv.max()}"

    def test_pixel_values_stats_match_golden(self, processed_inputs, env, golden):
        """Mean and std of the golden-space pixel values match within 1e-4."""
        pv = _pixel_values_for_golden(env["processor"], processed_inputs[0]["pixel_values"])
        g = golden["image_processing"]
        assert abs(pv.mean().item() - g["mean"]) < 1e-4, \
            f"mean changed: {pv.mean().item():.6f} vs golden {g['mean']:.6f}"
        assert abs(pv.std().item() - g["std"]) < 1e-4, \
            f"std changed: {pv.std().item():.6f} vs golden {g['std']:.6f}"

    def test_pixel_values_first_values_match_golden(self, processed_inputs, env, golden):
        """Exact match on the first 20 float values (catches transform-order bugs)."""
        pv = _pixel_values_for_golden(env["processor"], processed_inputs[0]["pixel_values"])
        actual = pv.flatten()[:20].tolist()
        expected = golden["image_processing"]["first_20_values"]
        # zip() stops at the shorter sequence, so without this check a
        # truncated or empty golden list would make the loop pass vacuously.
        assert len(actual) == len(expected), \
            f"value count changed: {len(actual)} vs golden {len(expected)}"
        for i, (a, e) in enumerate(zip(actual, expected)):
            assert abs(a - e) < 1e-5, f"pixel[{i}] changed: {a} vs {e}"
# ---------------------------------------------------------------------------
# Layer 2: Encoder output
# ---------------------------------------------------------------------------
class TestEncoderOutput:
    """Layer 2: checks on the encoder's ``last_hidden_state``."""

    @pytest.fixture(scope="class")
    def encoder_out(self, env, processed_inputs):
        """Run the encoder once per class, without gradients, on the processed image."""
        with torch.no_grad():
            out = env["model"].encoder(processed_inputs[0]["pixel_values"])
        return out

    def test_encoder_output_shape(self, encoder_out):
        """Batch size is 1 and the hidden dimension is 1024."""
        # RadioWithNeck outputs (batch, 321, 1024): 320 patch tokens + 1 summary token
        hs = encoder_out.last_hidden_state
        assert hs.shape[0] == 1
        assert hs.shape[2] == 1024, f"Unexpected hidden dim: {hs.shape[2]}"

    def test_encoder_output_stats_match_golden(self, encoder_out, golden):
        """Mean and std of the hidden states match golden within 0.05."""
        hs = encoder_out.last_hidden_state.float()
        g = golden["encoder_output"]
        assert abs(hs.mean().item() - g["mean"]) < 0.05, \
            f"encoder mean changed: {hs.mean().item():.4f} vs {g['mean']:.4f}"
        assert abs(hs.std().item() - g["std"]) < 0.05, \
            f"encoder std changed: {hs.std().item():.4f} vs {g['std']:.4f}"

    def test_encoder_output_slice_match_golden(self, encoder_out, golden):
        """Fixed-position slice: token 0, first 16 hidden dims."""
        hs = encoder_out.last_hidden_state.float()
        actual = hs[0, 0, :16].tolist()
        expected = golden["encoder_output"]["token0_first16"]
        # zip() stops at the shorter sequence, so without this check a
        # truncated or empty golden list would make the loop pass vacuously.
        assert len(actual) == len(expected), \
            f"slice length changed: {len(actual)} vs golden {len(expected)}"
        for i, (a, e) in enumerate(zip(actual, expected)):
            assert abs(a - e) < 0.1, \
                f"encoder hidden[0,0,{i}] changed: {a:.4f} vs {e:.4f}"
# ---------------------------------------------------------------------------
# Layer 3: Decoder forward pass (logits)
# ---------------------------------------------------------------------------
class TestForwardPass:
    """Layer 3: checks on the logits of a single decoder step."""

    @pytest.fixture(scope="class")
    def forward_out(self, env, processed_inputs):
        """Run one forward pass per class with a single-token decoder input."""
        # Minimal decoder input: just the decoder_start_token (EOS = 2 for mBART)
        dec_ids = torch.tensor([[2]], device=env["device"])
        with torch.no_grad():
            out = env["model"](
                pixel_values=processed_inputs[0]["pixel_values"],
                decoder_input_ids=dec_ids,
                return_dict=True,
            )
        return out

    def test_logits_shape(self, forward_out, env):
        """Logits have shape (1, 1, 52352)."""
        logits = forward_out.logits
        assert logits.shape[0] == 1
        assert logits.shape[1] == 1 # one decoder step
        assert logits.shape[2] == 52352, f"Unexpected vocab size: {logits.shape[2]}"

    def test_top_k_indices_match_golden(self, forward_out, golden):
        """The TOP_K predicted token IDs should be identical (order matters)."""
        logits = forward_out.logits[0, -1, :].float()
        top_k = torch.topk(logits, k=TOP_K)
        actual = top_k.indices.tolist()
        expected = golden["forward_pass"]["top_k_indices"]
        assert actual == expected, \
            f"Top-{TOP_K} predicted tokens changed.\n actual: {actual}\n expected: {expected}"

    def test_top_k_values_match_golden(self, forward_out, golden):
        """Logit magnitudes may drift slightly due to bf16; use a loose tolerance."""
        logits = forward_out.logits[0, -1, :].float()
        top_k = torch.topk(logits, k=TOP_K)
        actual = top_k.values.tolist()
        expected = golden["forward_pass"]["top_k_values"]
        # zip() stops at the shorter sequence, so without this check a
        # truncated or empty golden list would make the loop pass vacuously.
        assert len(actual) == len(expected), \
            f"top-k value count changed: {len(actual)} vs golden {len(expected)}"
        for i, (a, e) in enumerate(zip(actual, expected)):
            assert abs(a - e) < 1.0, \
                f"top-{i+1} logit value changed: {a:.3f} vs {e:.3f}"
# ---------------------------------------------------------------------------
# Layer 4: Generation (greedy, deterministic)
# ---------------------------------------------------------------------------
class TestGeneration:
    """Layer 4: checks on greedy generation output."""

    @pytest.fixture(scope="class")
    def gen_out(self, env, processed_inputs):
        """Generate once per class with sampling off and a single beam."""
        with torch.no_grad():
            out = env["model"].generate(
                **processed_inputs[0],
                max_new_tokens=MAX_NEW_TOKENS_GOLDEN,
                do_sample=False,
                num_beams=1,
            )
        return out

    def test_generated_token_ids_match_golden(self, gen_out, golden):
        """Exact token-ID match — the most sensitive regression signal."""
        actual = gen_out[0].cpu().tolist()
        expected = golden["generation"]["token_ids"]
        # If one sequence is a strict prefix of the other there is no pairwise
        # mismatch; the divergence is then at the end of the shorter sequence.
        divergence = next(
            (i for i, (a, e) in enumerate(zip(actual, expected)) if a != e),
            min(len(actual), len(expected)),
        )
        assert actual == expected, (
            f"Generated token sequence differs from golden.\n"
            f" first divergence at index {divergence}\n"
            f" actual: {actual}\n"
            f" expected: {expected}"
        )

    def test_decoded_text_matches_golden(self, gen_out, env, golden):
        """Decoding the generated IDs (special tokens kept) reproduces the golden text."""
        text = env["tokenizer"].decode(gen_out[0], skip_special_tokens=False)
        assert text == golden["generation"]["decoded_text"], \
            f"Decoded text differs:\n actual: {text!r}\n expected: {golden['generation']['decoded_text']!r}"
# ---------------------------------------------------------------------------
# Layer 5: Processor (no model or GPU needed — pure preprocessing & text utils)
# ---------------------------------------------------------------------------
@pytest.fixture(scope="session")
def proc():
    """Session fixture: tokenizer and processor only, without the model.

    Returns:
        A dict with the keys ``processor`` and ``tokenizer``.
    """
    from transformers import AutoProcessor, AutoTokenizer

    load_options = {"trust_remote_code": True}
    tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, **load_options)
    processor = AutoProcessor.from_pretrained(MODEL_PATH, **load_options)
    return {"processor": processor, "tokenizer": tokenizer}
class TestProcessor:
    """Layer 5: processor text utilities and image-handling edge cases."""

    # ------------------------------------------------------------------
    # post_process_generation
    # ------------------------------------------------------------------
    def test_post_process_generation_returns_string_for_string_input(self, proc, golden):
        """A plain string goes in, a plain string comes out."""
        raw_text = golden["generation"]["decoded_text"]
        cleaned = proc["processor"].post_process_generation(raw_text)
        assert isinstance(cleaned, str)

    def test_post_process_generation_removes_bos_eos(self, proc, golden):
        """Neither ``<s>`` nor ``</s>`` may survive post-processing."""
        raw_text = golden["generation"]["decoded_text"]
        cleaned = proc["processor"].post_process_generation(raw_text)
        for marker in ("<s>", "</s>"):
            assert marker not in cleaned

    def test_post_process_generation_matches_manual_clean(self, proc, golden):
        """Output equals the text with both markers removed and whitespace stripped."""
        raw_text = golden["generation"]["decoded_text"]
        without_bos = raw_text.replace("<s>", "")
        manual = without_bos.replace("</s>", "").strip()
        cleaned = proc["processor"].post_process_generation(raw_text)
        assert cleaned == manual

    def test_post_process_generation_list_returns_list(self, proc, golden):
        """A two-element list yields a two-element list with equal entries."""
        raw_text = golden["generation"]["decoded_text"]
        cleaned = proc["processor"].post_process_generation([raw_text] * 2)
        assert isinstance(cleaned, list)
        assert len(cleaned) == 2
        assert cleaned[0] == cleaned[1]

    def test_post_process_generation_single_element_list_returns_string(self, proc, golden):
        """A one-element list collapses to a scalar string, not a list."""
        raw_text = golden["generation"]["decoded_text"]
        cleaned = proc["processor"].post_process_generation([raw_text])
        assert isinstance(cleaned, str)

    # ------------------------------------------------------------------
    # decode / batch_decode via the processor
    # ------------------------------------------------------------------
    def test_decode_via_processor_matches_tokenizer(self, proc, golden):
        """``processor.decode()`` and ``tokenizer.decode()`` agree on the golden IDs."""
        ids = golden["generation"]["token_ids"]
        from_processor = proc["processor"].decode(ids, skip_special_tokens=False)
        from_tokenizer = proc["tokenizer"].decode(ids, skip_special_tokens=False)
        assert from_processor == from_tokenizer

    def test_batch_decode_via_processor(self, proc, golden):
        """``processor.batch_decode()`` on a repeated ID list reproduces the golden text twice."""
        ids = golden["generation"]["token_ids"]
        decoded_batch = proc["processor"].batch_decode(
            [ids, ids], skip_special_tokens=False
        )
        assert isinstance(decoded_batch, list)
        assert len(decoded_batch) == 2
        assert decoded_batch[0] == decoded_batch[1] == golden["generation"]["decoded_text"]

    # ------------------------------------------------------------------
    # Image processing edge cases
    # ------------------------------------------------------------------
    def test_large_image_resized_to_target(self, proc):
        """A 4000x5000 image still yields pixel values of shape [1, 3, 2048, 1664]."""
        from PIL import Image

        oversized = Image.new("RGB", (4000, 5000), color=(128, 64, 32))
        batch = proc["processor"](images=[oversized], return_tensors="pt")
        assert list(batch["pixel_values"].shape) == [1, 3, 2048, 1664]

    def test_grayscale_image_converted_to_rgb(self, proc):
        """A mode 'L' image yields three output channels at the target size."""
        from PIL import Image

        mono = Image.new("L", (400, 600), color=128)
        batch = proc["processor"](images=[mono], return_tensors="pt")
        assert list(batch["pixel_values"].shape) == [1, 3, 2048, 1664]

    def test_multi_image_batch_first_dim(self, proc):
        """Three input images produce a leading batch dimension of three."""
        from PIL import Image

        pages = []
        for i in range(3):
            pages.append(Image.new("RGB", (400, 600), color=(i * 30, i * 20, i * 10)))
        batch = proc["processor"](images=pages, return_tensors="pt")
        assert list(batch["pixel_values"].shape) == [3, 3, 2048, 1664]

    def test_image_only_input_has_no_input_ids(self, proc):
        """Images without text give ``pixel_values`` and no ``input_ids`` key."""
        from PIL import Image

        blank = Image.new("RGB", (400, 600))
        batch = proc["processor"](images=[blank], return_tensors="pt")
        assert "pixel_values" in batch
        assert "input_ids" not in batch

    def test_text_only_input_has_no_pixel_values(self, proc):
        """Text without images gives ``input_ids`` and no ``pixel_values`` key."""
        batch = proc["processor"](text="hello world", return_tensors="pt")
        assert "input_ids" in batch
        assert "pixel_values" not in batch
# ---------------------------------------------------------------------------
# Capture helper (run as script: python test_golden.py --capture)
# ---------------------------------------------------------------------------
def capture(model_path: str = MODEL_PATH):
    """
    Run a full inference pass and write golden_outputs.json.

    Intended to be run once against the pinned dependency set. Records four
    sections (image processing, encoder output, forward-pass logits, and
    greedy generation) plus metadata about the library versions, device,
    dtype, and model path.

    Args:
        model_path: Directory to load the model, tokenizer, and processor from.

    Returns:
        The dict that was written to ``GOLDEN_FILE``.
    """
    import torch
    import transformers
    from transformers import AutoModel, AutoProcessor, AutoTokenizer

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32
    print("Capturing golden outputs")
    print(f" transformers : {transformers.__version__}")
    print(f" torch : {torch.__version__}")
    print(f" device : {device} dtype={dtype}")
    print(f" model_path : {model_path}")

    model = AutoModel.from_pretrained(
        model_path, trust_remote_code=True, torch_dtype=dtype
    ).to(device).eval()
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True)

    image = make_test_image()
    inputs = processor(
        images=[image],
        text=TASK_PROMPT,
        return_tensors="pt",
        add_special_tokens=False,
    ).to(device)

    # ---------- image processing ----------
    # Store the stats in the same value space the verifying tests use: they
    # pass pixel values through _pixel_values_for_golden() before comparing,
    # so capturing the processor output as-is would produce a golden file
    # that cannot match whenever the processor normalizes.
    pv = _pixel_values_for_golden(processor, inputs["pixel_values"])
    image_data = {
        "shape": list(pv.shape),
        "mean": pv.mean().item(),
        "std": pv.std().item(),
        "first_20_values": pv.flatten()[:20].tolist(),
    }
    print(f"\n[image] shape={image_data['shape']} mean={image_data['mean']:.4f} std={image_data['std']:.4f}")

    # ---------- encoder output ----------
    # The model itself is fed the processor output unchanged.
    with torch.no_grad():
        enc_out = model.encoder(inputs["pixel_values"])
    hs = enc_out.last_hidden_state.float()
    encoder_data = {
        "shape": list(hs.shape),
        "mean": hs.mean().item(),
        "std": hs.std().item(),
        "token0_first16": hs[0, 0, :16].tolist(),
    }
    print(f"[encoder] shape={encoder_data['shape']} mean={encoder_data['mean']:.4f} std={encoder_data['std']:.4f}")

    # ---------- forward pass (logits) ----------
    dec_ids = torch.tensor([[2]], device=device) # decoder_start_token_id
    with torch.no_grad():
        fwd_out = model(
            pixel_values=inputs["pixel_values"],
            decoder_input_ids=dec_ids,
            return_dict=True,
        )
    logits = fwd_out.logits[0, -1, :].float()
    top_k = torch.topk(logits, k=TOP_K)
    forward_data = {
        "logits_shape": list(fwd_out.logits.shape),
        "top_k_indices": top_k.indices.tolist(),
        "top_k_values": top_k.values.tolist(),
    }
    top_tokens = [tokenizer.decode([i]) for i in top_k.indices.tolist()]
    print(f"[forward] top-{TOP_K} tokens: {top_tokens}")

    # ---------- generation ----------
    with torch.no_grad():
        gen_out = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS_GOLDEN,
            do_sample=False,
            num_beams=1,
        )
    token_ids = gen_out[0].cpu().tolist()
    decoded_text = tokenizer.decode(gen_out[0], skip_special_tokens=False)
    generation_data = {
        "max_new_tokens": MAX_NEW_TOKENS_GOLDEN,
        "token_ids": token_ids,
        "decoded_text": decoded_text,
    }
    print(f"[generation] {len(token_ids)} tokens: {decoded_text!r}")

    # ---------- save ----------
    golden = {
        "metadata": {
            "transformers_version": transformers.__version__,
            "torch_version": torch.__version__,
            "device": str(device),
            "dtype": str(dtype),
            "model_path": model_path,
        },
        "image_processing": image_data,
        "encoder_output": encoder_data,
        "forward_pass": forward_data,
        "generation": generation_data,
    }
    save_golden(golden)
    print(f"\nGolden outputs written to {GOLDEN_FILE}")
    return golden
if __name__ == "__main__":
    # Script entry point: capture golden outputs with --capture, otherwise
    # show the help text plus usage hints.
    import argparse

    cli = argparse.ArgumentParser(description="Golden reference capture/verify for Nemotron-Parse")
    cli.add_argument("--capture", action="store_true", help="Capture golden outputs")
    cli.add_argument("--model-path", default=MODEL_PATH, help="Path to model directory")
    options = cli.parse_args()

    if not options.capture:
        cli.print_help()
        print("\nTo run tests: pytest test_golden.py -v")
        print("To capture: python test_golden.py --capture")
    else:
        capture(model_path=options.model_path)