#!/usr/bin/env python3 """Sequential ingest — scan every parquet row, append all that aren't already in the JSONL. No random sampling, no collisions: efficient for going from 20% → 100% of the source. Usage: python3 -m tau_rag.scripts.ingest_parquet_sequential [--max N] [--source PATH] Difference from `ingest_parquet_cases.py`: • That script samples N random indices, then iterates batches looking for them. With heavy overlap (already loaded ~20%), most samples hit existing IDs and get skipped → wasted parquet I/O. • This script iterates batches once, writes anything new it sees, stops at --max OR when the parquet is exhausted. """ from __future__ import annotations import argparse import json import sys import time from pathlib import Path import pyarrow.parquet as pq # Reuse the helpers from the original script from tau_rag.scripts.ingest_parquet_cases import ( build_doc_from_clean, classify_domain, ) DEFAULT_OUT = Path(__file__).resolve().parent.parent / "runtime" / "parquet_cases.jsonl" def _find_source() -> Path: """Auto-detect the parquet file across common locations. First match wins. macOS iCloud Drive lives under ~/Library/Mobile Documents/. """ candidates = [ # Local sandbox (Cowork mode) Path("/sessions/kind-affectionate-mccarthy/mnt/com~apple~CloudDocs/" "LawDBHeb/cases_clean.parquet"), # macOS iCloud Drive — real path Path.home() / "Library/Mobile Documents/com~apple~CloudDocs/" "LawDBHeb/cases_clean.parquet", # Other common locations Path.home() / "iCloud Drive/LawDBHeb/cases_clean.parquet", Path.home() / "Documents/LawDBHeb/cases_clean.parquet", Path.home() / "LawDBHeb/cases_clean.parquet", Path("./LawDBHeb/cases_clean.parquet"), Path("./cases_clean.parquet"), ] for p in candidates: if p.exists(): return p # No match — return the first candidate so the error message is informative return candidates[0] DEFAULT_SOURCE = _find_source() def main(): ap = argparse.ArgumentParser(description=__doc__.split("\n", 1)[0]) ap.add_argument("--max", type=int, default=200_000, help="stop after writing this many NEW docs (default 200k)") ap.add_argument("--source", default=str(DEFAULT_SOURCE), help="parquet file to scan") ap.add_argument("--out", default=str(DEFAULT_OUT), help="JSONL to append to") ap.add_argument("--min-text-chars", type=int, default=300) ap.add_argument("--progress-every", type=int, default=5000) args = ap.parse_args() src = Path(args.source) out = Path(args.out) if not src.exists(): print(f"FATAL: source not found: {src}", file=sys.stderr) print(f"\nSearched these locations automatically:", file=sys.stderr) for p in [ "/sessions/kind-affectionate-mccarthy/mnt/com~apple~CloudDocs/LawDBHeb/cases_clean.parquet", str(Path.home() / "Library/Mobile Documents/com~apple~CloudDocs/LawDBHeb/cases_clean.parquet"), str(Path.home() / "iCloud Drive/LawDBHeb/cases_clean.parquet"), str(Path.home() / "Documents/LawDBHeb/cases_clean.parquet"), str(Path.home() / "LawDBHeb/cases_clean.parquet"), "./LawDBHeb/cases_clean.parquet", ]: print(f" - {p}", file=sys.stderr) print(f"\nIf the file is elsewhere, pass it explicitly:", file=sys.stderr) print(f" python3 -m tau_rag.scripts.ingest_parquet_sequential " f"--max 600000 --source /path/to/cases_clean.parquet", file=sys.stderr) sys.exit(1) # Load existing IDs (so we skip duplicates fast) existing: set = set() if out.exists(): with out.open("r", encoding="utf-8") as f: for line in f: try: rec = json.loads(line) existing.add(rec.get("id")) except Exception: pass print(f"existing docs: {len(existing):,}", flush=True) pf = pq.ParquetFile(str(src)) total = pf.metadata.num_rows print(f"source rows: {total:,}", flush=True) print(f"target adds: {args.max:,}", flush=True) n_written = 0 n_skipped_short = 0 n_skipped_dup = 0 n_skipped_bad = 0 n_classified = 0 n_seen = 0 t_start = time.time() with out.open("a", encoding="utf-8") as fout: for batch in pf.iter_batches(batch_size=4000): df = batch.to_pandas() for i in range(len(df)): n_seen += 1 row = df.iloc[i].to_dict() doc = build_doc_from_clean(row) if not doc: n_skipped_bad += 1 continue if len(doc["text"]) < args.min_text_chars: n_skipped_short += 1 continue if doc["id"] in existing: n_skipped_dup += 1 continue # Classify domain cls = classify_domain(doc["text"]) if cls: doc["metadata"].update(cls) n_classified += 1 fout.write(json.dumps(doc, ensure_ascii=False) + "\n") existing.add(doc["id"]) n_written += 1 if n_written % args.progress_every == 0: elapsed = time.time() - t_start rate = n_written / max(elapsed, 0.1) print(f" +{n_written:,} written, {n_seen:,} seen, " f"{rate:.0f} docs/s, dup={n_skipped_dup:,}", flush=True) if n_written >= args.max: break if n_written >= args.max: break elapsed = time.time() - t_start print(f"\nDONE in {elapsed:.0f}s:") print(f" written: {n_written:,}") print(f" classified: {n_classified:,}") print(f" skipped (dup): {n_skipped_dup:,}") print(f" skipped (short):{n_skipped_short:,}") print(f" skipped (bad): {n_skipped_bad:,}") print(f" rate: {n_written/max(elapsed,0.1):.0f} docs/s") print(f" total in JSONL: {len(existing):,}") if __name__ == "__main__": main()