"""Convert a large legacy .pkl corpus → standard JSONL.

Intended for huge pickle files (~5GB) that contain the LawDBHeb corpus
(`production_index/chunks.pkl`, etc). The pickle is loaded once, then each
row is normalized and streamed out to JSONL line by line. When the rows live
in a list, each row's list reference is dropped as soon as it has been
written, so it can be freed early.

Memory: the whole pickle has to be loaded before conversion starts, so peak
usage is a multiple of the file size — budget roughly 2x (a 5GB pickle on a
Mac means ~10GB RAM peak). If you don't have it, swap will help but slowly.

Security: unpickling can execute arbitrary code; only convert pickles that
come from a trusted source.

Usage:
    python -m scripts.pkl_to_jsonl input.pkl output.jsonl
    python -m scripts.pkl_to_jsonl input.pkl output.jsonl --inspect
"""
from __future__ import annotations

import argparse
import gc
import json
import pickle
import sys
import time
from itertools import islice
from pathlib import Path
from typing import Any, Collection, Dict, Optional, Sequence, Tuple

# Field names probed, in priority order, for a row's text and id.
_TEXT_KEYS = ("text", "chunk", "content", "body")
_ID_KEYS = ("id", "doc_id", "chunk_id")
# Fields consumed as text/id, hence never copied into metadata.
_RESERVED_KEYS = frozenset(_TEXT_KEYS + _ID_KEYS)
# Keys that commonly wrap the row list in a dict-rooted pickle.
_ROW_LIST_KEYS = ("chunks", "docs", "documents", "items", "data")
# Metadata values are kept only when they are JSON scalars.
_SCALAR_TYPES = (str, int, float, bool, type(None))


def _present(value: Any) -> bool:
    """True unless value is None or "" (so an id of 0 counts as present)."""
    return value is not None and value != ""


def _dict_to_doc(row: Dict[Any, Any], fallback_id: str,
                 source: str) -> Optional[Dict[str, Any]]:
    """Normalize a dict row; return None when it has no non-empty text field.

    Text is the first truthy value among _TEXT_KEYS. The id is the first of
    _ID_KEYS whose value is present (see _present), else fallback_id.
    Other scalar-valued fields become metadata; metadata["source"] is always
    overwritten with source.
    """
    text = next((row[k] for k in _TEXT_KEYS if row.get(k)), None)
    if not text:
        return None
    doc_id = next((row[k] for k in _ID_KEYS if _present(row.get(k))),
                  fallback_id)
    meta = {k: v for k, v in row.items()
            if k not in _RESERVED_KEYS and isinstance(v, _SCALAR_TYPES)}
    meta["source"] = source
    return {"id": str(doc_id), "text": str(text), "metadata": meta}


def _row_to_doc(row: Any, idx: int,
                source: str) -> Optional[Dict[str, Any]]:
    """Best-effort normalize one row to {id, text, metadata}.

    Accepted shapes:
      * str              -> the text; id is "<source>-<idx>"
      * dict             -> see _dict_to_doc
      * (key, dict) pair -> the dict, with key as its fallback id (this is
                            what iterating a dict-rooted pickle yields)
      * (id, str) pair   -> id and text
    Returns None for any row without usable (non-empty) text; the caller
    counts those as skipped.
    """
    fallback_id = f"{source}-{idx}"
    if isinstance(row, str):
        if not row:
            return None
        return {"id": fallback_id, "text": row,
                "metadata": {"source": source}}
    if isinstance(row, dict):
        return _dict_to_doc(row, fallback_id, source)
    if isinstance(row, (tuple, list)) and len(row) >= 2:
        key, value = row[0], row[1]
        if not isinstance(key, (str, int)):
            return None
        if isinstance(value, dict):
            return _dict_to_doc(value, str(key), source)
        if isinstance(value, str) and value:
            return {"id": str(key), "text": value,
                    "metadata": {"source": source}}
    return None


def _load_pickle(in_path: Path) -> Any:
    """Unpickle in_path with the cyclic garbage collector paused.

    Allocating millions of containers keeps tripping the GC's allocation
    thresholds, and each full collection rescans the growing heap; pausing
    it for the load avoids that work. The previous GC state is restored.
    Propagates any exception raised by open() or pickle.load().
    """
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        with open(in_path, "rb") as f:
            # NOTE: unpickling can run arbitrary code — trusted files only.
            return pickle.load(f)
    finally:
        if was_enabled:
            gc.enable()


def _print_inspect(data: Any) -> None:
    """Print a short structural preview of the loaded object (--inspect)."""
    if isinstance(data, list):
        print(f"\n📋 list of {len(data)} items")
        print("First 3 items:")
        for i, item in enumerate(data[:3]):
            print(f"\n--- item [{i}] ({type(item).__name__}) ---")
            if isinstance(item, dict):
                for k, v in item.items():
                    vs = str(v)[:120]
                    print(f" {k}: {vs}")
            elif isinstance(item, str):
                print(f" text: {item[:200]}...")
            else:
                print(f" {repr(item)[:200]}")
    elif isinstance(data, dict):
        print(f"\n📋 dict with {len(data)} keys")
        # islice: avoid copying every key of a huge dict just to show 10.
        for k in islice(data, 10):
            v = data[k]
            print(f" {k!r:30s} → {type(v).__name__} "
                  f"({len(v) if hasattr(v, '__len__') else '?'})")
    else:
        print(f"\n📋 type: {type(data)}")
        print(f" preview: {repr(data)[:500]}")


