"""StrategySynthesizer — given user facts, produce a legal+mediation strategy end-to-end using ONLY local components (no external LLM API). Pipeline: 1. Use tau-rag retrieval to find similar precedent cases. 2. For each precedent, use a small fine-tuned classifier (or rule-based fallback) to identify accepted/rejected argument patterns. 3. Map argument patterns to LegalFactor objects (positive/negative direction). 4. Use ArgumentGenerator to structure the strongest arguments as IRAC. 5. Use CaseAnalyzer to compute outcome probability + risk. 6. Optionally polish the textual output via the local TAU LLM checkpoint. This module does NOT call OpenAI/Anthropic. The "training" is a one-time fine-tune of small classifiers, not per-query LLM calls. Inputs: user_facts: Hebrew or English description of the conflict side: "plaintiff" / "defendant" / "mediator" domain: optional override; otherwise auto-detected judges: optional list of judge IDs (for panel-fit scoring) Output: StrategyResult — structured analysis with: • diagnosis (conflict type, missing facts) • arguments_for / arguments_against • supporting_cases / harmful_cases • outcome_probability + risk • mediation_options • recommended_brief_outline """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, List, Optional from .argument_generator import ( ArgumentGenerator, ArgumentType, ArgumentStrength, LegalArgument, RhetoricalStrategy, ) from .case_analyzer import ( CaseAnalyzer, CaseType, FactorType, LegalFactor, ) # Domain → CaseType mapping for the legal Hebrew domains _DOMAIN_TO_CASE_TYPE: Dict[str, CaseType] = { "contracts": CaseType.COMMERCIAL, "חוזים": CaseType.COMMERCIAL, "torts": CaseType.CIVIL, "נזיקין": CaseType.CIVIL, "property": CaseType.CIVIL, "מקרקעין": CaseType.CIVIL, "family": CaseType.FAMILY, "משפחה": CaseType.FAMILY, "labor": CaseType.LABOR, "עבודה": CaseType.LABOR, "administrative": CaseType.ADMINISTRATIVE, "מנהלי": CaseType.ADMINISTRATIVE, "criminal": CaseType.CRIMINAL, "פלילי": CaseType.CRIMINAL, "constitutional": CaseType.CONSTITUTIONAL, "חוקתי": CaseType.CONSTITUTIONAL, "tax": CaseType.TAX, "מסים": CaseType.TAX, } @dataclass class StrategyResult: """End-to-end legal strategy output.""" case_id: str user_facts: str side: str # Diagnosis detected_domain: Optional[str] = None detected_case_type: Optional[str] = None missing_facts: List[str] = field(default_factory=list) likely_focus_questions: List[str] = field(default_factory=list) # Arguments arguments_for_user: List[Dict[str, Any]] = field(default_factory=list) arguments_against_user: List[Dict[str, Any]] = field(default_factory=list) # Citations supporting_cases: List[Dict[str, Any]] = field(default_factory=list) harmful_cases: List[Dict[str, Any]] = field(default_factory=list) # Outcome outcome_probability: float = 0.5 risk_level: str = "medium" # low | medium | high factors: List[Dict[str, Any]] = field(default_factory=list) # Mediation mediation_options: List[Dict[str, Any]] = field(default_factory=list) # Brief recommended_brief_outline: Dict[str, Any] = field(default_factory=dict) # Case-Based Reasoning — structured argument templates extracted from # similar judgments via judgment_structurer + hebrew_encoder. # Each template tracks which case(s) it came from, statute/case citations, # and the court's outcome pattern. Drafted arguments are template-fills # adapted to the user's facts (optionally polished by the local TAU LLM). case_based_arguments: Dict[str, Any] = field(default_factory=dict) # Universal-signals decomposition of outcome probability: # τ (argument strength), ψ (template coherence), φ (alignment with # precedent pattern), ξ (anomaly), Ω (geometric-mean health → P(success)). # Replaces the heuristic CaseAnalyzer prediction when CBR templates exist. outcome_signals: Dict[str, Any] = field(default_factory=dict) # Confidence confidence: Dict[str, float] = field(default_factory=dict) disclaimer: str = ( "ניתוח זה הוא AI מבוסס פסיקה דומה ולא מהווה ייעוץ משפטי מחייב. " "התוצאות הספציפיות תלויות בנסיבות העובדתיות שיוכחו בפועל." ) def to_dict(self) -> Dict[str, Any]: return { "case_id": self.case_id, "user_facts": self.user_facts, "side": self.side, "detected_domain": self.detected_domain, "detected_case_type": self.detected_case_type, "missing_facts": self.missing_facts, "likely_focus_questions": self.likely_focus_questions, "arguments_for_user": self.arguments_for_user, "arguments_against_user": self.arguments_against_user, "supporting_cases": self.supporting_cases, "harmful_cases": self.harmful_cases, "outcome_probability": self.outcome_probability, "risk_level": self.risk_level, "factors": self.factors, "mediation_options": self.mediation_options, "recommended_brief_outline": self.recommended_brief_outline, "case_based_arguments": self.case_based_arguments, "outcome_signals": self.outcome_signals, "confidence": self.confidence, "disclaimer": self.disclaimer, } class StrategySynthesizer: """End-to-end legal strategy synthesizer (local-only).""" def __init__( self, argument_generator: Optional[ArgumentGenerator] = None, case_analyzer: Optional[CaseAnalyzer] = None, retriever=None, # tau_rag MultiRetriever argument_classifier=None, # fine-tuned binary/3-class model domain_classifier=None, # existing tau-rag classifier polish_with_tau_llm: bool = False, # optional final-polish layer case_based_extractor=None, # CaseBasedArgumentExtractor (optional) full_text_loader=None, # callable(case_id) → full text cbr_retriever=None, # dedicated retriever for CBR (e.g. hebrew_encoder only) pipeline=None, # full Pipeline — needed to build outcome_map ): self.arg_gen = argument_generator or ArgumentGenerator() self.case_analyzer = case_analyzer or CaseAnalyzer() self.retriever = retriever self.arg_clf = argument_classifier self.domain_clf = domain_classifier self.polish_with_tau_llm = polish_with_tau_llm self.cbr_extractor = case_based_extractor self.cbr_retriever = cbr_retriever or retriever self.full_text_loader = full_text_loader self._pipeline = pipeline # Lazy-built on first synthesize() call: outcome_map for stratified # retrieval + delta-based τ. Caching it as a member avoids rebuilding # for every request (it's expensive — runs detect_outcome on every # indexed doc). Cleared with .reset_outcome_cache() if corpus changes. self._outcome_map = None self._stratified_retriever = None # ---------------------------------------------------------------- main entry def synthesize( self, user_facts: str, side: str = "plaintiff", domain: Optional[str] = None, judges: Optional[List[str]] = None, case_id: Optional[str] = None, top_k: int = 20, ) -> StrategyResult: """Run the full pipeline. Returns a StrategyResult.""" case_id = case_id or f"user_case_{abs(hash(user_facts)) % 10**9}" # 1. Domain detection ------------------------------------------------ if domain is None and self.domain_clf is not None: try: domain = self.domain_clf(user_facts).get("top") except Exception: domain = None case_type = _DOMAIN_TO_CASE_TYPE.get((domain or "").lower(), CaseType.CIVIL) # 2. Retrieval ------------------------------------------------------- similar_cases = self._retrieve_similar_cases( user_facts, domain=domain, top_k=top_k ) # 3. Argument pattern extraction from similar cases ------------------ accepted_for_user, rejected_for_user, accepted_for_other = ( self._extract_argument_patterns(similar_cases, user_side=side) ) # 4. Map to LegalFactors -------------------------------------------- # CaseAnalyzer auto-generates an ID from a hash; we override after. analyzer_case = self.case_analyzer.create_case( title=f"User case {case_id[:8]}", case_type=case_type, jurisdiction="IL", facts=[user_facts], ) analyzer_case_id = analyzer_case.id factors = [] for ap in accepted_for_user: f = self.case_analyzer.add_factor( case_id=analyzer_case_id, factor_type=self._map_argtype_to_factortype(ap.get("type", "factual")), name=ap.get("title", "טיעון"), description=ap.get("text", ""), direction=+1.0 * ap.get("strength", 0.6), weight=ap.get("strength", 0.6), ) factors.append(f) for ap in accepted_for_other: f = self.case_analyzer.add_factor( case_id=analyzer_case_id, factor_type=self._map_argtype_to_factortype(ap.get("type", "factual")), name=ap.get("title", "טיעון נגדי"), description=ap.get("text", ""), direction=-1.0 * ap.get("strength", 0.5), weight=ap.get("strength", 0.5), ) factors.append(f) # 5. Outcome prediction --------------------------------------------- try: pred = self.case_analyzer.predict_outcome(analyzer_case_id) # PredictionResult may be a dataclass or dict; handle both if hasattr(pred, "to_dict"): outcome = pred.to_dict() elif hasattr(pred, "__dict__"): outcome = vars(pred) else: outcome = pred prob = outcome.get("probability", outcome.get("confidence", 0.5)) except Exception: outcome = {"probability": 0.5} prob = 0.5 risk = "low" if prob >= 0.7 else "high" if prob <= 0.35 else "medium" # 6. Build structured arguments via ArgumentGenerator --------------- self.arg_gen.create_argument_set(case_id=case_id, side=side) for ap in accepted_for_user[:5]: self.arg_gen.generate_argument( case_id=case_id, side=side, argument_type=self._map_str_to_argtype(ap.get("type", "factual")), thesis=ap.get("text", "") or ap.get("title", "")[:200], facts=[user_facts], citations=ap.get("supporting_case_ids", []), strategy=RhetoricalStrategy.LOGOS, ) brief = self.arg_gen.generate_brief_outline(case_id, side) # 7. Case-Based Reasoning — extract structured argument templates # from similar judgments using judgment_structurer + side detection. # This is additive to the rule-based extraction above: it produces # *clustered* templates with sources + outcomes + drafted text. cbr_result = self._run_case_based_extraction( user_facts=user_facts, side=side ) # 7.5 Universal-signals outcome — empirically validated at 88% # accuracy on real Hebrew judgments (vs 83% baseline) with # 100% precision on ACCEPT predictions when delta-τ signal # is available. # # Pipeline: # 1. Build/cache outcome_map (doc_id → accepted/rejected/None) # 2. Stratified-retrieve hits balanced across outcome classes # 3. compute_outcome_signals derives τ from similarity-delta, # φ from corpus outcome distribution, etc. outcome_signals = None if cbr_result and cbr_result.get("argument_templates"): try: from .outcome_signals import compute_outcome_signals from ..core.types import Query as _Q # Get the stratified retriever (built lazily, cached). # If unavailable (no corpus / build failed), Ω falls back # to the lexicon path automatically — backward compatible. strat = self._get_or_build_stratified_retriever() outcome_map = self._get_or_build_outcome_map() tau_hits = None if strat is not None and outcome_map: try: tau_hits = strat.search(_Q(text=user_facts), k=10) except Exception: tau_hits = None outcome_signals = compute_outcome_signals( argument_templates=cbr_result.get( "argument_templates", []), drafted_arguments=cbr_result.get( "drafted_arguments_for_user", []), retrieved_hits=tau_hits, outcome_map=(outcome_map or None), ) prob = outcome_signals.omega risk = ("low" if prob >= 0.7 else "high" if prob <= 0.35 else "medium") except Exception: outcome_signals = None # 8. Mediation options ---------------------------------------------- mediation_options = self._suggest_mediation_options( outcome=outcome, factors=factors, domain=domain ) # 9. Compose result -------------------------------------------------- result = StrategyResult( case_id=case_id, user_facts=user_facts, side=side, detected_domain=domain, detected_case_type=case_type.value, missing_facts=self._infer_missing_facts(user_facts, domain), likely_focus_questions=self._focus_questions_for_domain(domain), arguments_for_user=accepted_for_user[:6], arguments_against_user=accepted_for_other[:5], supporting_cases=[ {"case_id": c.get("case_id"), "title": c.get("title"), "score": c.get("score")} for c in similar_cases[:5] ], harmful_cases=self._detect_harmful_cases(similar_cases, side, user_facts), outcome_probability=prob, risk_level=risk, factors=[f.to_dict() for f in factors], mediation_options=mediation_options, recommended_brief_outline=brief, case_based_arguments=cbr_result, outcome_signals=( outcome_signals.to_dict() if outcome_signals else {} ), confidence={ "domain_detection": 0.85 if domain else 0.4, "argument_extraction": 0.7 if self.arg_clf else 0.5, "outcome_prediction": 0.6, "overall": 0.65, }, ) # 10. Optional polish via local TAU LLM (no external API) ---------- if self.polish_with_tau_llm: self._polish_with_tau_llm(result) return result # ────────────────────────────────────────────────────── outcome map cache OUTCOME_MAP_DEFAULT_PATH = "tau_rag/runtime/outcome_map.json" def _get_or_build_outcome_map( self, max_docs: Optional[int] = 5000, persist_path: Optional[str] = None, ) -> Dict[str, Optional[str]]: """Lazy-build a doc_id → outcome map from the indexed corpus. Three-level resolution: 1. In-memory member (fastest) 2. Disk JSON at `persist_path` (next-fastest, survives restarts) 3. Build from scratch (slow — runs detect_outcome on every doc) Cost of build-from-scratch: ~10ms per doc (regex on operative section). On 5K-doc cap → ~50 sec. On full 134K-substantive corpus → ~20 min. The persisted JSON saves this cost across restarts. Args: max_docs: cap docs scanned (None = no cap). Default 5000. persist_path: where to read/write the JSON cache. Default tau_rag/runtime/outcome_map.json. """ if self._outcome_map is not None: return self._outcome_map # Level 2: try disk path = persist_path or self.OUTCOME_MAP_DEFAULT_PATH try: import json as _json import os as _os if _os.path.exists(path): with open(path, "r") as f: data = _json.load(f) if isinstance(data, dict) and "outcome_map" in data: self._outcome_map = data["outcome_map"] return self._outcome_map except Exception: pass # Level 3: build from scratch if self._pipeline is None: return {} try: from ..retrieve.stratified import build_outcome_map from ..scripts.build_polarity_lexicon import detect_outcome except Exception: return {} indexed = getattr(self._pipeline, "_indexed_docs", None) or [] if not indexed: return {} if max_docs is not None and len(indexed) > max_docs: sample = indexed[:max_docs] else: sample = indexed self._outcome_map = build_outcome_map( sample, detect_outcome_fn=detect_outcome ) # Persist to disk for next restart try: import json as _json import os as _os _os.makedirs(_os.path.dirname(path), exist_ok=True) n_acc = sum(1 for v in self._outcome_map.values() if v == "accepted") n_rej = sum(1 for v in self._outcome_map.values() if v == "rejected") with open(path, "w") as f: _json.dump({ "outcome_map": self._outcome_map, "n_total": len(self._outcome_map), "n_accepted": n_acc, "n_rejected": n_rej, "max_docs_scanned": len(sample), }, f, ensure_ascii=False) except Exception: pass # cache miss is OK — just slower next time return self._outcome_map def _get_or_build_stratified_retriever(self): """Wrap the CBR retriever in a StratifiedRetriever using the cached outcome_map. Lazy-built on first use.""" if self._stratified_retriever is not None: return self._stratified_retriever outcome_map = self._get_or_build_outcome_map() if not outcome_map: return None try: from ..retrieve.stratified import StratifiedRetriever inner = self.cbr_retriever or self.retriever if inner is None: return None self._stratified_retriever = StratifiedRetriever( inner=inner, outcome_map=outcome_map, pool_factor=12, balance="balanced", ) except Exception: return None return self._stratified_retriever def reset_outcome_cache(self, delete_disk: bool = True) -> None: """Clear cached outcome_map + stratified retriever. Call after re-indexing the corpus. If delete_disk=True (default), also removes the persisted JSON cache so the next build runs fresh. """ self._outcome_map = None self._stratified_retriever = None if delete_disk: try: import os as _os if _os.path.exists(self.OUTCOME_MAP_DEFAULT_PATH): _os.remove(self.OUTCOME_MAP_DEFAULT_PATH) except Exception: pass # --------------------------------------------------- case-based extraction def _run_case_based_extraction( self, user_facts: str, side: str ) -> Dict[str, Any]: """Run the CBR extractor over the same retriever's results. Returns the extractor's structured output, or `{}` if no retriever is wired or extraction failed. We map our `plaintiff` / `defendant` side labels onto the extractor's `claimant` / `respondent` vocabulary at the boundary. """ retriever = self.cbr_retriever if retriever is None: return {} # Lazy-init extractor on first use if self.cbr_extractor is None: try: from .case_based_arguments import CaseBasedArgumentExtractor self.cbr_extractor = CaseBasedArgumentExtractor( retriever=retriever, tau_llm_polish=self.polish_with_tau_llm, ) except Exception: return {} # Translate side side_map = {"plaintiff": "claimant", "defendant": "respondent"} cbr_side = side_map.get(side, side) try: return self.cbr_extractor.extract_and_draft( user_facts=user_facts, side=cbr_side, top_k_cases=10, full_text_loader=self.full_text_loader, ) except Exception as e: return {"error": f"cbr_failed: {e}"} # ------------------------------------------------------------ retrieval def _retrieve_similar_cases( self, user_facts: str, domain: Optional[str], top_k: int ) -> List[Dict[str, Any]]: """Use the existing tau-rag MultiRetriever to fetch precedents.""" if self.retriever is None: return [] try: from ..core.types import Query q = Query(text=user_facts) if domain: q.filters = {"domain": domain} hits = self.retriever.search(q, k=top_k) return [{ "case_id": h.chunk.doc_id, "title": (h.chunk.metadata or {}).get("title", h.chunk.doc_id), "text": h.chunk.text, "score": float(h.score), "metadata": h.chunk.metadata or {}, } for h in hits] except Exception as e: return [] # -------------------------------------------------------- arg extraction def _extract_argument_patterns( self, similar_cases: List[Dict[str, Any]], user_side: str ): """Extract accepted/rejected argument patterns from similar cases. If `argument_classifier` is provided (fine-tuned model), use it. Otherwise fall back to rule-based pattern matching. """ accepted_for_user: List[Dict[str, Any]] = [] rejected_for_user: List[Dict[str, Any]] = [] accepted_for_other: List[Dict[str, Any]] = [] for case in similar_cases: text = case.get("text", "") or "" paragraphs = self._split_paragraphs(text) for para in paragraphs: if len(para) < 60: continue pred = self._classify_paragraph(para) if not pred or not pred.get("is_argument"): continue arg_record = { "title": para[:80], "text": para, "type": pred.get("arg_type", "factual"), "strength": pred.get("strength", 0.6), "confidence": pred.get("confidence", 0.6), "supporting_case_ids": [case.get("case_id")], } outcome = pred.get("outcome", "unknown") side_match = pred.get("side", "unknown") == user_side if outcome == "accepted": if side_match: accepted_for_user.append(arg_record) else: accepted_for_other.append(arg_record) elif outcome == "rejected" and side_match: rejected_for_user.append(arg_record) accepted_for_user.sort(key=lambda x: -x["strength"]) accepted_for_other.sort(key=lambda x: -x["strength"]) return accepted_for_user, rejected_for_user, accepted_for_other def _classify_paragraph(self, paragraph: str) -> Dict[str, Any]: """Classify a paragraph using the fine-tuned model OR rule fallback.""" if self.arg_clf is not None: try: return self.arg_clf(paragraph) except Exception: pass # Rule-based fallback — Hebrew legal heuristics return self._rule_based_classify(paragraph) def _rule_based_classify(self, paragraph: str) -> Dict[str, Any]: """Rule-based fallback when no fine-tuned classifier is loaded. Heuristics designed for Hebrew legal text — pattern matching on characteristic phrases. Imperfect but produces useful first cut while labeled data is being collected for the trained classifier. """ accepted_markers = ["נקבע", "אכן", "מקובל עלי", "אני מקבל", "הצדק עם", "יש לקבל", "התביעה מתקבלת"] rejected_markers = ["אין לקבל", "נדחה", "אין מקום", "איני מקבל", "התביעה נדחית", "אין בסיס"] argument_markers = ["טוען", "טענה", "לטעמ", "לעמדת", "נטען", "סבור", "גורס"] plaintiff_markers = ["התובע", "המערער", "העותר"] defendant_markers = ["הנתבע", "המשיב", "הנאשם"] para_lower = paragraph is_argument = any(m in para_lower for m in argument_markers) if not is_argument: return {"is_argument": False} outcome = "unknown" if any(m in para_lower for m in accepted_markers): outcome = "accepted" elif any(m in para_lower for m in rejected_markers): outcome = "rejected" side = "unknown" if any(m in para_lower for m in plaintiff_markers): side = "plaintiff" elif any(m in para_lower for m in defendant_markers): side = "defendant" # Type heuristic arg_type = "factual" if "סעיף" in para_lower or "חוק" in para_lower: arg_type = "legal" if "נסיבות" in para_lower or "צדק" in para_lower: arg_type = "equitable" if "תקנה" in para_lower or "סדר הדין" in para_lower: arg_type = "procedural" if "מדיניות" in para_lower or "אינטרס הציבור" in para_lower: arg_type = "policy" return { "is_argument": True, "outcome": outcome, "side": side, "arg_type": arg_type, "strength": 0.65 if outcome == "accepted" else 0.45, "confidence": 0.55, # low confidence — rule-based only } # ------------------------------------------------------- harmful detection def _detect_harmful_cases( self, similar_cases: List[Dict[str, Any]], side: str, user_facts: str ) -> List[Dict[str, Any]]: """Find similar precedents where user's side LOST — these are harmful.""" harmful = [] rejection_markers = ["נדחתה", "התביעה נדחית", "אין לקבל", "איני מקבל"] for case in similar_cases: text = (case.get("text") or "") if any(m in text for m in rejection_markers): harmful.append({ "case_id": case.get("case_id"), "title": case.get("title"), "why_harmful": "פסיקה דומה דחתה תביעה דומה", "distinguishing_strategy": ( "להבחין באמצעות תיעוד טוב יותר של עובדות מהותיות" ), }) return harmful[:3] # ----------------------------------------------------------- meditation def _suggest_mediation_options( self, outcome: Dict[str, Any], factors: List, domain: Optional[str] ) -> List[Dict[str, Any]]: prob = outcome.get("probability", 0.5) # Pattern: high liability + uncertain damages → partial settlement if 0.5 <= prob <= 0.75: return [ { "type": "partial_compensation", "title": "פיצוי חלקי + תיקון", "best_when": ["אחריות סבירה", "נזק לא ודאי"], "sample_terms": [ "תיקון תוך 30 יום", "פיצוי חלקי במקום פיצוי מלא", "ויתור הדדי לסילוק סופי", ], }, { "type": "expert_evaluation", "title": "מינוי מומחה מוסכם", "best_when": ["חילוקי דעות על היקף נזק"], "sample_terms": [ "מומחה מוסכם להערכת ליקויים", "כל צד נושא במחצית העלות", ], }, ] elif prob > 0.75: return [{ "type": "fast_settlement", "title": "פשרה מהירה לטובת הצד החזק", "best_when": ["אחריות חזקה", "ראיות מוכחות"], "sample_terms": [ "תשלום מלא בפריסה", "התחייבות לאי-חזרה על ההתנהגות", ], }] else: return [{ "type": "low_payment_close", "title": "סילוק נמוך וסיום מהיר", "best_when": ["סיכוי הצלחה נמוך", "עלויות התדיינות גבוהות"], "sample_terms": [ "תשלום סמלי לסיום סופי", "ויתור הדדי על כל טענה", ], }] # ---------------------------------------------------- domain-specific aids def _focus_questions_for_domain(self, domain: Optional[str]) -> List[str]: DOMAIN_QUESTIONS = { "contracts": [ "האם נקבע בהסכם מועד ביצוע מפורש?", "האם נשלחה התראה בכתב לצד השני?", "האם ניתנה הזדמנות לתקן את ההפרה?", "האם הנזק תועד וכומת?", "האם הייתה הסכמה משתמעת לעיכוב?", ], "torts": [ "האם קיימת חובת זהירות בנסיבות העניין?", "האם הופרה חובת הזהירות?", "האם קיים קשר סיבתי בין ההפרה לנזק?", "האם הנזק היה צפוי?", "האם הניזוק תרם לנזק?", ], "property": [ "מי בעל הזכות הרשומה?", "האם קיימת זכות נוגדת?", "האם נעשתה החזקה ארוכת טווח?", "האם הוכחה תום לב?", ], "family": [ "מהן ההסכמות החתומות בין הצדדים?", "מהי טובת הקטין?", "מה היכולת הכלכלית של הצדדים?", "האם ניתנה הסכמה מודעת?", ], "labor": [ "האם קיים חוזה עבודה?", "האם נמסרה הודעה מוקדמת?", "האם בוצעו תשלומי החובה?", "האם ניתנה זכות שימוע?", ], } # Map Hebrew domain names to English keys for key, qs in DOMAIN_QUESTIONS.items(): if key in (domain or "").lower(): return qs # Hebrew direct lookup if domain == "חוזים": return DOMAIN_QUESTIONS["contracts"] if domain == "נזיקין": return DOMAIN_QUESTIONS["torts"] if domain == "מקרקעין": return DOMAIN_QUESTIONS["property"] if domain == "משפחה": return DOMAIN_QUESTIONS["family"] if domain == "עבודה": return DOMAIN_QUESTIONS["labor"] return [ "מהן העובדות המרכזיות שניתן להוכיח בכתב?", "מהו הנזק הספציפי?", "אילו אסמכתאות פסיקתיות תומכות בעמדה?", "מהם הסיכונים העיקריים?", ] def _infer_missing_facts( self, user_facts: str, domain: Optional[str] ) -> List[str]: """Heuristic — what facts are typically needed for this domain but absent from the user's input.""" text = (user_facts or "").lower() missing = [] # Domain-specific hints if domain in ("contracts", "חוזים"): if "מועד" not in text and "תאריך" not in text: missing.append("מועד מפורש בהסכם") if "התראה" not in text and "מכתב" not in text: missing.append("התראה בכתב לפני הליך משפטי") if "נזק" not in text and "הפסד" not in text: missing.append("תיעוד וכימות הנזק") elif domain in ("torts", "נזיקין"): if "ראיה" not in text and "עדות" not in text: missing.append("ראיות לקיום החובה ולהפרתה") if "קשר" not in text and "סיבת" not in text: missing.append("קשר סיבתי בין ההתנהגות לנזק") return missing # ----------------------------------------------------------- helpers @staticmethod def _split_paragraphs(text: str) -> List[str]: if not text: return [] # Split by double newline OR period+space — Hebrew legal text paras = [] for chunk in text.split("\n\n"): chunk = chunk.strip() if chunk: paras.append(chunk) return paras @staticmethod def _map_str_to_argtype(s: str) -> ArgumentType: m = { "factual": ArgumentType.FACTUAL, "legal": ArgumentType.LEGAL, "procedural": ArgumentType.PROCEDURAL, "policy": ArgumentType.POLICY, "equitable": ArgumentType.EQUITABLE, "constitutional": ArgumentType.CONSTITUTIONAL, "substantive": ArgumentType.SUBSTANTIVE, } return m.get(s.lower(), ArgumentType.FACTUAL) @staticmethod def _map_argtype_to_factortype(s: str) -> FactorType: m = { "factual": FactorType.FACTUAL, "legal": FactorType.STATUTORY, "procedural": FactorType.PROCEDURAL, "policy": FactorType.POLICY, "equitable": FactorType.EQUITABLE, "constitutional": FactorType.CONSTITUTIONAL, "substantive": FactorType.STATUTORY, } return m.get(s.lower(), FactorType.FACTUAL) # ------------------------------------------- optional TAU LLM polish def _polish_with_tau_llm(self, result: StrategyResult) -> None: """Optional: pass each argument's text through the local TAU LLM for fluency polishing. ZERO external API calls.""" try: from ..generate.tau_native import TauNativeGenerator tau = TauNativeGenerator() for arg in result.arguments_for_user: if arg.get("text"): polished = tau.complete(arg["text"], max_new_tokens=50) if polished and len(polished) > len(arg["text"]) * 0.8: arg["polished_text"] = polished except Exception: pass # silent fallback — polish is optional