"""End-to-end evaluation harness for the Prompt Squirrel RAG pipeline. Measures per-stage and overall metrics using ground-truth tagged samples from the e621 evaluation dataset. Metrics computed: - Stage 2 (Retrieval): Recall@k — what fraction of ground-truth tags appear among the retrieved candidates - Stage 3 (Selection): Precision, Recall, F1 — how well the final selected tags match the ground truth Usage: # Full end-to-end (Stage 1 + 2 + 3), 20 random samples: python scripts/eval_pipeline.py --n 20 # Reproducible run with specific seed: python scripts/eval_pipeline.py --n 50 --seed 123 # Parallel processing with 4 workers (default): python scripts/eval_pipeline.py --n 50 --workers 4 # Sequential mode (disable parallelism): python scripts/eval_pipeline.py --n 20 --workers 1 # Skip Stage 1 LLM rewrite (cheaper, tests Stage 2+3 only): python scripts/eval_pipeline.py --n 20 --skip-rewrite # First N samples in file order (no shuffle): python scripts/eval_pipeline.py --n 20 --no-shuffle Results are always saved as JSONL to data/eval_results/ (auto-named by timestamp) or to a custom path with -o. Requires: - OPENROUTER_API_KEY env var (for Stage 1 rewrite and Stage 3 selection) - fluffyrock_3m.csv and other retrieval assets in the project root - data/eval_samples/e621_sfw_sample_1000_seed123_buffer10000.jsonl """ from __future__ import annotations import argparse import json import os import random import sys import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from typing import Any, Dict, List, Optional, Set, Tuple _REPO_ROOT = Path(__file__).resolve().parents[1] if str(_REPO_ROOT) not in sys.path: sys.path.insert(0, str(_REPO_ROOT)) os.chdir(_REPO_ROOT) EVAL_DATA_PATH = _REPO_ROOT / "data" / "eval_samples" / "e621_sfw_sample_1000_seed123_buffer10000.jsonl" # Character tag types that go through the alias filter pipeline _CHARACTER_TYPES = {"character"} # Copyright tags are filtered out entirely _COPYRIGHT_TYPES = {"copyright"} def _classify_tags(tags: Set[str], get_type_fn) -> Tuple[Set[str], Set[str]]: """Split tags into (character_tags, general_tags). Copyright tags are excluded from both sets since they're filtered before any selection happens. """ character = set() general = set() for tag in tags: ttype = get_type_fn(tag) if ttype in _CHARACTER_TYPES: character.add(tag) elif ttype not in _COPYRIGHT_TYPES: general.add(tag) return character, general def _flatten_ground_truth_tags(tags_categorized_str: str) -> Set[str]: """Parse the categorized ground-truth JSON string into a flat set of tags.""" if not tags_categorized_str: return set() try: cats = json.loads(tags_categorized_str) except json.JSONDecodeError: return set() tags = set() for tag_list in cats.values(): if isinstance(tag_list, list): for t in tag_list: tags.add(t.strip()) return tags @dataclass class SampleResult: sample_id: Any caption: str ground_truth_tags: Set[str] # Stage 1 rewrite_phrases: List[str] = field(default_factory=list) # Stage 2 retrieved_tags: Set[str] = field(default_factory=set) retrieval_recall: float = 0.0 # Stage 3 — overall selected_tags: Set[str] = field(default_factory=set) selection_precision: float = 0.0 selection_recall: float = 0.0 selection_f1: float = 0.0 # Stage 3 — character tags only gt_character_tags: Set[str] = field(default_factory=set) selected_character_tags: Set[str] = field(default_factory=set) retrieved_character_tags: Set[str] = field(default_factory=set) char_retrieval_recall: float = 0.0 char_precision: float = 0.0 char_recall: float = 0.0 char_f1: float = 0.0 # Stage 3 — general tags only (non-character, non-copyright) gt_general_tags: Set[str] = field(default_factory=set) selected_general_tags: Set[str] = field(default_factory=set) general_precision: float = 0.0 general_recall: float = 0.0 general_f1: float = 0.0 # Timing stage1_time: float = 0.0 stage2_time: float = 0.0 stage3_time: float = 0.0 # Errors error: Optional[str] = None def _compute_metrics(predicted: Set[str], ground_truth: Set[str]) -> Tuple[float, float, float]: """Compute precision, recall, F1.""" if not predicted and not ground_truth: return 1.0, 1.0, 1.0 if not predicted: return 0.0, 0.0, 0.0 if not ground_truth: return 0.0, 0.0, 0.0 tp = len(predicted & ground_truth) precision = tp / len(predicted) recall = tp / len(ground_truth) f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0 return precision, recall, f1 def _process_one_sample( sample: Dict[str, Any], index: int, total: int, skip_rewrite: bool, allow_nsfw: bool, mode: str, chunk_size: int, per_phrase_k: int, temperature: float, max_tokens: int, verbose: bool, print_lock: threading.Lock, ) -> SampleResult: """Process a single eval sample through the full pipeline. Thread-safe.""" from psq_rag.llm.rewrite import llm_rewrite_prompt from psq_rag.retrieval.psq_retrieval import psq_candidates_from_rewrite_phrases from psq_rag.llm.select import llm_select_indices from psq_rag.retrieval.state import get_tag_type_name def log(msg: str) -> None: if verbose: with print_lock: print(f" [{index+1}] {msg}") sid = sample["id"] caption = sample["caption"] gt_tags = sample["gt_tags"] result = SampleResult( sample_id=sid, caption=caption[:120] + ("..." if len(caption) > 120 else ""), ground_truth_tags=gt_tags, ) with print_lock: print(f"[{index+1}/{total}] id={sid} gt_tags={len(gt_tags)}") try: # --- Stage 1: LLM Rewrite --- if skip_rewrite: phrases = [p.strip() for p in caption.split(",") if p.strip()] if len(phrases) <= 1: phrases = [p.strip() for p in caption.replace(".", ",").split(",") if p.strip()] result.rewrite_phrases = phrases result.stage1_time = 0.0 else: t0 = time.time() rewritten = llm_rewrite_prompt(caption, log) result.stage1_time = time.time() - t0 if rewritten: result.rewrite_phrases = [p.strip() for p in rewritten.split(",") if p.strip()] else: result.rewrite_phrases = [p.strip() for p in caption.split(",") if p.strip()] if len(result.rewrite_phrases) <= 1: result.rewrite_phrases = [p.strip() for p in caption.replace(".", ",").split(",") if p.strip()] log(f"Phrases ({len(result.rewrite_phrases)}): {result.rewrite_phrases[:5]}") # --- Stage 2: Retrieval --- t0 = time.time() retrieval_result = psq_candidates_from_rewrite_phrases( rewrite_phrases=result.rewrite_phrases, allow_nsfw_tags=allow_nsfw, global_k=300, verbose=False, ) result.stage2_time = time.time() - t0 if isinstance(retrieval_result, tuple): candidates, _ = retrieval_result else: candidates = retrieval_result result.retrieved_tags = {c.tag for c in candidates} if gt_tags: result.retrieval_recall = len(result.retrieved_tags & gt_tags) / len(gt_tags) log(f"Retrieved {len(candidates)} candidates, recall={result.retrieval_recall:.3f}") # --- Stage 3: LLM Selection --- t0 = time.time() picked_indices = llm_select_indices( query_text=caption, candidates=candidates, max_pick=0, log=log, mode=mode, chunk_size=chunk_size, per_phrase_k=per_phrase_k, temperature=temperature, max_tokens=max_tokens, ) result.stage3_time = time.time() - t0 result.selected_tags = {candidates[idx].tag for idx in picked_indices} if picked_indices else set() # Overall selection metrics p, r, f1 = _compute_metrics(result.selected_tags, gt_tags) result.selection_precision = p result.selection_recall = r result.selection_f1 = f1 # Split ground-truth and selected tags by type gt_char, gt_gen = _classify_tags(gt_tags, get_tag_type_name) sel_char, sel_gen = _classify_tags(result.selected_tags, get_tag_type_name) ret_char, _ = _classify_tags(result.retrieved_tags, get_tag_type_name) result.gt_character_tags = gt_char result.selected_character_tags = sel_char result.retrieved_character_tags = ret_char result.gt_general_tags = gt_gen result.selected_general_tags = sel_gen # Character-specific metrics if gt_char: result.char_retrieval_recall = len(ret_char & gt_char) / len(gt_char) cp, cr, cf1 = _compute_metrics(sel_char, gt_char) result.char_precision = cp result.char_recall = cr result.char_f1 = cf1 # General-tag metrics gp, gr, gf1 = _compute_metrics(sel_gen, gt_gen) result.general_precision = gp result.general_recall = gr result.general_f1 = gf1 # Per-sample output line char_info = "" if gt_char: char_info = f" char[gt={len(gt_char)} sel={len(sel_char)} P={cp:.2f} R={cr:.2f}]" with print_lock: print( f" [{index+1}] retrieval_recall={result.retrieval_recall:.3f} " f"sel_P={p:.3f} sel_R={r:.3f} sel_F1={f1:.3f} " f"selected={len(result.selected_tags)}{char_info} " f"t1={result.stage1_time:.1f}s t2={result.stage2_time:.1f}s t3={result.stage3_time:.1f}s" ) except Exception as e: result.error = str(e) with print_lock: print(f" [{index+1}] ERROR: {e}") return result def _prewarm_retrieval_assets() -> None: """Force-load all lazy retrieval assets so threads don't race on init.""" from psq_rag.retrieval.state import ( get_tfidf_components, get_tag2aliases, get_tag_type_name, ) print("Pre-warming retrieval assets (TF-IDF, FastText, HNSW, aliases)...") t0 = time.time() get_tfidf_components() # loads joblib, HNSW indexes, FastText model get_tag2aliases() # loads CSV alias dict get_tag_type_name("_warmup_") # ensures tag type dict is built print(f" Assets loaded in {time.time() - t0:.1f}s") def run_eval( n_samples: int = 20, caption_field: str = "caption_cogvlm", skip_rewrite: bool = False, allow_nsfw: bool = False, mode: str = "chunked_map_union", chunk_size: int = 60, per_phrase_k: int = 2, temperature: float = 0.0, max_tokens: int = 512, verbose: bool = False, shuffle: bool = True, seed: int = 42, workers: int = 1, ) -> List[SampleResult]: # Load eval samples if not EVAL_DATA_PATH.is_file(): print(f"ERROR: Eval data not found: {EVAL_DATA_PATH}") sys.exit(1) all_samples = [] with EVAL_DATA_PATH.open("r", encoding="utf-8") as f: for line in f: row = json.loads(line) caption = row.get(caption_field, "") if not caption or not caption.strip(): continue gt_tags = _flatten_ground_truth_tags(row.get("tags_ground_truth_categorized", "")) if not gt_tags: continue all_samples.append({ "id": row.get("id", row.get("row_id", len(all_samples))), "caption": caption.strip(), "gt_tags": gt_tags, }) if shuffle: rng = random.Random(seed) rng.shuffle(all_samples) samples = all_samples[:n_samples] print(f"Loaded {len(samples)}/{len(all_samples)} samples (caption_field={caption_field})") print(f"shuffle={shuffle}, seed={seed}, skip_rewrite={skip_rewrite}, allow_nsfw={allow_nsfw}, mode={mode}") print(f"workers={workers}") print() # Pre-warm shared retrieval assets before spawning threads _prewarm_retrieval_assets() print() print_lock = threading.Lock() total = len(samples) if workers <= 1: # Sequential mode (original behavior) results: List[SampleResult] = [] for i, sample in enumerate(samples): result = _process_one_sample( sample, i, total, skip_rewrite, allow_nsfw, mode, chunk_size, per_phrase_k, temperature, max_tokens, verbose, print_lock, ) results.append(result) else: # Parallel mode print(f"Processing {total} samples with {workers} parallel workers...") print() # Submit all samples; use index to preserve original ordering results_by_index: Dict[int, SampleResult] = {} with ThreadPoolExecutor(max_workers=workers) as executor: futures = { executor.submit( _process_one_sample, sample, i, total, skip_rewrite, allow_nsfw, mode, chunk_size, per_phrase_k, temperature, max_tokens, verbose, print_lock, ): i for i, sample in enumerate(samples) } for future in as_completed(futures): idx = futures[future] try: results_by_index[idx] = future.result() except Exception as e: # Should not happen since _process_one_sample catches exceptions, # but guard against unexpected errors with print_lock: print(f" [{idx+1}] WORKER ERROR: {e}") result = SampleResult( sample_id=samples[idx]["id"], caption=samples[idx]["caption"][:120], ground_truth_tags=samples[idx]["gt_tags"], error=f"Worker error: {e}", ) results_by_index[idx] = result # Reassemble in original order results = [results_by_index[i] for i in range(total)] return results def _safe_avg(values: List[float]) -> float: return sum(values) / len(values) if values else 0.0 def print_summary(results: List[SampleResult]) -> None: """Print aggregate metrics across all samples.""" valid = [r for r in results if r.error is None] errored = [r for r in results if r.error is not None] if not valid: print("\nNo valid results to summarize.") return n = len(valid) avg_retrieval_recall = sum(r.retrieval_recall for r in valid) / n avg_sel_precision = sum(r.selection_precision for r in valid) / n avg_sel_recall = sum(r.selection_recall for r in valid) / n avg_sel_f1 = sum(r.selection_f1 for r in valid) / n avg_retrieved = sum(len(r.retrieved_tags) for r in valid) / n avg_selected = sum(len(r.selected_tags) for r in valid) / n avg_gt = sum(len(r.ground_truth_tags) for r in valid) / n avg_t1 = sum(r.stage1_time for r in valid) / n avg_t2 = sum(r.stage2_time for r in valid) / n avg_t3 = sum(r.stage3_time for r in valid) / n print() print("=" * 70) print(f"EVALUATION SUMMARY ({n} samples, {len(errored)} errors)") print("=" * 70) print() print("Stage 2 - Retrieval:") print(f" Avg recall@300: {avg_retrieval_recall:.4f}") print(f" Avg candidates: {avg_retrieved:.1f}") print() print("Stage 3 - Selection (ALL tags):") print(f" Avg precision: {avg_sel_precision:.4f}") print(f" Avg recall: {avg_sel_recall:.4f}") print(f" Avg F1: {avg_sel_f1:.4f}") print(f" Avg selected tags: {avg_selected:.1f}") print(f" Avg ground-truth tags:{avg_gt:.1f}") # --- Character tag breakdown --- # Only include samples that actually have character tags in ground truth samples_with_chars = [r for r in valid if r.gt_character_tags] # Samples where the system selected character tags (true or false positive) samples_selecting_chars = [r for r in valid if r.selected_character_tags] print() print("-" * 70) print(f"CHARACTER TAGS ({len(samples_with_chars)}/{n} samples have character ground-truth)") print("-" * 70) if samples_with_chars: avg_char_retrieval_recall = _safe_avg([r.char_retrieval_recall for r in samples_with_chars]) avg_char_p = _safe_avg([r.char_precision for r in samples_with_chars]) avg_char_r = _safe_avg([r.char_recall for r in samples_with_chars]) avg_char_f1 = _safe_avg([r.char_f1 for r in samples_with_chars]) avg_gt_char = _safe_avg([len(r.gt_character_tags) for r in samples_with_chars]) avg_sel_char = _safe_avg([len(r.selected_character_tags) for r in samples_with_chars]) print(f" Retrieval recall: {avg_char_retrieval_recall:.4f}") print(f" Selection precision: {avg_char_p:.4f}") print(f" Selection recall: {avg_char_r:.4f}") print(f" Selection F1: {avg_char_f1:.4f}") print(f" Avg gt char tags: {avg_gt_char:.1f}") print(f" Avg selected chars: {avg_sel_char:.1f}") # Show character-specific failures char_misses = [] char_false_pos = [] for r in samples_with_chars: missed = r.gt_character_tags - r.selected_character_tags for m in missed: char_misses.append((r.sample_id, m)) extra = r.selected_character_tags - r.gt_character_tags for e in extra: char_false_pos.append((r.sample_id, e)) if char_misses: print(f"\n Missed characters ({len(char_misses)} total):") for sid, tag in char_misses[:10]: print(f" id={sid}: missed {tag}") if char_false_pos: print(f"\n False positive characters ({len(char_false_pos)} total):") for sid, tag in char_false_pos[:10]: print(f" id={sid}: wrongly selected {tag}") else: print(" (no samples had character tags in ground truth)") # False positive characters in samples WITHOUT character ground-truth no_char_gt_but_selected = [r for r in valid if not r.gt_character_tags and r.selected_character_tags] if no_char_gt_but_selected: print(f"\n Spurious character selections ({len(no_char_gt_but_selected)} samples):") print(" (These samples had NO character in ground truth but system selected one)") for r in no_char_gt_but_selected[:5]: print(f" id={r.sample_id}: selected {sorted(r.selected_character_tags)}") # --- General tag breakdown --- print() print("-" * 70) print("GENERAL TAGS (non-character, non-copyright)") print("-" * 70) avg_gen_p = _safe_avg([r.general_precision for r in valid]) avg_gen_r = _safe_avg([r.general_recall for r in valid]) avg_gen_f1 = _safe_avg([r.general_f1 for r in valid]) avg_gt_gen = _safe_avg([len(r.gt_general_tags) for r in valid]) avg_sel_gen = _safe_avg([len(r.selected_general_tags) for r in valid]) print(f" Selection precision: {avg_gen_p:.4f}") print(f" Selection recall: {avg_gen_r:.4f}") print(f" Selection F1: {avg_gen_f1:.4f}") print(f" Avg gt general tags: {avg_gt_gen:.1f}") print(f" Avg selected general: {avg_sel_gen:.1f}") print() print("-" * 70) print("Timing (avg per sample):") print(f" Stage 1 (rewrite): {avg_t1:.2f}s") print(f" Stage 2 (retrieval): {avg_t2:.2f}s") print(f" Stage 3 (selection): {avg_t3:.2f}s") print(f" Total: {avg_t1 + avg_t2 + avg_t3:.2f}s") print() # Show worst and best F1 samples by_f1 = sorted(valid, key=lambda r: r.selection_f1) print("Lowest F1 samples (overall):") for r in by_f1[:3]: print(f" id={r.sample_id} F1={r.selection_f1:.3f} P={r.selection_precision:.3f} R={r.selection_recall:.3f}") missed = r.ground_truth_tags - r.selected_tags extra = r.selected_tags - r.ground_truth_tags if missed: print(f" missed: {sorted(missed)[:10]}") if extra: print(f" extra: {sorted(extra)[:10]}") print() print("Highest F1 samples (overall):") for r in by_f1[-3:]: print(f" id={r.sample_id} F1={r.selection_f1:.3f} P={r.selection_precision:.3f} R={r.selection_recall:.3f}") if errored: print() print(f"Errors ({len(errored)}):") for r in errored[:5]: print(f" id={r.sample_id}: {r.error}") print("=" * 70) def main(argv=None) -> int: ap = argparse.ArgumentParser(description="End-to-end pipeline evaluation") ap.add_argument("--n", type=int, default=20, help="Number of samples to evaluate") ap.add_argument("--caption-field", default="caption_cogvlm", choices=["caption_cogvlm", "caption_llm_0", "caption_llm_1", "caption_llm_2", "caption_llm_3", "caption_llm_4", "caption_llm_5", "caption_llm_6", "caption_llm_7"], help="Which caption field to use as input") ap.add_argument("--skip-rewrite", action="store_true", help="Skip Stage 1 LLM rewrite; split caption directly into phrases") ap.add_argument("--allow-nsfw", action="store_true", help="Allow NSFW tags") ap.add_argument("--mode", default="chunked_map_union", choices=["single_shot", "chunked_map_union"]) ap.add_argument("--chunk-size", type=int, default=60) ap.add_argument("--per-phrase-k", type=int, default=2) ap.add_argument("--temperature", type=float, default=0.0) ap.add_argument("--max-tokens", type=int, default=512) ap.add_argument("--verbose", "-v", action="store_true", help="Show per-call Stage 3 logs") ap.add_argument("--output", "-o", type=str, default=None, help="Save detailed results as JSONL (default: auto-generated in data/eval_results/)") ap.add_argument("--shuffle", action="store_true", default=True, help="Randomly shuffle samples before selecting (default: True)") ap.add_argument("--no-shuffle", dest="shuffle", action="store_false", help="Use samples in file order (first N)") ap.add_argument("--seed", type=int, default=42, help="Random seed for shuffle (default: 42)") ap.add_argument("--workers", "-w", type=int, default=4, help="Number of parallel workers (default: 4, use 1 for sequential)") args = ap.parse_args(list(argv) if argv is not None else None) results = run_eval( n_samples=args.n, caption_field=args.caption_field, skip_rewrite=args.skip_rewrite, allow_nsfw=args.allow_nsfw, mode=args.mode, chunk_size=args.chunk_size, per_phrase_k=args.per_phrase_k, temperature=args.temperature, max_tokens=args.max_tokens, verbose=args.verbose, shuffle=args.shuffle, seed=args.seed, workers=args.workers, ) print_summary(results) # Always save detailed results if args.output: out_path = Path(args.output) else: results_dir = _REPO_ROOT / "data" / "eval_results" results_dir.mkdir(parents=True, exist_ok=True) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") out_path = results_dir / f"eval_{args.caption_field}_n{args.n}_seed{args.seed}_{timestamp}.jsonl" out_path.parent.mkdir(parents=True, exist_ok=True) # Write run metadata as first line meta = { "_meta": True, "timestamp": datetime.now().isoformat(), "n_samples": len(results), "caption_field": args.caption_field, "skip_rewrite": args.skip_rewrite, "allow_nsfw": args.allow_nsfw, "mode": args.mode, "chunk_size": args.chunk_size, "per_phrase_k": args.per_phrase_k, "temperature": args.temperature, "shuffle": args.shuffle, "seed": args.seed, "workers": args.workers, "n_errors": sum(1 for r in results if r.error), } with out_path.open("w", encoding="utf-8") as f: f.write(json.dumps(meta, ensure_ascii=False) + "\n") for r in results: row = { "sample_id": r.sample_id, "caption": r.caption, "ground_truth_tags": sorted(r.ground_truth_tags), "rewrite_phrases": r.rewrite_phrases, "retrieved_tags": sorted(r.retrieved_tags), "selected_tags": sorted(r.selected_tags), "retrieval_recall": round(r.retrieval_recall, 4), "selection_precision": round(r.selection_precision, 4), "selection_recall": round(r.selection_recall, 4), "selection_f1": round(r.selection_f1, 4), # Character tag breakdown "gt_character_tags": sorted(r.gt_character_tags), "selected_character_tags": sorted(r.selected_character_tags), "retrieved_character_tags": sorted(r.retrieved_character_tags), "char_retrieval_recall": round(r.char_retrieval_recall, 4), "char_precision": round(r.char_precision, 4), "char_recall": round(r.char_recall, 4), "char_f1": round(r.char_f1, 4), # General tag breakdown "gt_general_tags": sorted(r.gt_general_tags), "selected_general_tags": sorted(r.selected_general_tags), "general_precision": round(r.general_precision, 4), "general_recall": round(r.general_recall, 4), "general_f1": round(r.general_f1, 4), # Timing "stage1_time": round(r.stage1_time, 3), "stage2_time": round(r.stage2_time, 3), "stage3_time": round(r.stage3_time, 3), "error": r.error, } f.write(json.dumps(row, ensure_ascii=False) + "\n") print(f"\nDetailed results saved to: {out_path}") return 0 if __name__ == "__main__": sys.exit(main())