| |
| """Extract candidate paragraphs from the Hebrew legal corpus for labeling. |
| |
| Pulls discussion-section paragraphs from `parquet_cases.jsonl`, optionally keeps |
| only those that look likely to contain legal arguments (Hebrew "טוען", "טענה", |
| etc.; enabled with `--high-signal-only`), and writes a seeded random sample of |
| them as a single JSONL file ready for the HTML labeling tool. |
| |
| Output format (per line): |
| {"id": "<case_id>::<paragraph_idx>", |
| "case_id": "...", |
| "domain": "...", |
| "text": "<paragraph>", |
| "label": null} # filled in by the labeling tool |
| |
| Usage: |
| python3 -m tau_rag.scripts.extract_paragraphs_for_labeling \\ |
| --n 1000 --out data/paragraphs_to_label.jsonl |
| """ |
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import random |
| from pathlib import Path |
| from typing import List, Tuple |
|
|
|
|
| |
| |
| |
# Hebrew cue words/phrases. A paragraph that contains any of them as a plain
# substring is treated as "high signal" (likely to hold a legal argument).
# The English glosses below are approximate.
ARGUMENT_INDICATORS = [
    "טוען",      # argues / claims
    "טענה",      # claim / argument
    "סבור",      # is of the opinion
    "גורס",      # maintains / holds
    "לטענת",     # according to the claim of
    "לעמדת",     # according to the position of
    "נטען",      # it was argued
    "נקבע",      # it was held / determined
    "אכן",       # indeed
    "מקובל",     # accepted
    "דחה",       # rejected (masc.)
    "דחתה",      # rejected (fem.)
    "אין לקבל",  # cannot be accepted
    "יש לקבל",   # should be accepted
    "התביעה",    # the claim / the lawsuit
]

# Inclusive bounds, in characters, on the stripped length of a paragraph that
# is eligible for labeling; shorter or longer paragraphs are discarded.
MIN_LEN = 80
MAX_LEN = 800
|
|
|
|
def split_paragraphs(text: str) -> List[str]:
    """Split `text` on blank lines and keep the paragraphs of eligible length.

    The text is cut at every "\n\n"; each piece is stripped of surrounding
    whitespace and kept only if its length lies within the inclusive range
    [MIN_LEN, MAX_LEN]. Order is preserved. Falsy input (empty string or
    None) yields an empty list.
    """
    if not text:
        return []
    stripped_chunks = (chunk.strip() for chunk in text.split("\n\n"))
    return [para for para in stripped_chunks
            if MIN_LEN <= len(para) <= MAX_LEN]
|
|
|
|
def has_argument_marker(text: str) -> bool:
    """Return True if `text` contains any entry of ARGUMENT_INDICATORS.

    Matching is a plain substring test, so an indicator embedded inside a
    longer word also counts.
    """
    for marker in ARGUMENT_INDICATORS:
        if marker in text:
            return True
    return False
|
|
|
|
def _reservoir_offer(reservoir: List[dict], item: dict, n_seen: int,
                     k: int, rng: random.Random) -> None:
    """Offer `item`, the `n_seen`-th candidate (1-based), to a size-`k` reservoir.

    Reservoir sampling: the first `k` items fill the reservoir; each later
    item replaces a random slot with probability k / n_seen. After the whole
    stream has been offered, every item has the same chance of being kept.
    """
    if len(reservoir) < k:
        reservoir.append(item)
        return
    slot = rng.randrange(n_seen)
    if slot < k:
        reservoir[slot] = item


def _scan_corpus(corpus_path: Path, k: int, high_signal_only: bool,
                 rng: random.Random) -> Tuple[List[dict], int, int, int]:
    """Stream the JSONL corpus and draw a uniform sample of candidate paragraphs.

    Returns `(sample, n_cases, n_candidates, n_skipped)` where `sample` holds
    at most `k` candidate records, `n_cases` counts parsed case records,
    `n_candidates` counts every eligible paragraph seen, and `n_skipped`
    counts non-blank lines that were not a JSON object.
    """
    sample: List[dict] = []
    n_cases = 0
    n_candidates = 0
    n_skipped = 0
    with corpus_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                # Best effort: one corrupt line must not abort the whole run,
                # but it is counted so the operator can see it happened.
                n_skipped += 1
                continue
            if not isinstance(rec, dict):
                # Valid JSON that is not an object (list, number, ...) cannot
                # carry the "text"/"metadata" fields read below.
                n_skipped += 1
                continue
            n_cases += 1
            case_id = rec.get("id", "")
            md = rec.get("metadata", {}) or {}
            paragraphs = split_paragraphs(rec.get("text", "") or "")
            for i, para in enumerate(paragraphs):
                if high_signal_only and not has_argument_marker(para):
                    continue
                n_candidates += 1
                _reservoir_offer(sample, {
                    "id": f"{case_id}::{i}",
                    "case_id": case_id,
                    "domain": md.get("domain"),
                    "text": para,
                    "label": None,  # filled in later by the labeling tool
                }, n_candidates, k, rng)
    return sample, n_cases, n_candidates, n_skipped


def main():
    """CLI entry point: sample paragraphs from the corpus and write them as JSONL.

    Parses the command-line flags, streams the corpus once, keeps a uniform
    random sample of up to `--n` eligible paragraphs (memory use is bounded
    by `--n`, not by corpus size), shuffles it, and writes one JSON object
    per line to `--out`.

    Raises:
        SystemExit: if the corpus file does not exist, or (via argparse) if
            `--n` is negative.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--corpus",
                    default="tau_rag/runtime/parquet_cases.jsonl",
                    help="JSONL corpus to sample from")
    ap.add_argument("--out", default="data/paragraphs_to_label.jsonl",
                    help="output JSONL path")
    ap.add_argument("--n", type=int, default=1000,
                    help="how many paragraphs to extract")
    ap.add_argument("--high-signal-only", action="store_true",
                    help="keep only paragraphs containing argument markers")
    ap.add_argument("--seed", type=int, default=42)
    args = ap.parse_args()
    if args.n < 0:
        ap.error("--n must be non-negative")

    corpus_path = Path(args.corpus)
    out_path = Path(args.out)

    # Validate the input before touching the filesystem, so a bad --corpus
    # does not leave an empty output directory behind.
    if not corpus_path.exists():
        raise SystemExit(f"corpus not found: {corpus_path}")

    rng = random.Random(args.seed)

    print(f"reading {corpus_path.name}...", flush=True)
    selected, n_cases_seen, n_candidates, n_skipped = _scan_corpus(
        corpus_path, args.n, args.high_signal_only, rng)
    print(f" scanned {n_cases_seen:,} cases, "
          f"got {n_candidates:,} candidate paragraphs")
    if n_skipped:
        print(f" skipped {n_skipped:,} malformed lines")

    # When fewer than --n candidates exist the reservoir is still in corpus
    # order, so shuffle to randomize the labeling order in every case.
    rng.shuffle(selected)
    print(f" selecting {len(selected):,} for labeling")

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as f:
        f.writelines(json.dumps(rec, ensure_ascii=False) + "\n"
                     for rec in selected)
    print(f"\n✓ wrote {out_path}")
    print("\nNext: open the labeling tool")
    print(f" python3 -m tau_rag.scripts.labeling_server --pool {out_path}")
|
|
|
|
# Run the CLI only when this file is executed as a script (or via `python -m`),
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|