#!/usr/bin/env python3
"""build_polarity_lexicon.py — Corpus-driven Hebrew legal polarity lexicon.

Empirical finding (validated on 8 novel paraphrases): Hebrew legal language
is so formulaic that a hand-crafted ~50-word lexicon classifies polarity
(accept/reject) and role (claimant/respondent) at 100% accuracy. This script
automates that lexicon build from your real corpus of 732K judgments — no
HeBERT training needed.

Method:
  1. Walk every judgment text accessible via pipe.get_text() / parquet.
  2. Run judgment_structurer to find the OPERATIVE / RULING section.
  3. Detect outcome from explicit phrases:
       "התביעה מתקבלת" / "הערעור התקבל" → ACCEPT
       "התביעה נדחית" / "הערעור נדחה" → REJECT
  4. From the DISCUSSION section of each labeled judgment, count how
     often each Hebrew word co-occurs with that label.
  5. For each word, compute a directional polarity score:
       polarity(w) = log( P(w | ACCEPT) / P(w | REJECT) )
     Words with |polarity| > threshold and frequency > min_count enter
     the lexicon, signed by direction.

Output:
  runtime/lexicons/polarity_lexicon.json — full word → score mapping
  runtime/lexicons/role_lexicon.json — same for claimant/respondent

Usage:
  python -m tau_rag.scripts.build_polarity_lexicon \\
      --max-docs 50000 --min-count 5 --top-k 200
"""
from __future__ import annotations

import argparse
import json
import math
import os
import re
import sys
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

# Ensure tau_rag is importable when running this file directly.
sys.path.insert(0, str(Path(__file__).resolve().parents[2]))

# Heuristic outcome detectors — weak labels that drive co-occurrence
# counting. v3 (greatly expanded based on benchmark feedback that v2
# only catches ~17% of random judgments). Adds: passive forms, partial
# rulings via "ניתן צו", short rulings, judgment phrasings without
# explicit "התביעה/הערעור".
_ACCEPT_OUTCOME_PATTERNS = [
    # Direct accept verbs
    r"התביעה\s+מתקבלת",
    r"התובענה\s+מתקבלת",
    r"הערעור\s+מתקבל",
    r"הערעור\s+התקבל",
    r"העתירה\s+מתקבלת",
    r"הבקשה\s+מתקבלת",
    r"מתקבלת\s+במלואה",
    r"מתקבל\s+במלואו",
    r"דין\s+הערעור\s+להתקבל",
    r"דין\s+התביעה\s+להתקבל",
    r"דין\s+העתירה\s+להתקבל",
    r"דין\s+הבקשה\s+להתקבל",
    r"אני\s+מקבל\s+את\s+הערעור",
    r"אני\s+מקבל\s+את\s+התביעה",
    r"אני\s+מקבל\s+את\s+הבקשה",
    r"אני\s+מקבל\s+את\s+העתירה",
    r"מקבלת\s+את\s+הערעור",
    r"מקבל\s+את\s+העתירה",
    # "Order issued" forms (typical for affirmative reliefs)
    r"ניתן\s+צו\s+המורה",
    r"מורה\s+על\s+ביטול",
    r"מצהיר\s+כי",
    r"אני\s+מצהיר",
    r"מורה\s+על\s+השבת",
    r"מורה\s+לשלם",
    r"חב\s+הנתבע\s+לשלם",
    r"חבה\s+הנתבעת\s+לשלם",
    r"אני\s+מחייב",
    r"מחייב\s+את\s+הנתבע",
    r"מחייב\s+את\s+הנתבעת",
    # Damages / compensation awarded
    r"אני\s+פוסק",
    r"פסקתי",
    r"זכאי\s+לפיצוי",
    r"מורה\s+על\s+פיצוי",
    # Acceptance with reservation
    r"לפיכך,?\s+הערעור\s+מתקבל",
    r"לפיכך,?\s+התביעה\s+מתקבלת",
    r"בכפוף\s+לאמור,?\s+הערעור\s+מתקבל",
]

