"""Case-Based Argument Extraction & Generation.

Given user-provided facts, this module:
  1. Finds similar judgments via the hebrew_encoder retriever
  2. For each match, runs the existing judgment_structurer to extract sections
  3. Pulls out arguments_claimant / arguments_respondent / discussion
     paragraphs
  4. Identifies LEGAL ARGUMENTS (those that contain statute citations +
     reasoning markers)
  5. Aggregates them into "argument templates" — patterns that appeared in
     fact-similar cases
  6. Optionally adapts the templates to the user's facts via the local TAU
     LLM (this polish path is currently hard-disabled in
     ``_draft_for_user``; drafts are returned verbatim from the precedent)

This is "Case-Based Reasoning" using ONLY local components:
  judgment_structurer (rule-based, deterministic)
  + hebrew_encoder (whitened + hybrid retrieval)
  + TAU LLM decoder (optional, for fluent text generation)

No external API. No black-box LLM. Outputs are traceable to source cases.

Output structure:
{
  "user_facts": "...",
  "side": "claimant",
  "n_similar_cases": 12,
  "n_argument_templates": 7,
  "argument_templates": [
    {
      "side": "claimant" | "respondent" | "court",
      "thesis": "Hebrew claim/legal position",
      "legal_basis": ["סעיף 39 לחוק החוזים"],
      "case_citations": ["ע"א 1234/19"],
      "appeared_in_cases": ["case_id_1", "case_id_2", "case_id_3"],
      "outcome_pattern": "accepted" | "rejected" | "partial" | "unknown",
      "frequency": 3,                 # how many merged paragraphs used it
      "supporting_paragraphs": [...], # original texts from the cases
      "confidence": 0.5,
    },
    ...
  ],
  "drafted_arguments_for_user": [
    {
      "side": "claimant",
      "argument": "...",
      "based_on": ["case_id_1", "case_id_2"],
      "polish_method": "verbatim_from_precedent" | "tau_llm_adapted",
      "section_origin": "discussion",
    },
    ...
  ],
  "confidence": 0.7,
  "retrieval_health": {...},
  "disclaimer": "...",
}
"""
from __future__ import annotations

import re
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

# Statute citation pattern (סעיף N לחוק X).
# Matches the section/regulation word and number (optionally a range and a
# Hebrew sub-letter in parentheses) and, when present, the law kind followed
# by up to 60 characters that contain no comma, period, semicolon or newline.
_STATUTE_RE = re.compile(
    r"(?:סעיף|סעיפים|תקנה|תקנות)\s*"
    r"\d+(?:[-–]\d+)?(?:\([א-ת]\))?"
    r"(?:\s+ל?(?:חוק|תקנות|פקודת|חוק\s+יסוד)\s+[^,.;\n]{0,60})?"
)

# Case citation pattern: a court-procedure abbreviation (with any of the
# quote characters " ' ״ as the abbreviation mark), optional whitespace, then
# "<1-5 digits>/<2-4 digits>".
_CASE_CITE_RE = re.compile(
    r"((?:ע[\"'״][אא]|בג[\"'״][צץ]|ע[\"'״][פפ]|"
    r"רע[\"'״][אא]|רע[\"'״][פפ]|בש[\"'״][אא]|"
    r"ת[\"'״][אא]|דנ[\"'״][אאפ])\s*"
    r"\d{1,5}/\d{2,4})"
)

# Hebrew acceptance / rejection markers (court voice).
# Hand-curated lexicon validated at 100% on 8 novel paraphrases.
# Words marked (cm) were discovered via corpus mining of 200K judgments
# (build_polarity_lexicon.py) and verified to carry pure polarity signal.
# Words marked (cm-skip) showed in mining output but were rejected as
# case-type contamination (e.g. "הורשע", "העונש" → criminal-domain bias,
# not polarity).
# NOTE(review): markers are matched as plain substrings by the classifier,
# so short entries can also fire inside longer words — confirm acceptable.
_ACCEPT_MARKERS = [
    "אכן", "מקובל עלי", "יש לקבל", "אני מקבל", "התביעה מתקבלת",
    "הערעור מתקבל", "הצדק עם", "צודק", "צדק", "הטענה התקבלה",
    "ראוי לקבל",
    # Polarity words discovered via corpus mining (verified high-precision):
    "ביטול",      # cm — overturning lower court ruling
    "החלטתי",     # cm — "I decided" (in favor)
    "הזכות",      # cm — "the right" (granted)
    "זכאי",       # cm — "entitled to"
    "זכויות",     # cm — "rights" (granted)
    "נפסק",       # cm — "was ruled"
    "הראוי",      # cm — "the proper [outcome]"
    "מסכים",      # cm
    "הוחלט",      # cm — "it was decided"
    "להתקבל",     # cm — "to be accepted"
    "מתקבלת",     # cm
    "התקבל",      # cm
    "נחה דעתי",   # cm — "I am satisfied"
    "סבור אני",   # cm
]
_REJECT_MARKERS = [
    "אין לקבל", "איני מקבל", "אינני מקבל", "הטענה נדחית",
    "התביעה נדחית", "הערעור נדחה", "אין יסוד", "אין ממש",
    "לא הוכח", "כשלה",
    # Polarity words discovered via corpus mining (verified):
    "להידחות",    # cm — strong, top-1 in corpus
    "נדחה",       # cm — "rejected"
    "נדחית",      # cm — "rejected" (fem)
    "עיינתי",     # cm — "I reviewed [and found nothing]"
    "אינה מבססת", # cm
    "חסר",        # cm — "lacking"
    "נעדרת",      # cm — "lacking"
    "מחוסר",      # cm — "due to lack of"
    "כשלה הטענה", # cm
    "אין בסיס",   # cm
    "ללא בסיס",   # cm
    "איני מוצא",  # cm
]

