"""Batch client for a vLLM OpenAI-compatible server.

Each input image is sent as one chat-completion request. The generated text is
written to ``<out-dir>/<idx>_<image name>.txt`` and, unless ``--no-annotate``
is given, a copy of the image with the boxes extracted from the generated text
is written next to it. A ``summary.txt`` maps every processed image to its
text output.
"""

import argparse
import base64
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple

from openai import OpenAI


def _guess_mime(path: str) -> str:
    """Return the image MIME type implied by the file extension of *path*.

    Only ``jpg``/``jpeg`` and ``webp`` are recognised (case-insensitively);
    every other extension, including a missing one, maps to ``image/png``.
    """
    ext = Path(path).suffix.lower().lstrip(".")
    if ext in ("jpg", "jpeg"):
        return "image/jpeg"
    if ext == "webp":
        return "image/webp"
    # Default for anything else.
    return "image/png"


def _b64_image_data_url(path: str) -> str:
    """Read the file at *path* and return it as a base64 ``data:`` URL."""
    with open(path, "rb") as f:
        img_b64 = base64.b64encode(f.read()).decode("utf-8")
    mime = _guess_mime(path)
    return f"data:{mime};base64,{img_b64}"


def _iter_images(paths: List[str], image_dir: Optional[str]) -> List[str]:
    """Collect the image paths to process, de-duplicated, in first-seen order.

    Args:
        paths: Explicit image paths; they come first, in the order given.
        image_dir: Optional directory. Its ``*.png``, ``*.jpg``, ``*.jpeg`` and
            ``*.webp`` files are appended, grouped by pattern in that order
            and sorted within each group.

    Returns:
        The combined list with exact duplicate strings removed.
    """
    out: List[str] = list(paths)
    if image_dir:
        directory = Path(image_dir)
        for pattern in ("*.png", "*.jpg", "*.jpeg", "*.webp"):
            out.extend(str(x) for x in sorted(directory.glob(pattern)))
    # dict preserves insertion order, so this de-dupes while keeping order.
    return list(dict.fromkeys(out))


@dataclass(frozen=True)
class _ReqSpec:
    """One request to send: the image to process and its position in the batch."""

    image_path: str
    request_idx: int  # used as a zero-padded prefix of the output file names


def _make_client(base_url: str) -> OpenAI:
    """Build an OpenAI client that talks to the server at *base_url*."""
    # openai>=1.x requires an API key; vLLM ignores it by default.
    api_key = os.environ.get("OPENAI_API_KEY", "EMPTY")
    return OpenAI(base_url=base_url, api_key=api_key)


def _run_one(
    req: _ReqSpec,
    *,
    base_url: str,
    model: str,
    prompt_text: str,
    max_tokens: int,
    temperature: float,
    extra_body: Dict[str, Any],
) -> Tuple[_ReqSpec, str]:
    """Send one image to the server and return ``(req, generated_text)``.

    The image is embedded in the request as a base64 data URL, together with
    *prompt_text*, in a single user message. *extra_body* is forwarded to the
    client unchanged. A response whose message content is ``None`` yields an
    empty string.
    """
    client = _make_client(base_url)
    img_url = _b64_image_data_url(req.image_path)
    resp = client.chat.completions.create(
        model=model,
        messages=[
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt_text},
                    {"type": "image_url", "image_url": {"url": img_url}},
                ],
            }
        ],
        max_tokens=max_tokens,
        temperature=temperature,
        extra_body=extra_body,
    )
    text = resp.choices[0].message.content or ""
    return req, text


def _maybe_annotate(image_path: str, generated_text: str, out_image_path: str) -> None:
    """Draw the boxes parsed from *generated_text* onto the image and save it.

    Args:
        image_path: Source image.
        generated_text: Model output to extract classes, boxes and texts from.
        out_image_path: Where the annotated RGB copy is written.
    """
    # Optional visualization (similar to example_with_table_processor.py).
    from PIL import Image, ImageDraw  # local import so batching can run without pillow

    from postprocessing import (
        extract_classes_bboxes,
        postprocess_text,
        transform_bbox_to_original,
    )

    # Close the source file deterministically; convert() returns an
    # independent in-memory copy that stays usable afterwards.
    with Image.open(image_path) as src:
        image = src.convert("RGB")

    classes, bboxes, texts = extract_classes_bboxes(generated_text)
    bboxes = [transform_bbox_to_original(bbox, image.width, image.height) for bbox in bboxes]

    table_format = "HTML"  # latex | HTML | markdown
    text_format = "markdown"  # markdown | plain
    blank_text_in_figures = False

    # The post-processed text is not used by the drawing below; the calls are
    # kept so this function still runs (and fails) exactly where it did before.
    for text, cls in zip(texts, classes):
        postprocess_text(
            text,
            cls=cls,
            table_format=table_format,
            text_format=text_format,
            blank_text_in_figures=blank_text_in_figures,
        )

    draw = ImageDraw.Draw(image)
    for bbox in bboxes:
        # Clamp the far corner so it is never before the near corner.
        draw.rectangle(
            (bbox[0], bbox[1], max(bbox[0], bbox[2]), max(bbox[1], bbox[3])),
            outline="red",
            width=2,
        )
    image.save(out_image_path)