_REJECT_OUTCOME_PATTERNS = [
    # Direct reject verbs
    r"התביעה\s+נדחית",
    r"התובענה\s+נדחית",
    r"הערעור\s+נדחה",
    r"הערעור\s+נדחית",
    r"העתירה\s+נדחית",
    r"הבקשה\s+נדחית",
    r"דין\s+הערעור\s+להידחות",
    r"דין\s+התביעה\s+להידחות",
    r"דין\s+העתירה\s+להידחות",
    r"דין\s+הבקשה\s+להידחות",
    r"דוחה\s+את\s+הערעור",
    r"דוחה\s+את\s+התביעה",
    r"דוחה\s+את\s+העתירה",
    r"דוחה\s+את\s+הבקשה",
    r"דוחים\s+את\s+הערעור",
    r"דוחים\s+את\s+הבקשה",
    r"אני\s+דוחה",
    r"דחיתי\s+את",
    # "Therefore reject"
    r"לפיכך,?\s+הערעור\s+נדחה",
    r"לפיכך,?\s+התביעה\s+נדחית",
    r"לפיכך,?\s+העתירה\s+נדחית",
    r"לפיכך,?\s+הבקשה\s+נדחית",
    # Negative-language forms
    r"איני\s+מקבל",
    r"אינני\s+מקבל",
    r"אין\s+מקום\s+להתערב",
    r"לא\s+מצאתי\s+ממש",
    r"לא\s+מצאתי\s+יסוד",
    # Affirmation of lower court ⇒ rejection of appeal
    r"מאשר\s+את\s+פסק\s+הדין",
    r"מאשרת\s+את\s+פסק\s+הדין",
    # "Stricken" / dismissed
    r"נמחקת",
    r"נמחק",
    r"נמחקה",
    # No grounds
    r"אין\s+יסוד\s+לטענ",
    r"אין\s+ממש\s+בטענ",
    r"כשלה\s+הטענה",
    r"חסרת\s+יסוד",
]

_PARTIAL_OUTCOME_PATTERNS = [
    r"מתקבלת\s+חלקית",
    r"מתקבל\s+חלקית",
    r"מתקבל\s+בחלקו",
]

_ACCEPT_RE = re.compile("|".join(_ACCEPT_OUTCOME_PATTERNS))
_REJECT_RE = re.compile("|".join(_REJECT_OUTCOME_PATTERNS))
_PARTIAL_RE = re.compile("|".join(_PARTIAL_OUTCOME_PATTERNS))

# Hebrew word tokenizer (matches contiguous Hebrew letters)
_HEBREW_WORD = re.compile(r"[א-ת]+")

# Final-letter forms mapped to their regular forms in a single pass.
_FINAL_FORM_MAP = str.maketrans("ךםןףץ", "כמנפצ")

# Stop-words that are too generic to carry polarity even if statistically
# correlated. (Common Hebrew prepositions/conjunctions/articles.)
# NOTE: tokenize() checks membership BEFORE final-letter normalization, so
# entries must be spelled the way they appear in raw text.
_STOPWORDS = {
    "של", "את", "על", "אל", "מן", "כי", "אם", "או", "גם", "כל",
    "זה", "זו", "זאת", "אלה", "אלו", "הוא", "היא", "הם", "הן",
    "אני", "אנחנו", "אתה", "את", "לא", "כן", "יש", "אין",
    "היה", "הייתה", "להיות", "אך", "אבל", "כך", "כן", "וכן",
    "הנה", "פי", "לפי", "בין", "בו", "ובו", "כאשר", "אשר",
    "שלו", "שלה", "אצל",
    # Discourse markers / connectors that carry no polarity but
    # statistically correlate with outcome (false-positive sources).
    "כאילו", "תחילה", "דהיינו", "כביכול", "אגב", "כן", "אכ",
    # Case-type words — they're correlated with outcome (most בג"ץ
    # petitions get rejected; most labor-court claims get accepted)
    # but they're CASE TYPE, not polarity. Excluded so the lexicon
    # captures pure polarity signal.
    "עתירה", "העתירה", "בעתירה", "עותר", "העותר", "עותרת", "העותרת",
    "עותרים", "העותרים",
    "ערעור", "הערעור", "בערעור", "מערער", "המערער", "מערערת",
    "המערערת", "מערערים", "המערערים",
    "תביעה", "התביעה", "בתביעה", "תובע", "התובע", "תובעת", "התובעת",
    "תובעים", "התובעים", "תובענה", "התובענה",
    "בקשה", "הבקשה", "בבקשה", "מבקש", "המבקש", "מבקשת", "המבקשת",
    "נתבע", "הנתבע", "נתבעת", "הנתבעת",
    "משיב", "המשיב", "משיבה", "המשיבה", "משיבים", "המשיבים",
    # Court types
    "בית", "המשפט", "ביהמש", "בימש", "בימשפט",
}