# Argument verbs (claim attribution): their presence marks a paragraph as
# reporting a party's position.
_CLAIM_VERBS = ["טוען", "טענה", "סבור", "גורס", "לטענת", "לעמדת", "מבקש", "טוענים"]

# Canonical statute key — strips trailing prose so similar references cluster.
# Captures: section/regulation word + number + (optional) law kind and law
# name keyword, e.g. "סעיף 39 לחוק החוזים" from
# "סעיף 39 לחוק החוזים (חלק כללי), התשל"ג-1973".
# NOTE: the law-name group is lazy and the trailing lookahead accepts
# whitespace, so the captured name stops at the end of its first word.
_STATUTE_NORM_RE = re.compile(
    r"(סעיף|סעיפים|תקנה|תקנות)\s*"
    r"(\d+(?:[-–]\d+)?(?:\([א-ת]\))?)"
    r"(?:\s+ל?(חוק|תקנות|פקודת|חוק\s+יסוד)\s+([א-ת][א-ת'״\"\s]{0,30}?))?"
    r"(?=[\s,.;()\n]|$)"
)

# Words that signal the citation has ended and ordinary prose has begun.
_STATUTE_STOP_WORDS = (
    " קמה", " חלה", " מקנה", " חובה", " רלוונטי",
    " מצד", " מצדיק", " של ", " הוא ", " היא ",
)

# Paragraph-splitting parameters shared by _split_paragraphs().
_PARA_MIN_CHARS = 80        # below this a piece is treated as boilerplate
_PARA_MAX_CHARS = 1500      # above this a chunk is sentence-split
_PARA_MERGE_CHARS = 800     # target size when re-aggregating sentences
# True paragraph boundaries: 2+ newlines, OR a line holding only a numbered
# list marker ("1.", "2.") as is common in Hebrew judgments.
_PARA_BREAK_RE = re.compile(r"\n\s*\n+|\n\s*\d+\.\s*\n")
# Sentence boundary: terminator, whitespace, then a Hebrew letter.
_SENTENCE_BREAK_RE = re.compile(r"(?<=[.!?])\s+(?=[א-ת])")


def _normalize_statute(raw: str) -> str:
    """Reduce a verbose statute reference to a canonical short form.

    Args:
        raw: Statute citation as matched in the judgment text; ``None`` and
            empty strings are tolerated.

    Returns:
        ``"<section word> <number>[ ל<law kind>[ <law name>]]"`` when the
        citation pattern is found, otherwise the stripped input truncated to
        60 characters.

    Examples:
        "סעיף 39 לחוק החוזים (חלק כללי), התשל\"ג-1973" → "סעיף 39 לחוק החוזים"
        "סעיף 12 לחוק החוזים מקנה לו זכות פיצוי"     → "סעיף 12 לחוק החוזים"
    """
    match = _STATUTE_NORM_RE.search(raw or "")
    if not match:
        return (raw or "").strip()[:60]

    sect_word, num, law_kind, law_name = match.groups()
    law_name = (law_name or "").strip()

    # Trim law_name at the first stop-word that signals end of citation.
    # Safety net only: the lazy regex group already ends at the first
    # whitespace, so a multi-word name does not normally reach this point.
    for stop in _STATUTE_STOP_WORDS:
        idx = law_name.find(stop)
        if idx > 0:
            law_name = law_name[:idx].strip()

    if law_kind and law_name:
        return f"{sect_word} {num} ל{law_kind} {law_name}"
    if law_kind:
        return f"{sect_word} {num} ל{law_kind}"
    return f"{sect_word} {num}"


@dataclass
class ArgumentTemplate:
    """A reusable legal argument pattern extracted from similar cases."""

    side: str  # claimant / respondent / court / unknown
    thesis: str  # the core claim text
    legal_basis: List[str] = field(default_factory=list)  # statute refs
    case_citations: List[str] = field(default_factory=list)  # other case refs
    appeared_in_cases: List[str] = field(default_factory=list)  # case IDs
    outcome_pattern: str = "unknown"  # accepted/rejected/partial/unknown
    frequency: int = 0  # number of paragraphs merged into this template
    supporting_paragraphs: List[Dict[str, str]] = field(default_factory=list)
    confidence: float = 0.5

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view; at most 3 supporting paragraphs.

        Lists are copied so callers cannot mutate the template through the
        returned dict.
        """
        return {
            "side": self.side,
            "thesis": self.thesis,
            "legal_basis": list(self.legal_basis),
            "case_citations": list(self.case_citations),
            "appeared_in_cases": list(self.appeared_in_cases),
            "outcome_pattern": self.outcome_pattern,
            "frequency": self.frequency,
            "supporting_paragraphs": self.supporting_paragraphs[:3],
            "confidence": self.confidence,
        }


@dataclass
class DraftedArgument:
    """A user-specific argument draft, derived from templates."""

    side: str
    argument: str
    based_on: List[str] = field(default_factory=list)
    polish_method: str = "template_only"
    # v2.89 — which judgment section the supporting paragraph(s) came from
    # (arguments_plaintiff / arguments_defendant / discussion / holding /
    # body fallback). Surfaced to the UI so users see whether an argument
    # is from a party's claim, the court's discussion, or the operative
    # ruling — material to weight.
    section_origin: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the draft."""
        return {
            "side": self.side,
            "argument": self.argument,
            "based_on": list(self.based_on),
            "polish_method": self.polish_method,
            "section_origin": self.section_origin,
        }