def main() -> None:
    """Parse the command line, run all requests concurrently, write outputs.

    A failure of a single request or annotation is reported on stderr and does
    not stop the remaining images from being processed. ``summary.txt`` is
    always written; the process exits with a non-zero status when at least one
    image failed.

    Raises:
        SystemExit: When no images were given, or when any image failed.
    """
    ap = argparse.ArgumentParser(description="vLLM OpenAI-compatible example (batch + .txt outputs).")
    ap.add_argument("--base-url", default="http://localhost:8000/v1")
    ap.add_argument("--model", default="nvidia/NVIDIA-Nemotron-Parse-v1.2")
    ap.add_argument("--image", action="append", default=[], help="Image path (repeatable).")
    ap.add_argument("--image-dir", default=None, help="Directory of images to run (png/jpg/jpeg/webp).")
    ap.add_argument("--out-dir", default="vllm_outputs", help="Where to write .txt outputs.")
    ap.add_argument("--concurrency", type=int, default=4, help="How many concurrent requests to send.")
    ap.add_argument("--max-tokens", type=int, default=8994)
    ap.add_argument("--temperature", type=float, default=0.0)
    ap.add_argument(
        "--annotate",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Write annotated images with boxes to --out-dir (default: enabled). Use --no-annotate to disable.",
    )
    args = ap.parse_args()

    image_paths = _iter_images(args.image, args.image_dir)
    if not image_paths:
        raise SystemExit("No images provided. Use --image PATH (repeatable) or --image-dir DIR.")

    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): the prompt is an empty string in this file. If the model
    # expects task/control tokens in the prompt, confirm the intended value
    # against the model's documentation.
    prompt_text = ""
    extra_body = {
        "repetition_penalty": 1.1,
        "top_k": 1,
        "skip_special_tokens": False,
    }

    reqs: List[_ReqSpec] = [
        _ReqSpec(image_path=img, request_idx=idx) for idx, img in enumerate(image_paths)
    ]

    # Concurrency is the simplest way to make sure vLLM batches requests internally.
    summary_lines: List[str] = []
    failed = 0
    with ThreadPoolExecutor(max_workers=max(1, args.concurrency)) as ex:
        # Map each future back to its request so a failure can be attributed.
        futs = {
            ex.submit(
                _run_one,
                r,
                base_url=args.base_url,
                model=args.model,
                prompt_text=prompt_text,
                max_tokens=args.max_tokens,
                temperature=args.temperature,
                extra_body=extra_body,
            ): r
            for r in reqs
        }
        for fut in as_completed(futs):
            req = futs[fut]
            try:
                _, text = fut.result()
            except Exception as exc:  # batch boundary: one bad request must not abort the rest
                failed += 1
                print(f"[error] {req.image_path}: request failed: {exc!r}", file=sys.stderr)
                continue

            base = Path(req.image_path).name
            stem = f"{req.request_idx:04d}_{base}"
            out_txt = out_dir / f"{stem}.txt"
            out_txt.write_text(text, encoding="utf-8")
            summary_lines.append(f"{req.image_path}\t{out_txt}")

            if args.annotate:
                out_img = out_dir / f"{stem}.annotated.jpg"
                try:
                    _maybe_annotate(req.image_path, text, str(out_img))
                except Exception as exc:  # the text output is already saved; keep going
                    failed += 1
                    print(f"[error] {req.image_path}: annotation failed: {exc!r}", file=sys.stderr)

    (out_dir / "summary.txt").write_text("\n".join(sorted(summary_lines)) + "\n", encoding="utf-8")

    if failed:
        raise SystemExit(f"{failed} failure(s) while processing {len(reqs)} image(s); see messages above.")


if __name__ == "__main__":
    main()