"""Load a JSONL corpus directly into the live tau-rag pipeline.

Bypasses HTTP: imports the pipeline in-process and calls
``add_documents()`` in batches. Works for very large corpora (~1GB+) that
would crash the ``/v1/data/upload`` endpoint.

Each non-blank input line should be a JSON object with a non-empty
``"text"`` field; optional ``"id"`` and ``"metadata"`` fields are passed
through to ``Document``. Lines that are not valid JSON, are not JSON
objects, or have an empty ``"text"`` are counted as skipped.

Usage::

    cd tau_rag
    PYTHONPATH=.. python3 -m scripts.load_jsonl_to_pipeline \\
        ~/tau_corpus.jsonl --batch-size 1000

After loading, the pipeline holds the docs in memory for this Python
process only. Either run the server in the SAME process, or save a
snapshot::

    PYTHONPATH=.. python3 -m scripts.load_jsonl_to_pipeline \\
        ~/tau_corpus.jsonl --snapshot-to ~/tau_index.pkl
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import time
from pathlib import Path
from typing import List


def _build_parser() -> argparse.ArgumentParser:
    """Return the command-line parser for this script."""
    p = argparse.ArgumentParser(
        description="Load JSONL → live pipeline (no HTTP)")
    p.add_argument("jsonl", help="path to JSONL corpus")
    p.add_argument("--batch-size", type=int, default=1000,
                   help="docs per add_documents() call (default 1000)")
    p.add_argument("--max-docs", type=int, default=None,
                   help="cap total docs loaded (for testing)")
    p.add_argument("--snapshot-to", default=None,
                   help="pickle the index after load")
    p.add_argument("--preset", default=os.environ.get(
        "TAU_RAG_PRESET", "no_llm"),
        help="preset for pipeline init "
             "(no_llm/hebrew_legal/hebrew_dense)")
    return p


def _parse_record(line: str, line_n: int) -> dict | None:
    """Turn one stripped, non-blank JSONL line into ``Document`` kwargs.

    Args:
        line: the line content, already stripped and known to be non-empty.
        line_n: 1-based line number, used for the fallback id.

    Returns:
        A dict with ``id`` (always a str), ``text`` and ``metadata`` keys,
        or ``None`` when the line must be skipped: invalid JSON, a JSON
        value that is not an object, or a missing/empty ``"text"``.
    """
    try:
        record = json.loads(line)
    except (ValueError, RecursionError):  # JSONDecodeError is a ValueError
        return None
    # Valid JSON can still be a list/str/number, which has no .get().
    if not isinstance(record, dict):
        return None
    text = record.get("text", "")
    if not text:
        return None
    doc_id = record.get("id")
    if doc_id is None:
        # Covers both a missing key and an explicit null (which would
        # otherwise become the colliding id "None").
        doc_id = f"line-{line_n}"
    return {
        "id": str(doc_id),
        "text": text,
        "metadata": record.get("metadata", {}) or {},
    }


def _save_snapshot(pipe, snap: Path) -> None:
    """Persist *pipe* to *snap*, best effort; never raises.

    Tries ``tau_rag.persist.save_pipeline`` first. If that fails for any
    reason, falls back to pickling the whole pipeline object to
    ``<snap>.pkl``. Errors are only reported so that a long load is not
    lost to a traceback at the very end.
    """
    try:
        snap.parent.mkdir(parents=True, exist_ok=True)
    except OSError as e:
        print(f"⚠ Cannot create {snap.parent}: {e}", file=sys.stderr)
        return

    try:
        from tau_rag.persist import save_pipeline
        print(f"→ Saving pipeline index to {snap}...")
        stats = save_pipeline(pipe, str(snap))
    except Exception as e:
        # Fallback: full pickle of pipeline object
        print(f"⚠ save_pipeline failed ({e}), trying raw pickle...")
    else:
        print(f"✓ Saved: {stats}")
        return

    import pickle
    pkl_path = str(snap) + ".pkl"
    try:
        with open(pkl_path, "wb") as fh:
            pickle.dump(pipe, fh)
        size = os.path.getsize(pkl_path)
        print(f"✓ Pickle: {size/1024/1024:.1f} MB → {snap}.pkl")
    except Exception as e2:
        print(f"⚠ Pickle also failed: {e2}", file=sys.stderr)
        # Don't leave a truncated, unloadable pickle behind.
        try:
            os.remove(pkl_path)
        except OSError:
            pass


def main(argv=None) -> int:
    """Stream a JSONL corpus into the in-process pipeline.

    Args:
        argv: argument list (defaults to ``sys.argv[1:]``).

    Returns:
        Process exit code: 0 on completion (individual failed batches are
        reported but do not change it), 2 if the JSONL file is missing,
        3 if ``tau_rag`` cannot be imported, 4 if the pipeline has no
        ``add_documents()`` method.
    """
    args = _build_parser().parse_args(argv)

    src = Path(args.jsonl).expanduser().resolve()
    if not src.exists():
        print(f"✗ JSONL not found: {src}", file=sys.stderr)
        return 2

    # Export the preset before importing tau_rag, so it is honoured even if
    # the package reads TAU_RAG_PRESET at import time.
    os.environ["TAU_RAG_PRESET"] = args.preset

    # Import pipeline + Document
    try:
        from tau_rag.pipeline import get_pipeline
        from tau_rag.core.types import Document
    except ImportError as e:
        print(f"✗ Cannot import tau_rag: {e}", file=sys.stderr)
        print("  Run from the project root with PYTHONPATH=..",
              file=sys.stderr)
        return 3

    print(f"→ Initializing pipeline (preset={args.preset})...")
    pipe = get_pipeline()
    print(f"✓ Pipeline ready ({type(pipe).__name__})")

    if not hasattr(pipe, "add_documents"):
        print(f"✗ Pipeline has no add_documents() — preset "
              f"'{args.preset}' may not support ingestion.",
              file=sys.stderr)
        return 4

    # Stream JSONL
    print(f"→ Streaming {src} (batch={args.batch_size})...")
    t0 = time.time()
    last_print = t0
    n_queued = 0          # docs handed to the batcher (drives --max-docs)
    n_loaded = 0          # docs in batches add_documents() accepted
    n_failed = 0          # docs in batches add_documents() raised on
    n_skipped = 0         # unusable input lines
    n_batches = 0         # successful add_documents() calls
    n_failed_batches = 0  # add_documents() calls that raised
    batch: List = []

    def flush_batch() -> None:
        """Send the pending batch; a failing batch is reported and dropped."""
        nonlocal batch, n_batches, n_loaded, n_failed, n_failed_batches
        if not batch:
            return
        # Hand off the list and start a new one rather than clear()-ing it,
        # in case the pipeline keeps a reference to what it was given.
        docs, batch = batch, []
        try:
            pipe.add_documents(docs)
        except Exception as e:
            n_failed += len(docs)
            n_failed_batches += 1
            print(f"\n  ⚠ batch {n_batches + n_failed_batches} failed: "
                  f"{type(e).__name__}: {e}", file=sys.stderr)
        else:
            n_loaded += len(docs)
            n_batches += 1

    with open(src, encoding="utf-8") as fh:
        for line_n, raw in enumerate(fh, 1):
            line = raw.strip()
            if not line:
                continue
            fields = _parse_record(line, line_n)
            if fields is None:
                n_skipped += 1
                continue
            batch.append(Document(**fields))
            n_queued += 1
            if len(batch) >= args.batch_size:
                flush_batch()

            # Progress print (at most once per second)
            now = time.time()
            if now - last_print >= 1.0:
                elapsed = now - t0
                rate = n_loaded / elapsed if elapsed > 0 else 0
                print(
                    f"\r  loaded {n_loaded:>9,} docs | "
                    f"{n_batches:>4} batches | "
                    f"{rate:6.0f} docs/s | "
                    f"skipped {n_skipped:>4}",
                    end="", flush=True)
                last_print = now

            # Count queued (not yet flushed) docs so the cap is exact even
            # when it falls mid-batch or a batch fails.
            if args.max_docs and n_queued >= args.max_docs:
                break
    flush_batch()

    print()
    elapsed = time.time() - t0
    avg_rate = n_loaded / elapsed if elapsed > 0 else 0.0
    print(f"\n✓ Done in {elapsed:.0f}s")
    print(f"   Total docs loaded: {n_loaded:,}")
    print(f"   Batches:           {n_batches}")
    print(f"   Lines skipped:     {n_skipped}")
    if n_failed:
        print(f"   Docs in failed batches: {n_failed:,} "
              f"({n_failed_batches} batches)")
    print(f"   Docs/sec average:  {avg_rate:.0f}")

    if args.snapshot_to:
        _save_snapshot(pipe, Path(args.snapshot_to).expanduser().resolve())

    print()
    print("ℹ The pipeline holds your corpus IN MEMORY for this Python "
          "process. To make it queryable from the running server:")
    print("  • RECOMMENDED: use scripts/serve_with_corpus.py — loads "
          "JSONL and starts uvicorn in the SAME process so the data is "
          "actually accessible to /v1/query.")
    return 0


if __name__ == "__main__":
    sys.exit(main())