class CaseBasedArgumentExtractor:
    """End-to-end: user facts → similar cases → extracted argument templates
    → optionally drafted arguments adapted to the user's facts."""

    # Party / court vocabulary used for proximity-based side detection.
    _PLAINTIFF_TERMS = ("התובע", "המערער", "העותר", "המבקש")
    _DEFENDANT_TERMS = ("הנתבע", "המשיב", "הנאשם")
    _COURT_TERMS = ("אני סבור", "לדעתי", "לעמדתי", "אני מקבל",
                    "אני דוחה", "מקובל עלי")

    # Map structurer's section IDs → our canonical side labels.
    # The structurer historically used `arguments_plaintiff` /
    # `arguments_defendant`; the newer claimant/respondent names are accepted
    # too in case the structurer is updated later.
    _SECTION_SIDE = {
        "arguments_plaintiff": "claimant",
        "arguments_claimant": "claimant",
        "arguments_defendant": "respondent",
        "arguments_respondent": "respondent",
        "arguments_general": None,  # use proximity-based detection
    }
    _ARG_SECTIONS = frozenset(_SECTION_SIDE) | {"discussion"}

    # Score thresholds: relaxed inside structurer-vouched sections, stricter
    # for the unstructured full-text fallback.
    _SECTION_MIN_SCORE = 0.20
    _FALLBACK_MIN_SCORE = 0.50

    # Connectives expected in genuine legal Hebrew (polish quality gate).
    _LEGAL_CONNECTIVES = (
        "לפיכך", "בהתאם", "לאור", "מאחר", "בנסיבות",
        "מכוח", "על-פי", "לפי", "אכן", "מקובל", "נראה", "סבור",
    )

    def __init__(
        self,
        retriever=None,                  # hebrew_encoder retriever
        tau_llm_polish: bool = False,    # use local TAU LLM to refine drafts
        min_paragraph_len: int = 60,
    ):
        """Store the retriever and extraction settings.

        Args:
            retriever: Object exposing ``search(query, k=...)``; without it
                ``extract_and_draft`` returns an error dict.
            tau_llm_polish: Recorded for the (currently disabled) polish path.
            min_paragraph_len: Paragraphs shorter than this are not classified.
        """
        self.retriever = retriever
        self.tau_llm_polish = tau_llm_polish
        self.min_len = min_paragraph_len

    # ===================================================================
    # Step 1: Identify if a paragraph is a legal argument (vs narration)
    # ===================================================================
    def _classify_paragraph(self, text: str) -> Dict[str, Any]:
        """Tag a paragraph from a judgment section.

        Returns:
            {
              "is_argument": bool,          # score >= 0.40
              "side": "claimant" | "respondent" | "court" | "unknown",
              "outcome_marker": "accepted" | "rejected" | "partial" | None,
              "statute_refs": [...],        # normalized, de-duplicated
              "case_refs": [...],
              "score": float,               # additive, may exceed 1.0
            }
        """
        # Normalize each statute hit to its canonical short form so verbose
        # and terse citations of the same section collapse to one entry.
        statutes = list(dict.fromkeys(
            norm
            for norm in (
                _normalize_statute(m.group(0).strip())
                for m in _STATUTE_RE.finditer(text)
            )
            if norm
        ))
        cases = [m.group(0).strip() for m in _CASE_CITE_RE.finditer(text)]

        has_claim_verb = any(verb in text for verb in _CLAIM_VERBS)
        has_accept = any(marker in text for marker in _ACCEPT_MARKERS)
        has_reject = any(marker in text for marker in _REJECT_MARKERS)
        has_polarity = has_accept or has_reject

        # Additive evidence score (addition order kept stable on purpose so
        # floating-point results match the thresholds callers compare to).
        score = 0.0
        if statutes:
            score += 0.30
        if cases:
            score += 0.30
        if has_claim_verb:
            score += 0.25
        if has_polarity:
            score += 0.25

        # Side detection — court voice wins, otherwise majority of party terms.
        plaintiff_hits = sum(text.count(t) for t in self._PLAINTIFF_TERMS)
        defendant_hits = sum(text.count(t) for t in self._DEFENDANT_TERMS)
        is_court_voice = any(t in text for t in self._COURT_TERMS)
        if is_court_voice and has_polarity:
            side = "court"
        elif plaintiff_hits > defendant_hits:
            side = "claimant"
        elif defendant_hits > plaintiff_hits:
            side = "respondent"
        else:
            side = "unknown"

        if has_accept and has_reject:
            outcome = "partial"
        elif has_accept:
            outcome = "accepted"
        elif has_reject:
            outcome = "rejected"
        else:
            outcome = None

        return {
            "is_argument": score >= 0.40,
            "side": side,
            "outcome_marker": outcome,
            "statute_refs": statutes,
            "case_refs": cases,
            "score": score,
        }

    # ===================================================================
    # Step 2: For one similar judgment, extract its argument paragraphs
    #         using the existing judgment_structurer
    # ===================================================================
    @staticmethod
    def _detect_case_outcome(full_text: str) -> str:
        """Map the whole-case outcome label to accepted/rejected/partial.

        Best effort: any failure to import or run ``detect_outcome`` yields
        ``"unknown"`` rather than aborting extraction.
        """
        try:
            from ..scripts.build_polarity_lexicon import detect_outcome
            label = detect_outcome(full_text)
            return {
                "ACCEPT": "accepted",
                "REJECT": "rejected",
                "PARTIAL": "partial",
            }.get(label, "unknown")
        except Exception:
            return "unknown"

    @staticmethod
    def _build_template(
        case_id: str,
        section: str,
        para: str,
        tags: Dict[str, Any],
        side: str,
        outcome: str,
        confidence: float,
    ) -> ArgumentTemplate:
        """Create a single-paragraph template with de-duplicated references.

        The FULL paragraph is kept as thesis — the user wants the verbatim
        legal argument from the precedent. 1500 chars is only a safety
        ceiling for runaway long paragraphs.
        """
        excerpt = para[:1500]
        return ArgumentTemplate(
            side=side,
            thesis=excerpt,
            legal_basis=list(dict.fromkeys(tags["statute_refs"])),
            case_citations=list(dict.fromkeys(tags["case_refs"])),
            appeared_in_cases=[case_id],
            outcome_pattern=outcome,
            frequency=1,
            supporting_paragraphs=[{
                "case_id": case_id,
                "section": section,
                "text": excerpt,
            }],
            confidence=confidence,
        )

    def _extract_arguments_from_one_case(
        self, case_id: str, full_text: str
    ) -> List[ArgumentTemplate]:
        """Run the structurer on ONE case, then identify argument paragraphs.

        Important: the per-paragraph `outcome_marker` is unreliable because
        legal-Hebrew discussion sections cite both accept and reject markers
        before the actual ruling. Instead the WHOLE-CASE outcome (via
        detect_outcome) is stamped on every template extracted from this case.

        Returns:
            One template per qualifying paragraph; ``[]`` when the
            judgment_structurer module cannot be imported.
        """
        try:
            from ..judgment_structurer import structure_judgment
        except ImportError:
            return []

        struct = structure_judgment(full_text)
        sections = struct.get("sections", [])
        case_outcome = self._detect_case_outcome(full_text)

        templates: List[ArgumentTemplate] = []
        # Track whether the structurer found any argument-bearing sections.
        # Empirically, ~50% of real Israeli judgments lack the explicit
        # "טענות התובע:" / "דיון:" headers (short rulings, summary judgments,
        # appellate dismissals). Those fall back to mining the FULL text.
        found_arg_section = False

        for sec in sections:
            sec_id = sec.get("id", "")
            if sec_id not in self._ARG_SECTIONS:
                continue
            found_arg_section = True
            sec_text = sec.get("text", "") or ""

            for para in self._split_paragraphs(sec_text):
                if len(para) < self.min_len:
                    continue
                tags = self._classify_paragraph(para)
                # The structurer already vouched for the section, so the bar
                # is relaxed; a zero score (pure boilerplate) is still out.
                if tags["score"] < self._SECTION_MIN_SCORE:
                    continue
                # Section-id hint overrides proximity-based side detection.
                side = self._SECTION_SIDE.get(sec_id) or tags["side"]
                templates.append(self._build_template(
                    case_id, sec_id, para, tags, side, case_outcome,
                    confidence=min(0.95, max(0.30, tags["score"] + 0.10)),
                ))

        # Fallback: the structurer produced no argument sections, so mine
        # the full text using the per-paragraph classifier alone.
        if not found_arg_section and full_text:
            for para in self._split_paragraphs(full_text):
                if len(para) < self.min_len:
                    continue
                tags = self._classify_paragraph(para)
                # Stricter threshold: no section context to validate against.
                if tags["score"] < self._FALLBACK_MIN_SCORE:
                    continue
                templates.append(self._build_template(
                    case_id, "fallback_full_text", para, tags, tags["side"],
                    case_outcome,
                    confidence=min(0.85, tags["score"]),
                ))

        return templates

    @staticmethod
    def _split_paragraphs(text: str) -> List[str]:
        """Paragraph splitter that PRESERVES full coherent legal arguments.

        Strategy:
          1. Split on paragraph boundaries (blank line OR a line holding only
             a numbered list marker). These are TRUE paragraph boundaries.
          2. Within each, collapse internal whitespace (a single \\n is just
             word-wrap from PDF extraction — not a paragraph break).
          3. Keep chunks of 80-1500 chars verbatim. Shorter chunks are
             dropped as boilerplate headers.
          4. Chunks above 1500 chars are sentence-split and re-aggregated
             towards ~800 chars. A fragment shorter than 80 chars is glued to
             the neighbouring sentence instead of being discarded, so no part
             of an oversized paragraph is silently lost.

        Known limitation: a single sentence longer than 1500 chars is still
        dropped.

        Returns:
            Paragraph strings, each 80-1500 characters long.
        """
        if not text:
            return []

        lo, hi = _PARA_MIN_CHARS, _PARA_MAX_CHARS
        out: List[str] = []
        for chunk in _PARA_BREAK_RE.split(text):
            normalized = re.sub(r"\s+", " ", chunk).strip()
            if not normalized:
                continue
            if len(normalized) <= hi:
                if len(normalized) >= lo:
                    out.append(normalized)
                continue

            # Oversized chunk — sentence-split, then aggregate.
            pieces: List[str] = []
            buf = ""
            for sentence in _SENTENCE_BREAK_RE.split(normalized):
                sentence = sentence.strip()
                if not sentence:
                    continue
                if not buf:
                    buf = sentence
                elif (len(buf) < lo
                      or len(buf) + len(sentence) <= _PARA_MERGE_CHARS):
                    # Either it still fits, or the buffer is too short to
                    # stand alone and must travel with the next sentence.
                    buf = f"{buf} {sentence}"
                else:
                    pieces.append(buf)
                    buf = sentence
            if buf:
                fits_previous = (
                    pieces and len(pieces[-1]) + 1 + len(buf) <= hi
                )
                if len(buf) < lo and fits_previous:
                    pieces[-1] = f"{pieces[-1]} {buf}"
                else:
                    pieces.append(buf)
            out.extend(p for p in pieces if lo <= len(p) <= hi)
        return out

    # ===================================================================
    # Step 3: Cluster similar templates across cases (semantic + structural)
    # ===================================================================
    def _cluster_templates(
        self,
        all_templates: List[ArgumentTemplate],
        semantic_threshold: float = 0.65,
    ) -> List[ArgumentTemplate]:
        """Two-stage clustering of templates from different cases.

        Stage 1 (structural): group by (side, outcome). Templates from
        opposite sides or with different outcomes never merge.

        Stage 2 (semantic): within each group holding more than one template,
        run ``_semantic_merge`` when the encoder can be loaded; otherwise
        fall back to ``_dict_key_merge``.

        Input templates are merged IN PLACE (the surviving template of each
        cluster is one of the inputs).

        Returns:
            Clusters sorted by frequency, then confidence, then number of
            case citations (all descending).
        """
        if not all_templates:
            return []

        buckets: Dict[Tuple[str, str], List[ArgumentTemplate]] = {}
        for tmpl in all_templates:
            buckets.setdefault((tmpl.side, tmpl.outcome_pattern), []).append(tmpl)

        # Constructing the encoder is only worthwhile if some bucket can
        # actually merge anything.
        needs_encoder = any(len(group) > 1 for group in buckets.values())
        encoder = self._get_encoder_safely() if needs_encoder else None

        merged_all: List[ArgumentTemplate] = []
        for group in buckets.values():
            if len(group) == 1 or encoder is None:
                merged_all.extend(self._dict_key_merge(group))
            else:
                merged_all.extend(
                    self._semantic_merge(group, encoder, semantic_threshold)
                )

        return sorted(
            merged_all,
            key=lambda t: (-t.frequency, -t.confidence, -len(t.case_citations)),
        )

    @staticmethod
    def _get_encoder_safely():
        """Lazy-load the EnhancedHebrewEncoder.

        Returns None on any import/construction error so clustering can fall
        back to dict-key merging without breaking the API.
        """
        try:
            from ..encoding.hebrew_encoder import EnhancedHebrewEncoder
            return EnhancedHebrewEncoder(embedding_dim=256)
        except Exception:
            return None

    @staticmethod
    def _dict_key_merge(
        group: List[ArgumentTemplate]
    ) -> List[ArgumentTemplate]:
        """Merge templates sharing (side, statute set, outcome).

        Used as fallback when the encoder isn't available or a bucket has a
        single template. The first template seen for a key absorbs the rest.
        """
        by_key: Dict[Tuple, ArgumentTemplate] = {}
        for tmpl in group:
            key = (
                tmpl.side,
                tuple(sorted(set(tmpl.legal_basis))),
                tmpl.outcome_pattern,
            )
            target = by_key.get(key)
            if target is None:
                by_key[key] = tmpl
            else:
                _merge_templates_inplace(target, tmpl)
        return list(by_key.values())

    @staticmethod
    def _semantic_merge(
        group: List[ArgumentTemplate],
        encoder,
        threshold: float,
        strict_threshold: float = 0.97,
    ) -> List[ArgumentTemplate]:
        """Law-family-gated clustering.

        Empirical finding (per the original authors): the raw encoder returns
        very high cosine between ANY two legal-Hebrew texts, so it cannot
        discriminate topics on its own. A HARD structural gate (law-family
        overlap) is applied and the encoder acts as a TIE-BREAKER.

        Rules:
          • A template may join a cluster only if they share at least one
            law-family, OR neither has any family, OR cosine ≥
            strict_threshold (near-paraphrase regime).
          • Among eligible clusters with cosine ≥ threshold the most similar
            one wins.
          • Templates whose thesis cannot be embedded open their own cluster
            and are never merged into.

        Falls back to ``_dict_key_merge`` when numpy is unavailable.
        """
        try:
            import numpy as np
        except Exception:
            return CaseBasedArgumentExtractor._dict_key_merge(group)

        def _embed(text: str):
            """Unit-normalized embedding, or None when encoding fails."""
            try:
                result = encoder.encode(text or "", domain="legal")
                vec = np.asarray(result.embedding, dtype=np.float32)
                norm = float(np.linalg.norm(vec))
                return vec / norm if norm > 1e-8 else vec
            except Exception:
                return None

        centroids: List = []
        family_sets: List[set] = []
        sizes: List[int] = []          # members per cluster (running mean)
        clusters: List[ArgumentTemplate] = []

        def _open_cluster(tmpl, vec, fam) -> None:
            centroids.append(vec)
            family_sets.append(set(fam))
            sizes.append(1)
            clusters.append(tmpl)

        for tmpl in group:
            vec = _embed(tmpl.thesis)
            fam = _extract_law_families(tmpl.legal_basis)
            if vec is None or not centroids:
                _open_cluster(tmpl, vec, fam)
                continue

            best_idx, best_sim = -1, -2.0
            for i, centroid in enumerate(centroids):
                if centroid is None:
                    continue
                sim = float(np.dot(vec, centroid))
                eligible = (
                    bool(fam & family_sets[i])
                    # No statute info on either side → pure cosine.
                    or (not fam and not family_sets[i])
                    # Near-identical text across topics (rare but possible).
                    or sim >= strict_threshold
                )
                if eligible and sim >= threshold and sim > best_sim:
                    best_idx, best_sim = i, sim

            if best_idx < 0:
                _open_cluster(tmpl, vec, fam)
                continue

            _merge_templates_inplace(clusters[best_idx], tmpl)
            family_sets[best_idx] |= fam
            sizes[best_idx] += 1
            n_seen = sizes[best_idx]
            # Running mean of member vectors, re-normalized to unit length.
            updated = ((n_seen - 1) * centroids[best_idx] + vec) / n_seen
            norm = float(np.linalg.norm(updated))
            centroids[best_idx] = updated / norm if norm > 1e-8 else updated

        return clusters

    # ===================================================================
    # Step 4: Wrap a template for the user (TAU LLM polish is disabled)
    # ===================================================================
    @classmethod
    def _passes_polish_quality_gate(cls, polished: Optional[str]) -> bool:
        """Reject word-soup output from undertrained models.

        A genuine legal Hebrew paragraph is expected to have:
          • 80-2000 characters and at least one Hebrew letter
          • punctuation density ≥ 1 mark per 100 chars
          • unique-word ratio ≥ 0.45
          • at least one legal connective (לפיכך/בהתאם/לאור/…)
        """
        if not polished or not 80 <= len(polished) <= 2000:
            return False
        if not any("א" <= ch <= "ת" for ch in polished):
            return False
        words = polished.split()
        punct_density = sum(ch in ".,;:!?" for ch in polished) / len(polished)
        unique_ratio = len(set(words)) / max(len(words), 1)
        has_connective = any(c in polished for c in cls._LEGAL_CONNECTIVES)
        return punct_density >= 0.01 and unique_ratio >= 0.45 and has_connective

    def _draft_for_user(
        self, template: ArgumentTemplate, user_facts: str, side: str
    ) -> DraftedArgument:
        """Wrap the FULL verbatim legal argument from the precedent in a
        user-facing template.

        Design principle (per user requirement): the legal argument itself
        must be IDENTICAL to what appears in the precedent. It is not
        paraphrased, condensed, or reinterpreted; it is only wrapped with
        metadata (citations, outcome, number of source cases) so the user can
        SEE the precedent reasoning and adapt it themselves.

        Args:
            template: Clustered template to present.
            user_facts: The user's facts (only read by the disabled polish
                path).
            side: Side label stamped on the returned draft.

        Returns:
            DraftedArgument with ``polish_method="verbatim_from_precedent"``.
        """
        statutes = list(dict.fromkeys(template.legal_basis))[:3]
        cases = list(dict.fromkeys(template.case_citations))[:3]
        n_sources = len(template.appeared_in_cases)

        # The legal argument — VERBATIM from the precedent.
        # Day 43: strip the [tag][tag] corpus prefix AND any leading mid-word
        # fragment (the legal_hebrew chunker cuts at byte boundaries, so
        # chunks like "ים פינקלמן" used to leak the broken "ים" fragment
        # through to the UI).
        legal_argument = (template.thesis or "").strip()
        try:
            from ..hierarchical_graph import (
                _strip_corpus_header,
                _trim_leading_partial_word,
            )
        except ImportError:
            # Cosmetic cleanup only — degrade to the unstripped text, like
            # the other optional imports in this module.
            pass
        else:
            legal_argument = _trim_leading_partial_word(
                _strip_corpus_header(legal_argument)
            )

        method = "verbatim_from_precedent"

        # Polish path is permanently disabled — production must NEVER call
        # an external LLM (see CLAUDE.md). Kept dead behind `False and …`
        # so future readers see the intent and the surrounding code is
        # still grep-able. Do not flip without changing the policy doc.
        # Debug: set TAU_RAG_TAU_DEBUG=1 to log polish attempts.
        if False and self.tau_llm_polish:
            import os as _os
            import sys as _sys

            _debug = _os.environ.get("TAU_RAG_TAU_DEBUG") == "1"
            try:
                from ..generate.tau_native import TauNativeGenerator
                tau = TauNativeGenerator()
                prompt = (
                    f"להלן טיעון משפטי מתוך פסיקה ישראלית. "
                    f"התאם את הניסוח לעובדות הלקוח בלי לשנות את המהות "
                    f"המשפטית או להוסיף אסמכתאות חדשות.\n\n"
                    f"הטיעון מתוך הפסיקה:\n{legal_argument[:800]}\n\n"
                    f"עובדות הלקוח:\n{user_facts[:300]}\n\n"
                    f"טיעון מותאם:"
                )
                polished = tau.complete(prompt, max_new_tokens=300)
                if _debug:
                    _sys.stderr.write(
                        f"[CBR-polish] prompt={len(prompt)}c "
                        f"output_len={len(polished or '')}c "
                        f"sample={(polished or '')[:120]!r}\n"
                    )
                if self._passes_polish_quality_gate(polished):
                    legal_argument = polished.strip()
                    method = "tau_llm_adapted"
                elif _debug:
                    _sys.stderr.write(
                        f"[CBR-polish] REJECTED (quality gate) — verbatim used\n"
                    )
            except Exception as _e:
                if _debug:
                    _sys.stderr.write(
                        f"[CBR-polish] FAILED: {type(_e).__name__}: {_e}\n"
                    )

        # Build the output: header + verbatim argument + metadata footer.
        statutes_str = (
            ", ".join(statutes) if statutes
            else "(לא זוהה בסיס משפטי ספציפי)"
        )
        cases_str = ", ".join(cases) if cases else "—"
        outcome_he = {
            "accepted": "התקבלה",
            "rejected": "נדחתה",
            "partial": "התקבלה חלקית",
            "unknown": "תוצאה לא ידועה",
        }.get(template.outcome_pattern, "תוצאה לא ידועה")
        # Hebrew pluralization: singular "תקדים אחד דומה",
        # plural "N תקדימים דומים".
        if n_sources == 1:
            source_str = "תקדים אחד דומה עובדתית"
        else:
            source_str = f"{n_sources} תקדימים דומים עובדתית"

        text = (
            f"📜 **טיעון מבוסס פסיקה**\n\n"
            f"{legal_argument}\n\n"
            f"━━━━━━━━━━━━━━━━━━━━\n"
            f"**מקור:** {source_str}\n"
            f"**אסמכתאות בפסיקה:** {cases_str}\n"
            f"**בסיס משפטי:** {statutes_str}\n"
            f"**תוצאה בתקדימים:** {outcome_he}\n\n"
            f"💡 *הטיעון לעיל מובא בלשון הפסיקה. יש להתאים את הניסוח "
            f"לעובדות הספציפיות של המקרה לפני הגשה.*"
        )

        # v2.89 — dominant section across supporting paragraphs (mode).
        # Counter.most_common keeps first-seen order on ties, so the earliest
        # paragraph's section wins when there is no clear winner; None when
        # no section data is attached.
        section_origin: Optional[str] = None
        sections = [
            p.get("section")
            for p in template.supporting_paragraphs
            if isinstance(p, dict) and p.get("section")
        ]
        if sections:
            from collections import Counter
            section_origin = Counter(sections).most_common(1)[0][0]

        return DraftedArgument(
            side=side,
            argument=text,
            based_on=template.appeared_in_cases[:5],
            polish_method=method,
            section_origin=section_origin,
        )

    # ===================================================================
    # MAIN ENTRY POINT
    # ===================================================================
    def _mine_hits(
        self,
        hits,
        full_text_loader: Optional[Callable[[str], str]] = None,
    ) -> List[ArgumentTemplate]:
        """Extract templates from every retrieved hit.

        With a ``full_text_loader`` several chunk hits may point at the same
        document; its full text is mined only ONCE, otherwise identical
        templates would be produced per hit and inflate ``frequency``.
        When the loader raises or returns nothing, the hit's own chunk text
        is mined instead.
        """
        templates: List[ArgumentTemplate] = []
        mined_full_texts = set()
        for hit in hits:
            case_id = hit.chunk.doc_id
            full_text = None
            if full_text_loader is not None:
                if case_id in mined_full_texts:
                    continue
                try:
                    full_text = full_text_loader(case_id)
                except Exception:
                    full_text = None
                if full_text:
                    mined_full_texts.add(case_id)
            if not full_text:
                full_text = hit.chunk.text
            templates.extend(
                self._extract_arguments_from_one_case(case_id, full_text)
            )
        return templates

    def extract_and_draft(
        self,
        user_facts: str,
        side: str = "claimant",
        top_k_cases: int = 10,
        full_text_loader: Optional[Callable[[str], str]] = None,
    ) -> Dict[str, Any]:
        """Run the full case-based pipeline.

        Args:
            user_facts: Hebrew description of the user's situation.
            side: which side to draft FOR ("claimant", "respondent",
                "mediator"). Templates tagged with this side or "court" are
                drafted.
            top_k_cases: how many similar judgments to mine.
            full_text_loader: callable(case_id) → full judgment text.
                Required if your retriever returns chunks not full texts.
                If None, uses the retriever's chunk text directly.

        Returns:
            Structured dict with templates + drafted arguments, or
            ``{"error": ...}`` when no retriever is configured or retrieval
            raises.
        """
        if not self.retriever:
            return {"error": "no retriever available"}

        # 1. Retrieve similar cases via hebrew_encoder.
        from ..core.types import Query
        try:
            hits = self.retriever.search(Query(text=user_facts), k=top_k_cases)
        except Exception as e:
            return {"error": f"retrieval failed: {e}"}

        # 1.5 Retrieval-time signals over the score distribution. Optional:
        #     any failure leaves an empty dict.
        try:
            from .retrieval_signals import compute_retrieval_signals
            retrieval_health = compute_retrieval_signals(hits).to_dict()
        except Exception:
            retrieval_health = {}

        if not hits:
            return {
                "user_facts": user_facts,
                "n_similar_cases": 0,
                "argument_templates": [],
                "drafted_arguments_for_user": [],
                "confidence": 0.0,
                "retrieval_health": retrieval_health,
            }

        # 2. Extract argument paragraphs, 3. cluster them across cases.
        clustered = self._cluster_templates(
            self._mine_hits(hits, full_text_loader)
        )

        # 4. Draft the top 5 templates relevant to the requested side.
        relevant = [t for t in clustered if t.side in (side, "court")]
        drafted = [
            self._draft_for_user(tmpl, user_facts, side)
            for tmpl in relevant[:5]
        ]

        # 5. Overall confidence: case coverage, template count, mean
        #    template confidence — capped at 0.95.
        n_cases = len({c for t in clustered for c in t.appeared_in_cases})
        n_templates = len(clustered)
        avg_template_conf = (
            sum(t.confidence for t in clustered) / max(n_templates, 1)
        )
        overall_conf = min(
            0.95,
            0.30 * min(1.0, n_cases / 5)
            + 0.30 * min(1.0, n_templates / 10)
            + 0.40 * avg_template_conf,
        )

        return {
            "user_facts": user_facts,
            "side": side,
            "n_similar_cases": n_cases,
            "n_argument_templates": n_templates,
            "argument_templates": [t.to_dict() for t in clustered[:15]],
            "drafted_arguments_for_user": [d.to_dict() for d in drafted],
            "confidence": round(overall_conf, 3),
            "retrieval_health": retrieval_health,
            "disclaimer": (
                "טיעונים אלה מבוססים על דפוסים שזוהו בפסיקה הדומה עובדתית. "
                "אין מדובר בייעוץ משפטי מחייב — תוצאת תיק ספציפי תלויה "
                "בנסיבות העובדתיות שיוכחו בפועל ובהרכב השופטים."
            ),
        }