def _select_rows(data: Any) -> Optional[Collection[Any]]:
    """Pick the row collection out of the loaded object.

    A list is used as-is. A dict is searched for a list under one of
    _ROW_LIST_KEYS; otherwise its (key, value) pairs are the rows, iterated
    lazily through the items view rather than copied into a list.
    Returns None for any other root type.
    """
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        for k in _ROW_LIST_KEYS:
            if k in data and isinstance(data[k], list):
                rows = data[k]
                print(f" using key '{k}' → {len(rows)} rows")
                return rows
        return data.items()
    return None


def _write_jsonl(rows: Collection[Any], out_path: Path, source_name: str,
                 limit: Optional[int]) -> Tuple[int, int]:
    """Stream normalized rows to out_path with a one-line progress display.

    Stops once limit docs have been written (limit=None means no cap;
    limit=0 writes nothing). Returns (n_written, n_skipped).
    """
    n_written = 0
    n_skipped = 0
    total = len(rows)
    # Only a list can have its slots cleared; dict views are left alone.
    release = isinstance(rows, list)
    t0 = time.time()
    last_print = t0
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as out_f:
        for i, row in enumerate(rows):
            if limit is not None and n_written >= limit:
                break
            doc = _row_to_doc(row, i, source_name)
            if doc is None:
                n_skipped += 1
            else:
                out_f.write(json.dumps(doc, ensure_ascii=False) + "\n")
                n_written += 1
            if release:
                # Drop the list's reference so the row can be freed now
                # (unless something else in the pickle still refers to it).
                rows[i] = None

            now = time.time()
            if now - last_print >= 1.0:
                done = i + 1
                rate = done / (now - t0)
                eta = (total - done) / rate if rate > 0 else 0
                pct = done / total * 100 if total else 0
                line = (f" [{pct:5.1f}%] "
                        f"{done:>9,}/{total:,} "
                        f"| written {n_written:>9,} "
                        f"| skipped {n_skipped:>6,} "
                        f"| {rate:.0f}/s "
                        f"| ETA {eta:5.0f}s")
                print(f"\r{line:<100}", end="", flush=True)
                last_print = now
    print()
    return n_written, n_skipped


def main(argv: Optional[Sequence[str]] = None) -> int:
    """CLI entry point; returns the process exit code.

    0 on success, 2 for a missing input or an output path equal to the
    input, 3 when the pickle cannot be loaded, 4 for an unsupported root
    type.
    """
    p = argparse.ArgumentParser(
        description="Convert a large pickle corpus → JSONL")
    p.add_argument("input", help="path to .pkl file")
    p.add_argument("output", nargs="?",
                   help="path to output .jsonl (default: input.jsonl)")
    p.add_argument("--source-name", default=None,
                   help="value for metadata.source (default: filename)")
    p.add_argument("--inspect", action="store_true",
                   help="just print structure of first 3 rows, exit")
    p.add_argument("--limit", type=int, default=None,
                   help="cap output at N rows")
    args = p.parse_args(argv)

    in_path = Path(args.input).expanduser().resolve()
    if not in_path.exists():
        print(f"✗ Not found: {in_path}", file=sys.stderr)
        return 2
    out_path = (Path(args.output) if args.output
                else in_path.with_suffix(".jsonl"))
    out_path = out_path.expanduser().resolve()
    if not args.inspect and out_path == in_path:
        # Checked before the (slow) load: writing would truncate the source.
        print(f"✗ Output would overwrite input: {in_path}", file=sys.stderr)
        return 2
    source_name = args.source_name or in_path.name

    size_gb = in_path.stat().st_size / 1024 / 1024 / 1024
    print(f"→ Loading {in_path}")
    print(f" size: {size_gb:.2f} GB "
          f"(this may take a while + need ~{size_gb*2:.1f} GB RAM)")
    t0 = time.time()
    try:
        data = _load_pickle(in_path)
    except MemoryError:
        print("✗ MemoryError — pickle too big for RAM. "
              "Close other apps or split the file.", file=sys.stderr)
        return 3
    except Exception as e:  # CLI boundary: report any load failure and exit
        print(f"✗ Pickle load failed: {type(e).__name__}: {e}",
              file=sys.stderr)
        return 3
    print(f" loaded in {time.time() - t0:.1f}s "
          f"— type: {type(data).__name__}")

    if args.inspect:
        _print_inspect(data)
        return 0

    rows = _select_rows(data)
    if rows is None:
        print(f"✗ Unsupported root type: {type(data).__name__}",
              file=sys.stderr)
        return 4

    print(f"→ Writing to {out_path}")
    t0 = time.time()
    n_written, n_skipped = _write_jsonl(rows, out_path, source_name,
                                        args.limit)

    out_size_mb = out_path.stat().st_size / 1024 / 1024
    print()
    print(f"✓ Done in {time.time() - t0:.0f}s")
    print(f" Written: {n_written:,} docs")
    print(f" Skipped: {n_skipped:,} (no text field)")
    print(f" Output: {out_path} ({out_size_mb:.1f} MB)")
    print()
    print("📤 Now upload to tau-rag:")
    print(f" curl -X POST http://localhost:8000/v1/data/upload "
          f"\\\n -F \"file=@{out_path}\"")
    print(" # Or merge into your main corpus:")
    print(f" cat {out_path} >> ~/tau_corpus.jsonl")
    return 0


if __name__ == "__main__":
    sys.exit(main())