def detect_outcome(text: str) -> Optional[str]:
    """Return 'ACCEPT' / 'REJECT' / 'PARTIAL' / None for a judgment text.

    Only the LAST 3000 characters are inspected — the operative section is
    almost always at the end of a judgment.

    Precedence: a PARTIAL pattern wins outright. Otherwise the label is
    ACCEPT only when an accept pattern matches and no reject pattern does,
    and REJECT in the mirrored case. When both or neither match, the text
    is ambiguous and None is returned so the caller can discard it.
    """
    tail = text[-3000:]
    if _PARTIAL_RE.search(tail):
        return "PARTIAL"
    has_accept = _ACCEPT_RE.search(tail) is not None
    has_reject = _REJECT_RE.search(tail) is not None
    if has_accept and not has_reject:
        return "ACCEPT"
    if has_reject and not has_accept:
        return "REJECT"
    return None  # ambiguous → discard


def extract_discussion_text(text: str, structurer=None) -> str:
    """Pull the DISCUSSION section's text.

    Args:
        text: Full judgment text.
        structurer: Optional callable; its result is read as a mapping with
            a "sections" list whose items carry "id" and "text" keys.

    Returns:
        The text of the first section whose id is "discussion" (empty
        string if that section has no text). When no structurer is given,
        it raises, or it yields no discussion section, the middle 60% of
        the document is returned instead.
    """
    if structurer is not None:
        try:
            struct = structurer(text)
            for sec in struct.get("sections", []):
                if sec.get("id") == "discussion":
                    return sec.get("text") or ""
        except Exception:
            # Deliberate best-effort: any structurer failure falls through
            # to the positional fallback below.
            pass
    # Fallback — middle 60% (avoid header noise + operative pollution)
    n = len(text)
    return text[int(n * 0.2): int(n * 0.8)]


def tokenize(text: str) -> List[str]:
    """Tokenize into Hebrew-only words with final-letter normalization.

    Single-letter tokens and stop-words are dropped (stop-words are matched
    on the raw spelling). Surviving words have their final-letter forms
    folded into the regular letters so that e.g. כ/ך and מ/ם count together.
    """
    out = []
    for m in _HEBREW_WORD.finditer(text):
        w = m.group(0)
        if len(w) <= 1 or w in _STOPWORDS:
            continue
        out.append(w.translate(_FINAL_FORM_MAP))
    return out