# Law kind + law name, ending at punctuation or end of string.
_LAW_FAMILY_RE = re.compile(
    r"ל?(חוק|פקודת|פקודה|תקנות|חוק\s+יסוד)\s+"
    r"([א-ת][א-ת'\"\s]{0,30}?)(?=[,.;()\n]|$)"
)


def _extract_law_families(legal_basis: List[str]) -> set:
    """Pull out the law-name keyword from each statute reference.

        "סעיף 39 לחוק החוזים"          → {"חוק החוזים"}
        "סעיף 7 לחוק איסור לשון הרע"   → {"חוק איסור לשון הרע"}
        "סעיף 12 לחוק החוזים"          → {"חוק החוזים"}

    Each family is capped at four words. References without a recognizable
    law kind contribute nothing. The result acts as a topical-affinity check
    during clustering.
    """
    families = set()
    for ref in legal_basis or []:
        match = _LAW_FAMILY_RE.search(ref)
        if match:
            words = f"{match.group(1)} {match.group(2)}".split()
            families.add(" ".join(words[:4]))
    return families


def _merge_templates_inplace(
    target: "ArgumentTemplate",
    incoming: "ArgumentTemplate",
) -> None:
    """Fold ``incoming`` into ``target`` (used by both clustering paths).

    Increments ``frequency``, unions the case IDs, citations and statutes,
    appends supporting paragraphs, and adopts the incoming thesis when it is
    more than 20% longer. Unions keep first-seen order so downstream slices
    (top-3 statutes, top-5 source cases) are deterministic across runs.
    """
    target.frequency += 1
    target.appeared_in_cases = list(dict.fromkeys(
        target.appeared_in_cases + incoming.appeared_in_cases))
    target.supporting_paragraphs.extend(incoming.supporting_paragraphs)
    if len(incoming.thesis) > len(target.thesis) * 1.2:
        target.thesis = incoming.thesis
    target.case_citations = list(dict.fromkeys(
        target.case_citations + incoming.case_citations))
    target.legal_basis = list(dict.fromkeys(
        target.legal_basis + incoming.legal_basis))


__all__ = ["CaseBasedArgumentExtractor", "ArgumentTemplate", "DraftedArgument"]