| """redact_corpus.py — apply PII redaction over a JSONL corpus. |
| |
| Streams input → output line-by-line. Applies redact_pii() to the `text` |
| field only; preserves `id` + `metadata` verbatim. |
| |
| Usage: |
| python -m tau_rag.scripts.redact_corpus \ |
| tau_rag/runtime/parquet_cases.jsonl \ |
| tau_rag/runtime/parquet_cases_redacted.jsonl |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import sys |
| import time |
| from pathlib import Path |
|
|
| from ..scrapers.pii_redactor import redact_pii |
|
|
|
|
def main(src: str, dst: str) -> int:
    """Redact PII from the ``text`` field of every record in a JSONL file.

    Streams ``src`` line by line and writes exactly one output line per input
    line to ``dst``. For lines that parse to a JSON object, a non-empty string
    ``text`` value is replaced by the first element of the pair returned by
    ``redact_pii``; every other key is written back unchanged (the record is
    re-serialized with ``json.dumps(..., ensure_ascii=False)``). Lines that
    are not valid JSON, or whose top-level value is not an object, are copied
    through verbatim and counted, and a warning is printed at the end because
    those lines were NOT redacted.

    Progress is printed every 50,000 lines and a summary (totals, throughput,
    replacement counts per kind) is printed when the run completes.

    Args:
        src: Path of the input JSONL file.
        dst: Path of the output JSONL file; created or overwritten.

    Returns:
        0 on success; 2 if ``src`` does not exist or if ``dst`` refers to the
        same file as ``src``.
    """
    src_p = Path(src)
    dst_p = Path(dst)
    if not src_p.exists():
        print(f"ERROR: {src_p} not found", file=sys.stderr)
        return 2
    # Opening dst in "w" mode truncates it immediately; if it is the same
    # file as src the whole corpus would be wiped before a line is read.
    if dst_p.exists() and dst_p.samefile(src_p):
        print(f"ERROR: {dst_p} is the same file as {src_p}", file=sys.stderr)
        return 2

    n_total = 0
    n_redacted = 0
    n_passthrough = 0
    counts_total: dict[str, int] = {}
    # perf_counter is monotonic, so the rate is immune to wall-clock jumps.
    t0 = time.perf_counter()

    with src_p.open("r", encoding="utf-8") as fi, \
            dst_p.open("w", encoding="utf-8") as fo:
        for line in fi:
            n_total += 1
            try:
                d = json.loads(line)
            except json.JSONDecodeError:
                d = None

            # Malformed lines and non-object JSON values (lists, scalars,
            # null) have no `text` field to redact: keep them byte-for-byte
            # so the output stays line-aligned with the input.
            if not isinstance(d, dict):
                n_passthrough += 1
                fo.write(line)
                continue

            text = d.get("text", "")
            # Only non-empty strings are handed to the redactor.
            if text and isinstance(text, str):
                new_text, counts = redact_pii(text)
                if counts:
                    n_redacted += 1
                    for k, v in counts.items():
                        counts_total[k] = counts_total.get(k, 0) + v
                d["text"] = new_text

            fo.write(json.dumps(d, ensure_ascii=False) + "\n")

            if n_total % 50_000 == 0:
                so_far = time.perf_counter() - t0
                rate = n_total / so_far if so_far > 0 else 0.0
                print(f" {n_total:>7,d} docs · {n_redacted:>6,d} redacted "
                      f"· {rate:.0f} docs/s", flush=True)

    elapsed = time.perf_counter() - t0
    # Guard the division: an empty input can finish within timer resolution.
    overall_rate = n_total / elapsed if elapsed > 0 else 0.0
    print()
    print(f"DONE. {n_total:,} docs processed in {elapsed:.1f}s "
          f"({overall_rate:.0f} docs/s)")
    print(f" {n_redacted:,} docs had ≥1 PII match "
          f"({100*n_redacted/max(n_total,1):.2f}%)")
    if n_passthrough:
        print(f"WARNING: {n_passthrough:,} lines were not JSON objects and "
              f"were copied through unredacted", file=sys.stderr)
    print("Replacements by kind:")
    for k, v in sorted(counts_total.items(), key=lambda x: -x[1]):
        print(f" {k:20s}: {v:>8,d}")
    return 0
|
|
|
|
if __name__ == "__main__":
    # CLI entry point: exactly two positional arguments are required, the
    # source JSONL path followed by the destination JSONL path.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print("usage: python -m tau_rag.scripts.redact_corpus <src.jsonl> <dst.jsonl>",
              file=sys.stderr)
        sys.exit(2)
    src_arg, dst_arg = cli_args
    # main() returns the process exit status.
    sys.exit(main(src_arg, dst_arg))
|
|