""" Golden reference tests for NVIDIA-Nemotron-Parse-v1.2. Captures reference outputs from the pinned dependency set, then verifies the same outputs after dependency changes. WORKFLOW -------- Step 1 — capture (run once against pinned deps, e.g. transformers>=5.6.1): python test_golden.py --capture [--model-path /path/to/model] This writes golden_outputs.json next to this file. Step 2 — verify (run against new deps): pytest test_golden.py -v All tests skip automatically if golden_outputs.json is missing. TEST LAYERS ----------- 1. Image preprocessing — pixel value stats + first-N raw values (no GPU needed) 2. Encoder output — hidden state shape, mean, std, and a fixed-position slice 3. Decoder forward pass — top-k logit indices and values at a fixed decoder step 4. Generation — exact token ID sequence for 50 greedy-decoded tokens """ import json import os import sys import pytest import numpy as np import torch from pathlib import Path # --------------------------------------------------------------------------- # Paths / constants # --------------------------------------------------------------------------- MODEL_PATH = str(Path(__file__).parent) GOLDEN_FILE = Path(__file__).parent / "golden_outputs.json" TASK_PROMPT = "" MAX_NEW_TOKENS_GOLDEN = 50 # short enough to be fast, long enough to be meaningful TOP_K = 10 # number of top logit predictions to capture # --------------------------------------------------------------------------- # Deterministic test image (no external files required) # --------------------------------------------------------------------------- def make_test_image(): """Return a fully deterministic PIL image that loosely resembles a document.""" from PIL import Image, ImageDraw img = Image.new("RGB", (400, 600), color=(255, 255, 255)) draw = ImageDraw.Draw(img) # Title bar draw.rectangle([20, 20, 380, 80], fill=(210, 210, 210)) # Body text area with ruled lines draw.rectangle([20, 100, 380, 480], fill=(245, 245, 245)) for y in range(120, 470, 18): draw.line([(40, y), (360, y)], fill=(170, 170, 170), width=1) # Table-like grid at the bottom draw.rectangle([20, 500, 380, 580], fill=(200, 220, 200)) for x in range(80, 380, 80): draw.line([(x, 500), (x, 580)], fill=(100, 140, 100), width=1) for y in range(520, 580, 20): draw.line([(20, y), (380, y)], fill=(100, 140, 100), width=1) return img # --------------------------------------------------------------------------- # Golden file helpers # --------------------------------------------------------------------------- def load_golden(): if GOLDEN_FILE.exists(): with open(GOLDEN_FILE) as f: return json.load(f) return None def save_golden(data: dict): with open(GOLDEN_FILE, "w") as f: json.dump(data, f, indent=2) def _pixel_values_for_golden(processor, pixel_values: torch.Tensor) -> torch.Tensor: """Return the canonical raw pixel values used by the original golden file.""" pv = pixel_values.float() image_processor = getattr(processor, "image_processor", None) if not getattr(image_processor, "do_normalize", False): return pv mean = pv.new_tensor(image_processor.image_mean).view(1, -1, 1, 1) std = pv.new_tensor(image_processor.image_std).view(1, -1, 1, 1) return pv * std + mean # --------------------------------------------------------------------------- # Pytest fixtures (session-scoped so the model is loaded only once) # --------------------------------------------------------------------------- @pytest.fixture(scope="session") def env(): """Load model, processor, and tokenizer once for the whole test session.""" import torch from transformers import AutoModel, AutoProcessor, AutoTokenizer device = "cuda:0" if torch.cuda.is_available() else "cpu" dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32 print(f"\nLoading model from {MODEL_PATH} on {device} ({dtype})…") model = AutoModel.from_pretrained( MODEL_PATH, trust_remote_code=True, torch_dtype=dtype, ).to(device).eval() tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True) processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True) return dict(model=model, tokenizer=tokenizer, processor=processor, device=device, dtype=dtype) @pytest.fixture(scope="session") def processed_inputs(env): """Preprocess the test image once for the whole session.""" import torch image = make_test_image() inputs = env["processor"]( images=[image], text=TASK_PROMPT, return_tensors="pt", add_special_tokens=False, ).to(env["device"]) return inputs, image @pytest.fixture(scope="session") def golden(): """Load golden data; tests that need it skip if the file is absent.""" data = load_golden() if data is None: pytest.skip("golden_outputs.json not found — run: python test_golden.py --capture") return data # --------------------------------------------------------------------------- # Layer 1: Image preprocessing # (Does not require a model or GPU — fast sanity check on the processor.) # --------------------------------------------------------------------------- class TestImageProcessing: def test_pixel_values_shape(self, processed_inputs): inputs, _ = processed_inputs pv = inputs["pixel_values"] # Model expects 2048×1664 images assert list(pv.shape) == [1, 3, 2048, 1664], f"Unexpected shape: {pv.shape}" def test_pixel_values_dtype(self, processed_inputs): inputs, _ = processed_inputs # Image preprocessing keeps float32; the model casts internally. assert inputs["pixel_values"].dtype == torch.float32 def test_pixel_value_range(self, processed_inputs): """Values should be CLIP-normalized after image preprocessing.""" pv = processed_inputs[0]["pixel_values"].float() assert pv.min() >= -2.0, f"Pixel values unexpectedly low: {pv.min()}" assert pv.max() <= 2.5, f"Pixel values unexpectedly high: {pv.max()}" def test_pixel_values_stats_match_golden(self, processed_inputs, env, golden): pv = _pixel_values_for_golden(env["processor"], processed_inputs[0]["pixel_values"]) g = golden["image_processing"] assert abs(pv.mean().item() - g["mean"]) < 1e-4, \ f"mean changed: {pv.mean().item():.6f} vs golden {g['mean']:.6f}" assert abs(pv.std().item() - g["std"]) < 1e-4, \ f"std changed: {pv.std().item():.6f} vs golden {g['std']:.6f}" def test_pixel_values_first_values_match_golden(self, processed_inputs, env, golden): """Exact match on the first 20 float values (catches transform-order bugs).""" pv = _pixel_values_for_golden(env["processor"], processed_inputs[0]["pixel_values"]) actual = pv.flatten()[:20].tolist() expected = golden["image_processing"]["first_20_values"] for i, (a, e) in enumerate(zip(actual, expected)): assert abs(a - e) < 1e-5, f"pixel[{i}] changed: {a} vs {e}" # --------------------------------------------------------------------------- # Layer 2: Encoder output # --------------------------------------------------------------------------- class TestEncoderOutput: @pytest.fixture(scope="class") def encoder_out(self, env, processed_inputs): import torch with torch.no_grad(): out = env["model"].encoder(processed_inputs[0]["pixel_values"]) return out def test_encoder_output_shape(self, encoder_out): # RadioWithNeck outputs (batch, 321, 1024): 320 patch tokens + 1 summary token hs = encoder_out.last_hidden_state assert hs.shape[0] == 1 assert hs.shape[2] == 1024, f"Unexpected hidden dim: {hs.shape[2]}" def test_encoder_output_stats_match_golden(self, encoder_out, golden): hs = encoder_out.last_hidden_state.float() g = golden["encoder_output"] assert abs(hs.mean().item() - g["mean"]) < 0.05, \ f"encoder mean changed: {hs.mean().item():.4f} vs {g['mean']:.4f}" assert abs(hs.std().item() - g["std"]) < 0.05, \ f"encoder std changed: {hs.std().item():.4f} vs {g['std']:.4f}" def test_encoder_output_slice_match_golden(self, encoder_out, golden): """Fixed-position slice: token 0, first 16 hidden dims.""" hs = encoder_out.last_hidden_state.float() actual = hs[0, 0, :16].tolist() expected = golden["encoder_output"]["token0_first16"] for i, (a, e) in enumerate(zip(actual, expected)): assert abs(a - e) < 0.1, \ f"encoder hidden[0,0,{i}] changed: {a:.4f} vs {e:.4f}" # --------------------------------------------------------------------------- # Layer 3: Decoder forward pass (logits) # --------------------------------------------------------------------------- class TestForwardPass: @pytest.fixture(scope="class") def forward_out(self, env, processed_inputs): import torch # Minimal decoder input: just the decoder_start_token (EOS = 2 for mBART) dec_ids = torch.tensor([[2]], device=env["device"]) with torch.no_grad(): out = env["model"]( pixel_values=processed_inputs[0]["pixel_values"], decoder_input_ids=dec_ids, return_dict=True, ) return out def test_logits_shape(self, forward_out, env): logits = forward_out.logits assert logits.shape[0] == 1 assert logits.shape[1] == 1 # one decoder step assert logits.shape[2] == 52352, f"Unexpected vocab size: {logits.shape[2]}" def test_top_k_indices_match_golden(self, forward_out, golden): """The TOP_K predicted token IDs should be identical (order matters).""" import torch logits = forward_out.logits[0, -1, :].float() top_k = torch.topk(logits, k=TOP_K) actual = top_k.indices.tolist() expected = golden["forward_pass"]["top_k_indices"] assert actual == expected, \ f"Top-{TOP_K} predicted tokens changed.\n actual: {actual}\n expected: {expected}" def test_top_k_values_match_golden(self, forward_out, golden): """Logit magnitudes may drift slightly due to bf16; use a loose tolerance.""" import torch logits = forward_out.logits[0, -1, :].float() top_k = torch.topk(logits, k=TOP_K) for i, (a, e) in enumerate(zip(top_k.values.tolist(), golden["forward_pass"]["top_k_values"])): assert abs(a - e) < 1.0, \ f"top-{i+1} logit value changed: {a:.3f} vs {e:.3f}" # --------------------------------------------------------------------------- # Layer 4: Generation (greedy, deterministic) # --------------------------------------------------------------------------- class TestGeneration: @pytest.fixture(scope="class") def gen_out(self, env, processed_inputs): import torch with torch.no_grad(): out = env["model"].generate( **processed_inputs[0], max_new_tokens=MAX_NEW_TOKENS_GOLDEN, do_sample=False, num_beams=1, ) return out def test_generated_token_ids_match_golden(self, gen_out, golden): """Exact token-ID match — the most sensitive regression signal.""" actual = gen_out[0].cpu().tolist() expected = golden["generation"]["token_ids"] assert actual == expected, ( f"Generated token sequence differs from golden.\n" f" first divergence at index " f"{next((i for i,(a,e) in enumerate(zip(actual,expected)) if a!=e), '?')}\n" f" actual: {actual}\n" f" expected: {expected}" ) def test_decoded_text_matches_golden(self, gen_out, env, golden): text = env["tokenizer"].decode(gen_out[0], skip_special_tokens=False) assert text == golden["generation"]["decoded_text"], \ f"Decoded text differs:\n actual: {text!r}\n expected: {golden['generation']['decoded_text']!r}" # --------------------------------------------------------------------------- # Layer 5: Processor (no model or GPU needed — pure preprocessing & text utils) # --------------------------------------------------------------------------- @pytest.fixture(scope="session") def proc(): """Load processor + tokenizer only (no model weights, no GPU required).""" from transformers import AutoProcessor, AutoTokenizer tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH, trust_remote_code=True) processor = AutoProcessor.from_pretrained(MODEL_PATH, trust_remote_code=True) return dict(processor=processor, tokenizer=tokenizer) class TestProcessor: # ------------------------------------------------------------------ # post_process_generation # ------------------------------------------------------------------ def test_post_process_generation_returns_string_for_string_input(self, proc, golden): """String input → string output.""" decoded = golden["generation"]["decoded_text"] result = proc["processor"].post_process_generation(decoded) assert isinstance(result, str) def test_post_process_generation_removes_bos_eos(self, proc, golden): """ and tokens must be stripped from the output.""" decoded = golden["generation"]["decoded_text"] result = proc["processor"].post_process_generation(decoded) assert "" not in result assert "" not in result def test_post_process_generation_matches_manual_clean(self, proc, golden): """Exact match against the expected cleaned string.""" decoded = golden["generation"]["decoded_text"] expected = decoded.replace("", "").replace("", "").strip() result = proc["processor"].post_process_generation(decoded) assert result == expected def test_post_process_generation_list_returns_list(self, proc, golden): """Multi-element list input → list output of the same length.""" decoded = golden["generation"]["decoded_text"] result = proc["processor"].post_process_generation([decoded, decoded]) assert isinstance(result, list) assert len(result) == 2 assert result[0] == result[1] def test_post_process_generation_single_element_list_returns_string(self, proc, golden): """Single-element list input → scalar string (not a list).""" decoded = golden["generation"]["decoded_text"] result = proc["processor"].post_process_generation([decoded]) assert isinstance(result, str) # ------------------------------------------------------------------ # decode / batch_decode via the processor # ------------------------------------------------------------------ def test_decode_via_processor_matches_tokenizer(self, proc, golden): """processor.decode() must give the same result as tokenizer.decode().""" token_ids = golden["generation"]["token_ids"] via_proc = proc["processor"].decode(token_ids, skip_special_tokens=False) via_tok = proc["tokenizer"].decode(token_ids, skip_special_tokens=False) assert via_proc == via_tok def test_batch_decode_via_processor(self, proc, golden): """processor.batch_decode() on repeated token lists matches golden decoded text.""" token_ids = golden["generation"]["token_ids"] results = proc["processor"].batch_decode( [token_ids, token_ids], skip_special_tokens=False ) assert isinstance(results, list) assert len(results) == 2 assert results[0] == results[1] == golden["generation"]["decoded_text"] # ------------------------------------------------------------------ # Image processing edge cases # ------------------------------------------------------------------ def test_large_image_resized_to_target(self, proc): """Image larger than 2048×1664 is downscaled to exactly [1, 3, 2048, 1664].""" from PIL import Image large = Image.new("RGB", (4000, 5000), color=(128, 64, 32)) out = proc["processor"](images=[large], return_tensors="pt") assert list(out["pixel_values"].shape) == [1, 3, 2048, 1664] def test_grayscale_image_converted_to_rgb(self, proc): """Grayscale (mode 'L') image is converted to RGB and produces 3 output channels.""" from PIL import Image gray = Image.new("L", (400, 600), color=128) out = proc["processor"](images=[gray], return_tensors="pt") assert list(out["pixel_values"].shape) == [1, 3, 2048, 1664] def test_multi_image_batch_first_dim(self, proc): """A batch of N images produces pixel_values with first dimension N.""" from PIL import Image imgs = [ Image.new("RGB", (400, 600), color=(i * 30, i * 20, i * 10)) for i in range(3) ] out = proc["processor"](images=imgs, return_tensors="pt") assert list(out["pixel_values"].shape) == [3, 3, 2048, 1664] def test_image_only_input_has_no_input_ids(self, proc): """Passing images without text returns pixel_values and no input_ids key.""" from PIL import Image img = Image.new("RGB", (400, 600)) out = proc["processor"](images=[img], return_tensors="pt") assert "pixel_values" in out assert "input_ids" not in out def test_text_only_input_has_no_pixel_values(self, proc): """Passing text without images returns input_ids and no pixel_values key.""" out = proc["processor"](text="hello world", return_tensors="pt") assert "input_ids" in out assert "pixel_values" not in out # --------------------------------------------------------------------------- # Capture helper (run as script: python test_golden.py --capture) # --------------------------------------------------------------------------- def capture(model_path: str = MODEL_PATH): """ Run a full inference pass and write golden_outputs.json. Intended to be run once against the pinned dependency set. """ import torch import transformers from transformers import AutoModel, AutoProcessor, AutoTokenizer device = "cuda:0" if torch.cuda.is_available() else "cpu" dtype = torch.bfloat16 if torch.cuda.is_available() else torch.float32 print(f"Capturing golden outputs") print(f" transformers : {transformers.__version__}") print(f" torch : {torch.__version__}") print(f" device : {device} dtype={dtype}") print(f" model_path : {model_path}") model = AutoModel.from_pretrained( model_path, trust_remote_code=True, torch_dtype=dtype ).to(device).eval() tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True) processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True) image = make_test_image() inputs = processor( images=[image], text=TASK_PROMPT, return_tensors="pt", add_special_tokens=False, ).to(device) # ---------- image processing ---------- pv = inputs["pixel_values"].float() image_data = { "shape": list(pv.shape), "mean": pv.mean().item(), "std": pv.std().item(), "first_20_values": pv.flatten()[:20].tolist(), } print(f"\n[image] shape={image_data['shape']} mean={image_data['mean']:.4f} std={image_data['std']:.4f}") # ---------- encoder output ---------- with torch.no_grad(): enc_out = model.encoder(inputs["pixel_values"]) hs = enc_out.last_hidden_state.float() encoder_data = { "shape": list(hs.shape), "mean": hs.mean().item(), "std": hs.std().item(), "token0_first16": hs[0, 0, :16].tolist(), } print(f"[encoder] shape={encoder_data['shape']} mean={encoder_data['mean']:.4f} std={encoder_data['std']:.4f}") # ---------- forward pass (logits) ---------- dec_ids = torch.tensor([[2]], device=device) # decoder_start_token_id with torch.no_grad(): fwd_out = model( pixel_values=inputs["pixel_values"], decoder_input_ids=dec_ids, return_dict=True, ) logits = fwd_out.logits[0, -1, :].float() top_k = torch.topk(logits, k=TOP_K) forward_data = { "logits_shape": list(fwd_out.logits.shape), "top_k_indices": top_k.indices.tolist(), "top_k_values": top_k.values.tolist(), } top_tokens = [tokenizer.decode([i]) for i in top_k.indices.tolist()] print(f"[forward] top-{TOP_K} tokens: {top_tokens}") # ---------- generation ---------- with torch.no_grad(): gen_out = model.generate( **inputs, max_new_tokens=MAX_NEW_TOKENS_GOLDEN, do_sample=False, num_beams=1, ) token_ids = gen_out[0].cpu().tolist() decoded_text = tokenizer.decode(gen_out[0], skip_special_tokens=False) generation_data = { "max_new_tokens": MAX_NEW_TOKENS_GOLDEN, "token_ids": token_ids, "decoded_text": decoded_text, } print(f"[generation] {len(token_ids)} tokens: {decoded_text!r}") # ---------- save ---------- golden = { "metadata": { "transformers_version": transformers.__version__, "torch_version": torch.__version__, "device": str(device), "dtype": str(dtype), "model_path": model_path, }, "image_processing": image_data, "encoder_output": encoder_data, "forward_pass": forward_data, "generation": generation_data, } save_golden(golden) print(f"\nGolden outputs written to {GOLDEN_FILE}") return golden if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Golden reference capture/verify for Nemotron-Parse") parser.add_argument("--capture", action="store_true", help="Capture golden outputs") parser.add_argument("--model-path", default=MODEL_PATH, help="Path to model directory") args = parser.parse_args() if args.capture: capture(model_path=args.model_path) else: parser.print_help() print("\nTo run tests: pytest test_golden.py -v") print("To capture: python test_golden.py --capture")