legal-eye / tau_rag /scripts /extract_paragraphs_for_labeling.py
Legal-i's picture
Initial deploy: legal-eye Hebrew legal RAG (17K corpus, verbatim-from-precedent)
3be54c6 verified
Raw
History Blame Contribute Delete
4.56 kB
#!/usr/bin/env python3
"""Extract candidate paragraphs from the Hebrew legal corpus for labeling.
Pulls discussion-section paragraphs from `parquet_cases.jsonl`, optionally
(with `--high-signal-only`) keeps only the ones that look likely to contain
legal arguments (Hebrew "טוען", "טענה", etc.), and writes a random sample of
them as a single JSONL file ready for the HTML labeling tool.
Output format (per line):
{"id": "<case_id>::<paragraph_idx>",
"case_id": "...",
"domain": "...",
"text": "<paragraph>",
"label": null} # filled in by the labeling tool
Usage:
python3 -m tau_rag.scripts.extract_paragraphs_for_labeling \\
--n 1000 --out data/paragraphs_to_label.jsonl
"""
from __future__ import annotations
import argparse
import json
import random
from pathlib import Path
from typing import List, Tuple
# Hebrew cue words/phrases whose presence hints that a paragraph carries a
# legal argument. `has_argument_marker` does plain substring matching against
# this list, and `main` uses that check to drop low-signal paragraphs when
# `--high-signal-only` is given, so the labeling pool is not dominated by
# procedural boilerplate.
ARGUMENT_INDICATORS: List[str] = [
    "טוען",
    "טענה",
    "סבור",
    "גורס",
    "לטענת",
    "לעמדת",
    "נטען",
    "נקבע",
    "אכן",
    "מקובל",
    "דחה",
    "דחתה",
    "אין לקבל",
    "יש לקבל",
    "התביעה",
]

# Accepted paragraph length window, in characters (both bounds inclusive).
MIN_LEN = 80  # anything shorter is usually noise
MAX_LEN = 800  # anything longer tends to be an entire section
def split_paragraphs(text: str) -> List[str]:
    """Break *text* into blank-line-separated paragraphs of usable length.

    The text is cut on double newlines, every piece is stripped of surrounding
    whitespace, and only pieces whose length lies within ``MIN_LEN`` and
    ``MAX_LEN`` (inclusive) are returned, in their original order. Falsy input
    (``""`` or ``None``) yields an empty list.
    """
    if not text:
        return []
    stripped_chunks = (chunk.strip() for chunk in text.split("\n\n"))
    return [
        chunk for chunk in stripped_chunks
        if MIN_LEN <= len(chunk) <= MAX_LEN
    ]
def has_argument_marker(text: str) -> bool:
    """Tell whether *text* contains any ``ARGUMENT_INDICATORS`` entry.

    Matching is a plain substring test; the first hit short-circuits.
    """
    for marker in ARGUMENT_INDICATORS:
        if marker in text:
            return True
    return False
def main(argv: "List[str] | None" = None) -> None:
    """Sample candidate paragraphs from the corpus and write them as JSONL.

    Reads the JSONL corpus line by line, splits each record's ``text`` into
    paragraphs with ``split_paragraphs``, optionally keeps only paragraphs
    that pass ``has_argument_marker``, and writes a uniform random sample of
    at most ``--n`` of them to ``--out`` (one JSON object per line, with
    ``label`` set to ``null`` for the labeling tool to fill in).

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``.

    Raises:
        SystemExit: If the corpus file does not exist, or (via argparse) if
            the arguments are invalid, including a negative ``--n``.
    """
    ap = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    ap.add_argument("--corpus",
                    default="tau_rag/runtime/parquet_cases.jsonl",
                    help="JSONL corpus to sample from")
    ap.add_argument("--out", default="data/paragraphs_to_label.jsonl",
                    help="output JSONL path")
    ap.add_argument("--n", type=int, default=1000,
                    help="how many paragraphs to extract")
    ap.add_argument("--high-signal-only", action="store_true",
                    help="keep only paragraphs containing argument markers")
    ap.add_argument("--seed", type=int, default=42)
    args = ap.parse_args(argv)
    if args.n < 0:
        ap.error("--n must be non-negative")

    corpus_path = Path(args.corpus)
    out_path = Path(args.out)
    # Validate the input before touching the filesystem, so a wrong --corpus
    # does not leave an empty output directory behind.
    if not corpus_path.exists():
        raise SystemExit(f"corpus not found: {corpus_path}")
    out_path.parent.mkdir(parents=True, exist_ok=True)

    rng = random.Random(args.seed)

    # Single pass with reservoir sampling (Algorithm R): every candidate
    # paragraph ends up in `selected` with equal probability, and memory stays
    # bounded by --n. Periodically shuffling and truncating a buffer instead
    # would thin out early candidates more often than late ones and bias the
    # sample toward the end of the corpus.
    print(f"reading {corpus_path.name}...", flush=True)
    selected = []
    n_cases_seen = 0
    n_candidates = 0
    n_skipped = 0
    with corpus_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except (json.JSONDecodeError, RecursionError):
                # Best effort: one malformed line must not abort the run.
                n_skipped += 1
                continue
            if not isinstance(rec, dict):
                # Valid JSON but not an object (e.g. a list or a bare string).
                n_skipped += 1
                continue
            n_cases_seen += 1

            text = rec.get("text")
            if not isinstance(text, str) or not text:
                continue
            md = rec.get("metadata")
            if not isinstance(md, dict):
                md = {}

            for i, para in enumerate(split_paragraphs(text)):
                if args.high_signal_only and not has_argument_marker(para):
                    continue
                candidate = {
                    "id": f"{rec.get('id','')}::{i}",
                    "case_id": rec.get("id", ""),
                    "domain": md.get("domain"),
                    "text": para,
                    "label": None,  # to be filled by labeling tool
                }
                n_candidates += 1
                if len(selected) < args.n:
                    selected.append(candidate)
                else:
                    # Replace a random slot with probability n / n_candidates.
                    slot = rng.randrange(n_candidates)
                    if slot < args.n:
                        selected[slot] = candidate

    print(f" scanned {n_cases_seen:,} cases, "
          f"got {n_candidates:,} candidate paragraphs")
    if n_skipped:
        print(f" skipped {n_skipped:,} malformed or non-object lines")

    # The reservoir is uniform as a set, but its order still follows the
    # corpus order for the first --n candidates; shuffle before writing.
    rng.shuffle(selected)
    print(f" selecting {len(selected):,} for labeling")
    with out_path.open("w", encoding="utf-8") as f:
        for rec in selected:
            f.write(json.dumps(rec, ensure_ascii=False) + "\n")
    print(f"\n✓ wrote {out_path}")
    print("\nNext: open the labeling tool")
    print(f" python3 -m tau_rag.scripts.labeling_server --pool {out_path}")


if __name__ == "__main__":
    main()