"""redact_corpus.py — apply PII redaction over a JSONL corpus. Streams input → output line-by-line. Applies redact_pii() to the `text` field only; preserves `id` + `metadata` verbatim. Usage: python -m tau_rag.scripts.redact_corpus \ tau_rag/runtime/parquet_cases.jsonl \ tau_rag/runtime/parquet_cases_redacted.jsonl """ from __future__ import annotations import json import sys import time from pathlib import Path from ..scrapers.pii_redactor import redact_pii def main(src: str, dst: str) -> int: src_p = Path(src) dst_p = Path(dst) if not src_p.exists(): print(f"ERROR: {src_p} not found", file=sys.stderr) return 2 n_total = 0 n_redacted = 0 counts_total: dict[str, int] = {} t0 = time.time() with src_p.open("r", encoding="utf-8") as fi, \ dst_p.open("w", encoding="utf-8") as fo: for line in fi: n_total += 1 try: d = json.loads(line) except json.JSONDecodeError: # Pass through unparseable lines unchanged fo.write(line) continue text = d.get("text", "") if text: new_text, counts = redact_pii(text) if counts: n_redacted += 1 for k, v in counts.items(): counts_total[k] = counts_total.get(k, 0) + v d["text"] = new_text fo.write(json.dumps(d, ensure_ascii=False) + "\n") if n_total % 50_000 == 0: rate = n_total / (time.time() - t0) print(f" {n_total:>7,d} docs · {n_redacted:>6,d} redacted " f"· {rate:.0f} docs/s", flush=True) elapsed = time.time() - t0 print() print(f"DONE. {n_total:,} docs processed in {elapsed:.1f}s " f"({n_total/elapsed:.0f} docs/s)") print(f" {n_redacted:,} docs had ≥1 PII match " f"({100*n_redacted/max(n_total,1):.2f}%)") print(f"Replacements by kind:") for k, v in sorted(counts_total.items(), key=lambda x: -x[1]): print(f" {k:20s}: {v:>8,d}") return 0 if __name__ == "__main__": if len(sys.argv) != 3: print("usage: python -m tau_rag.scripts.redact_corpus ", file=sys.stderr) sys.exit(2) sys.exit(main(sys.argv[1], sys.argv[2]))