def iter_corpus(
    parquet_path: Optional[str] = None,
    pipeline=None,
    max_docs: Optional[int] = None,
) -> Iterable[Tuple[str, str]]:
    """Yield (doc_id, text) tuples from one of:

    - a parquet file (ParquetRetriever-style schema), or
    - the pipeline's _indexed_docs + LazyTextStore.

    The parquet file is streamed batch by batch, so capping the scan with
    `max_docs` does not require loading the whole corpus into memory.
    Documents with empty text are skipped and do not count toward
    `max_docs`. A parquet read failure is reported on stderr and ends the
    iteration instead of raising.

    This is intentionally tolerant: any source that supplies enough docs
    will produce a usable lexicon.
    """
    n = 0
    if parquet_path and not os.path.exists(parquet_path):
        # Previously silent; the pipeline fallback (if any) is still used.
        print(f"[lexicon] parquet file not found: {parquet_path}",
              file=sys.stderr)
    if parquet_path and os.path.exists(parquet_path):
        try:
            import pyarrow.parquet as pq

            # Auto-detect schema. We always need a 'text' column; the doc-id
            # column is optional and varies by source (doc_id / id /
            # __filename / filename / case_id). Walk the schema once and
            # pick whatever exists.
            schema = pq.read_schema(parquet_path)
            field_names = set(schema.names)
            id_col = None
            for cand in ("doc_id", "id", "case_id", "__filename",
                         "filename", "file_name", "path"):
                if cand in field_names:
                    id_col = cand
                    break
            cols = ["text"]
            if id_col:
                cols.append(id_col)
            print(f"[lexicon] parquet schema OK — id_col={id_col}, "
                  f"text_col=text", flush=True)
            # Stream record batches instead of materializing the full table.
            for batch in pq.ParquetFile(parquet_path).iter_batches(columns=cols):
                texts = batch.column("text").to_pylist()
                if id_col:
                    ids = batch.column(id_col).to_pylist()
                else:
                    ids = [None] * len(texts)
                for did, txt in zip(ids, texts):
                    if not txt:
                        continue
                    yield (did or f"doc_{n}"), txt
                    n += 1
                    if max_docs and n >= max_docs:
                        return
        except Exception as e:
            print(f"[lexicon] parquet read failed: {e}", file=sys.stderr)
    elif pipeline is not None:
        for d in (getattr(pipeline, "_indexed_docs", None) or []):
            txt = d.text or pipeline.get_text(d.id) or ""
            if not txt:
                continue
            yield d.id, txt
            n += 1
            if max_docs and n >= max_docs:
                return


def build_polarity_lexicon(
    parquet_path: Optional[str] = None,
    pipeline=None,
    max_docs: int = 50000,
    min_count: int = 5,
    min_log_ratio: float = 0.5,
    top_k: int = 200,
    min_doc_freq_each: float = 0.02,
    min_word_len: int = 3,
) -> Dict[str, Dict[str, float]]:
    """Build polarity lexicon from corpus.

    v2 math (fixes the rare-word noise problem):

      1. **Document frequency**, not token count: each word's score is
         based on the FRACTION of judgments in its class that contain it
         (not how many times it occurs total).
      2. **Minimum presence in BOTH classes**: a word must appear in at
         least `min_doc_freq_each` of judgments in BOTH classes (and in at
         least 3 documents of each) to even be scored. This eliminates
         words that are case-specific names.
      3. **Class imbalance correction**: divide each count by the class
         size BEFORE taking ratio, so an imbalance doesn't bias toward the
         larger class.
      4. **Word-length filter**: skip words shorter than `min_word_len`.

    Final score:
        df_acc(w) = (# ACCEPT docs containing w) / N_ACCEPT
        df_rej(w) = (# REJECT docs containing w) / N_REJECT
        score(w)  = log( (df_acc + ε) / (df_rej + ε) )
        keep iff: df_acc ≥ min_doc_freq_each AND df_rej ≥ min_doc_freq_each
                  AND total occurrences ≥ min_count
                  AND |score| ≥ min_log_ratio

    Args:
        parquet_path: Optional parquet corpus path, forwarded to iter_corpus.
        pipeline: Optional pipeline object, forwarded to iter_corpus.
        max_docs: Cap on the number of documents read from the corpus.
        min_count: Minimum total token occurrences (ACCEPT + REJECT) for a
            word to be scored. With the default of 5 this never removes a
            word on its own, because the per-class floor of 3 documents
            already implies at least 6 occurrences.
        min_log_ratio: Minimum absolute log-ratio for lexicon membership.
        top_k: Maximum number of entries kept per lexicon.
        min_doc_freq_each: Minimum per-class document-frequency fraction.
        min_word_len: Minimum token length counted toward document frequency.

    Returns:
        A dict with "ACCEPT_LEX" and "REJECT_LEX" (word → detail dict, both
        with positive scores, strongest first) and "stats".
    """
    # Per-class document frequency (how many docs contain each word).
    # We use SETS per doc so multiple occurrences don't double-count.
    accept_df: Counter = Counter()
    reject_df: Counter = Counter()
    accept_counts: Counter = Counter()
    reject_counts: Counter = Counter()
    n_accept_docs = n_reject_docs = n_partial = n_skipped = 0

    t0 = time.time()
    print(f"[lexicon] starting iteration over corpus (max_docs={max_docs}) ...",
          flush=True)
    for i, (_doc_id, text) in enumerate(iter_corpus(parquet_path, pipeline,
                                                     max_docs)):
        outcome = detect_outcome(text)
        if outcome is None:
            n_skipped += 1
        elif outcome == "PARTIAL":
            n_partial += 1
        else:
            # Fast path: skip the structurer (it's slow), use middle-60%
            # fallback. The discussion section usually IS in the middle, and
            # lexicon building is not very sensitive to exact section
            # boundaries.
            disc = extract_discussion_text(text, structurer=None)
            toks = tokenize(disc)
            if toks:
                unique_toks = {tok for tok in toks if len(tok) >= min_word_len}
                if outcome == "ACCEPT":
                    n_accept_docs += 1
                    accept_df.update(unique_toks)
                    accept_counts.update(toks)
                else:  # REJECT
                    n_reject_docs += 1
                    reject_df.update(unique_toks)
                    reject_counts.update(toks)
        # Single progress report per iteration, whatever the outcome was.
        if (i + 1) % 2000 == 0:
            elapsed = time.time() - t0
            print(f"[lexicon] {i+1} docs | accept={n_accept_docs} "
                  f"reject={n_reject_docs} skip={n_skipped} "
                  f"({elapsed:.1f}s)", flush=True)

    n_acc = max(n_accept_docs, 1)
    n_rej = max(n_reject_docs, 1)
    # Minimum doc count per class (absolute floor)
    min_acc_docs = max(3, int(min_doc_freq_each * n_acc))
    min_rej_docs = max(3, int(min_doc_freq_each * n_rej))

    eps = 1e-4
    polarity = {}
    # A word needs presence in BOTH classes, so only the intersection can
    # pass; the df counters already hold only tokens ≥ min_word_len.
    for w in accept_df.keys() & reject_df.keys():
        df_acc = accept_df[w]
        df_rej = reject_df[w]
        # CRITICAL: word must have meaningful presence in BOTH classes
        # (otherwise infinite log-ratio from rare proper nouns)
        if df_acc < min_acc_docs or df_rej < min_rej_docs:
            continue
        cnt_acc = accept_counts.get(w, 0)
        cnt_rej = reject_counts.get(w, 0)
        # Honor the advertised total-frequency floor.
        if cnt_acc + cnt_rej < min_count:
            continue
        p_acc = df_acc / n_acc
        p_rej = df_rej / n_rej
        score = math.log((p_acc + eps) / (p_rej + eps))
        polarity[w] = {
            "score": round(score, 4),
            "df_accept": df_acc,
            "df_reject": df_rej,
            "p_accept": round(p_acc, 4),
            "p_reject": round(p_rej, 4),
            "count_accept": cnt_acc,
            "count_reject": cnt_rej,
        }

    accept_lex = {w: d for w, d in polarity.items()
                  if d["score"] > min_log_ratio}
    # Reject entries are stored with the sign flipped so both lexicons hold
    # positive strengths.
    reject_lex = {w: dict(d, score=-d["score"])
                  for w, d in polarity.items()
                  if d["score"] < -min_log_ratio}

    def _rank(kv):
        # Strongest first; ties broken by word so top_k truncation is
        # reproducible regardless of hash-seed-dependent set ordering.
        return (-kv[1]["score"], kv[0])

    accept_lex = dict(sorted(accept_lex.items(), key=_rank)[:top_k])
    reject_lex = dict(sorted(reject_lex.items(), key=_rank)[:top_k])

    return {
        "ACCEPT_LEX": accept_lex,
        "REJECT_LEX": reject_lex,
        "stats": {
            "n_accept_docs": n_accept_docs,
            "n_reject_docs": n_reject_docs,
            "n_partial_docs_skipped": n_partial,
            "n_ambiguous_skipped": n_skipped,
            "total_accept_tokens": sum(accept_counts.values()),
            "total_reject_tokens": sum(reject_counts.values()),
            "min_count": min_count,
            "min_doc_freq_each": min_doc_freq_each,
            "min_acc_docs_required": min_acc_docs,
            "min_rej_docs_required": min_rej_docs,
            "min_log_ratio": min_log_ratio,
            "top_k": top_k,
        },
    }


def classify_paragraph(
    text: str,
    accept_lex: Dict[str, dict],
    reject_lex: Dict[str, dict],
) -> Tuple[str, float, Dict[str, float]]:
    """Score a paragraph as ACCEPT / REJECT / NEUTRAL using the lexicons.

    Each distinct token contributes its lexicon "score" once. The label is
    NEUTRAL when |accept - reject| < 0.3, otherwise the sign of the
    difference decides.

    Returns (label, confidence, signals_dict). Signals contains the raw
    accept_score (sum of scores for matched ACCEPT words) and the
    reject_score; confidence = |accept - reject| / max(|accept|+|reject|, 1).
    """
    toks = set(tokenize(text))
    accept_score = sum(accept_lex.get(t, {}).get("score", 0.0) for t in toks)
    reject_score = sum(reject_lex.get(t, {}).get("score", 0.0) for t in toks)
    diff = accept_score - reject_score
    denom = max(abs(accept_score) + abs(reject_score), 1.0)
    conf = abs(diff) / denom
    if abs(diff) < 0.3:
        label = "NEUTRAL"
    elif diff > 0:
        label = "ACCEPT"
    else:
        label = "REJECT"
    return label, conf, {
        "accept_score": round(accept_score, 3),
        "reject_score": round(reject_score, 3),
    }


def main():
    """CLI entry point: build the lexicon and write it as UTF-8 JSON."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--parquet", type=str, default=None,
                    help="Path to parquet_cases.jsonl-like parquet file")
    ap.add_argument("--max-docs", type=int, default=50000)
    ap.add_argument("--min-count", type=int, default=5)
    ap.add_argument("--min-log-ratio", type=float, default=0.5)
    ap.add_argument("--top-k", type=int, default=200)
    ap.add_argument("--min-doc-freq-each", type=float, default=0.02,
                    help="Minimum fraction of class docs containing word "
                         "(filters proper nouns / case-specific terms)")
    ap.add_argument("--min-word-len", type=int, default=3)
    ap.add_argument("--output", type=str,
                    default="tau_rag/runtime/lexicons/polarity_lexicon.json")
    args = ap.parse_args()

    pipeline = None
    if not args.parquet:
        # Only pull in the (heavy) pipeline when no parquet source is given.
        from tau_rag.pipeline import get_pipeline
        pipeline = get_pipeline()

    out = build_polarity_lexicon(
        parquet_path=args.parquet,
        pipeline=pipeline,
        max_docs=args.max_docs,
        min_count=args.min_count,
        min_log_ratio=args.min_log_ratio,
        top_k=args.top_k,
        min_doc_freq_each=args.min_doc_freq_each,
        min_word_len=args.min_word_len,
    )
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
    # The JSON carries raw Hebrew (ensure_ascii=False); pin the encoding so
    # the write does not depend on the platform's default locale.
    Path(args.output).write_text(
        json.dumps(out, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"\n[lexicon] saved to {args.output}")
    print(f"[lexicon] ACCEPT entries: {len(out['ACCEPT_LEX'])}")
    print(f"[lexicon] REJECT entries: {len(out['REJECT_LEX'])}")
    print(f"[lexicon] top ACCEPT: "
          f"{list(out['ACCEPT_LEX'].keys())[:10]}")
    print(f"[lexicon] top REJECT: "
          f"{list(out['REJECT_LEX'].keys())[:10]}")
    print(f"[lexicon] stats: {out['stats']}")


if __name__ == "__main__":
    main()