| """StrategySynthesizer — given user facts, produce a legal+mediation strategy |
| end-to-end using ONLY local components (no external LLM API). |
| |
| Pipeline: |
| 1. Use tau-rag retrieval to find similar precedent cases. |
| 2. For each precedent, use a small fine-tuned classifier (or rule-based |
| fallback) to identify accepted/rejected argument patterns. |
| 3. Map argument patterns to LegalFactor objects (positive/negative direction). |
| 4. Use ArgumentGenerator to structure the strongest arguments as IRAC. |
| 5. Use CaseAnalyzer to compute outcome probability + risk. |
| 6. Optionally polish the textual output via the local TAU LLM checkpoint. |
| |
| This module does NOT call OpenAI/Anthropic. The "training" is a one-time |
| fine-tune of small classifiers, not per-query LLM calls. |
| |
| Inputs: |
| user_facts: Hebrew or English description of the conflict |
| side: "plaintiff" / "defendant" / "mediator" |
| domain: optional override; otherwise auto-detected |
| judges: optional list of judge IDs (for panel-fit scoring) |
| |
| Output: |
| StrategyResult — structured analysis with: |
| • diagnosis (conflict type, missing facts) |
| • arguments_for / arguments_against |
| • supporting_cases / harmful_cases |
| • outcome_probability + risk |
| • mediation_options |
| • recommended_brief_outline |
| """ |
| from __future__ import annotations |
|
|
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Optional |
|
|
| from .argument_generator import ( |
| ArgumentGenerator, |
| ArgumentType, |
| ArgumentStrength, |
| LegalArgument, |
| RhetoricalStrategy, |
| ) |
| from .case_analyzer import ( |
| CaseAnalyzer, |
| CaseType, |
| FactorType, |
| LegalFactor, |
| ) |
|
|
|
|
| |
| _DOMAIN_TO_CASE_TYPE: Dict[str, CaseType] = { |
| "contracts": CaseType.COMMERCIAL, |
| "חוזים": CaseType.COMMERCIAL, |
| "torts": CaseType.CIVIL, |
| "נזיקין": CaseType.CIVIL, |
| "property": CaseType.CIVIL, |
| "מקרקעין": CaseType.CIVIL, |
| "family": CaseType.FAMILY, |
| "משפחה": CaseType.FAMILY, |
| "labor": CaseType.LABOR, |
| "עבודה": CaseType.LABOR, |
| "administrative": CaseType.ADMINISTRATIVE, |
| "מנהלי": CaseType.ADMINISTRATIVE, |
| "criminal": CaseType.CRIMINAL, |
| "פלילי": CaseType.CRIMINAL, |
| "constitutional": CaseType.CONSTITUTIONAL, |
| "חוקתי": CaseType.CONSTITUTIONAL, |
| "tax": CaseType.TAX, |
| "מסים": CaseType.TAX, |
| } |
|
|
|
|
| @dataclass |
| class StrategyResult: |
| """End-to-end legal strategy output.""" |
|
|
| case_id: str |
| user_facts: str |
| side: str |
|
|
| |
| detected_domain: Optional[str] = None |
| detected_case_type: Optional[str] = None |
| missing_facts: List[str] = field(default_factory=list) |
| likely_focus_questions: List[str] = field(default_factory=list) |
|
|
| |
| arguments_for_user: List[Dict[str, Any]] = field(default_factory=list) |
| arguments_against_user: List[Dict[str, Any]] = field(default_factory=list) |
|
|
| |
| supporting_cases: List[Dict[str, Any]] = field(default_factory=list) |
| harmful_cases: List[Dict[str, Any]] = field(default_factory=list) |
|
|
| |
| outcome_probability: float = 0.5 |
| risk_level: str = "medium" |
| factors: List[Dict[str, Any]] = field(default_factory=list) |
|
|
| |
| mediation_options: List[Dict[str, Any]] = field(default_factory=list) |
|
|
| |
| recommended_brief_outline: Dict[str, Any] = field(default_factory=dict) |
|
|
| |
| |
| |
| |
| |
| case_based_arguments: Dict[str, Any] = field(default_factory=dict) |
|
|
| |
| |
| |
| |
| outcome_signals: Dict[str, Any] = field(default_factory=dict) |
|
|
| |
| confidence: Dict[str, float] = field(default_factory=dict) |
| disclaimer: str = ( |
| "ניתוח זה הוא AI מבוסס פסיקה דומה ולא מהווה ייעוץ משפטי מחייב. " |
| "התוצאות הספציפיות תלויות בנסיבות העובדתיות שיוכחו בפועל." |
| ) |
|
|
| def to_dict(self) -> Dict[str, Any]: |
| return { |
| "case_id": self.case_id, |
| "user_facts": self.user_facts, |
| "side": self.side, |
| "detected_domain": self.detected_domain, |
| "detected_case_type": self.detected_case_type, |
| "missing_facts": self.missing_facts, |
| "likely_focus_questions": self.likely_focus_questions, |
| "arguments_for_user": self.arguments_for_user, |
| "arguments_against_user": self.arguments_against_user, |
| "supporting_cases": self.supporting_cases, |
| "harmful_cases": self.harmful_cases, |
| "outcome_probability": self.outcome_probability, |
| "risk_level": self.risk_level, |
| "factors": self.factors, |
| "mediation_options": self.mediation_options, |
| "recommended_brief_outline": self.recommended_brief_outline, |
| "case_based_arguments": self.case_based_arguments, |
| "outcome_signals": self.outcome_signals, |
| "confidence": self.confidence, |
| "disclaimer": self.disclaimer, |
| } |
|
|
|
|
| class StrategySynthesizer: |
| """End-to-end legal strategy synthesizer (local-only).""" |
|
|
| def __init__( |
| self, |
| argument_generator: Optional[ArgumentGenerator] = None, |
| case_analyzer: Optional[CaseAnalyzer] = None, |
| retriever=None, |
| argument_classifier=None, |
| domain_classifier=None, |
| polish_with_tau_llm: bool = False, |
| case_based_extractor=None, |
| full_text_loader=None, |
| cbr_retriever=None, |
| pipeline=None, |
| ): |
| self.arg_gen = argument_generator or ArgumentGenerator() |
| self.case_analyzer = case_analyzer or CaseAnalyzer() |
| self.retriever = retriever |
| self.arg_clf = argument_classifier |
| self.domain_clf = domain_classifier |
| self.polish_with_tau_llm = polish_with_tau_llm |
| self.cbr_extractor = case_based_extractor |
| self.cbr_retriever = cbr_retriever or retriever |
| self.full_text_loader = full_text_loader |
| self._pipeline = pipeline |
| |
| |
| |
| |
| self._outcome_map = None |
| self._stratified_retriever = None |
|
|
| |
| def synthesize( |
| self, |
| user_facts: str, |
| side: str = "plaintiff", |
| domain: Optional[str] = None, |
| judges: Optional[List[str]] = None, |
| case_id: Optional[str] = None, |
| top_k: int = 20, |
| ) -> StrategyResult: |
| """Run the full pipeline. Returns a StrategyResult.""" |
| case_id = case_id or f"user_case_{abs(hash(user_facts)) % 10**9}" |
|
|
| |
| if domain is None and self.domain_clf is not None: |
| try: |
| domain = self.domain_clf(user_facts).get("top") |
| except Exception: |
| domain = None |
|
|
| case_type = _DOMAIN_TO_CASE_TYPE.get((domain or "").lower(), CaseType.CIVIL) |
|
|
| |
| similar_cases = self._retrieve_similar_cases( |
| user_facts, domain=domain, top_k=top_k |
| ) |
|
|
| |
| accepted_for_user, rejected_for_user, accepted_for_other = ( |
| self._extract_argument_patterns(similar_cases, user_side=side) |
| ) |
|
|
| |
| |
| analyzer_case = self.case_analyzer.create_case( |
| title=f"User case {case_id[:8]}", |
| case_type=case_type, |
| jurisdiction="IL", |
| facts=[user_facts], |
| ) |
| analyzer_case_id = analyzer_case.id |
| factors = [] |
| for ap in accepted_for_user: |
| f = self.case_analyzer.add_factor( |
| case_id=analyzer_case_id, |
| factor_type=self._map_argtype_to_factortype(ap.get("type", "factual")), |
| name=ap.get("title", "טיעון"), |
| description=ap.get("text", ""), |
| direction=+1.0 * ap.get("strength", 0.6), |
| weight=ap.get("strength", 0.6), |
| ) |
| factors.append(f) |
| for ap in accepted_for_other: |
| f = self.case_analyzer.add_factor( |
| case_id=analyzer_case_id, |
| factor_type=self._map_argtype_to_factortype(ap.get("type", "factual")), |
| name=ap.get("title", "טיעון נגדי"), |
| description=ap.get("text", ""), |
| direction=-1.0 * ap.get("strength", 0.5), |
| weight=ap.get("strength", 0.5), |
| ) |
| factors.append(f) |
|
|
| |
| try: |
| pred = self.case_analyzer.predict_outcome(analyzer_case_id) |
| |
| if hasattr(pred, "to_dict"): |
| outcome = pred.to_dict() |
| elif hasattr(pred, "__dict__"): |
| outcome = vars(pred) |
| else: |
| outcome = pred |
| prob = outcome.get("probability", outcome.get("confidence", 0.5)) |
| except Exception: |
| outcome = {"probability": 0.5} |
| prob = 0.5 |
| risk = "low" if prob >= 0.7 else "high" if prob <= 0.35 else "medium" |
|
|
| |
| self.arg_gen.create_argument_set(case_id=case_id, side=side) |
| for ap in accepted_for_user[:5]: |
| self.arg_gen.generate_argument( |
| case_id=case_id, |
| side=side, |
| argument_type=self._map_str_to_argtype(ap.get("type", "factual")), |
| thesis=ap.get("text", "") or ap.get("title", "")[:200], |
| facts=[user_facts], |
| citations=ap.get("supporting_case_ids", []), |
| strategy=RhetoricalStrategy.LOGOS, |
| ) |
| brief = self.arg_gen.generate_brief_outline(case_id, side) |
|
|
| |
| |
| |
| |
| cbr_result = self._run_case_based_extraction( |
| user_facts=user_facts, side=side |
| ) |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| outcome_signals = None |
| if cbr_result and cbr_result.get("argument_templates"): |
| try: |
| from .outcome_signals import compute_outcome_signals |
| from ..core.types import Query as _Q |
|
|
| |
| |
| |
| strat = self._get_or_build_stratified_retriever() |
| outcome_map = self._get_or_build_outcome_map() |
| tau_hits = None |
| if strat is not None and outcome_map: |
| try: |
| tau_hits = strat.search(_Q(text=user_facts), k=10) |
| except Exception: |
| tau_hits = None |
|
|
| outcome_signals = compute_outcome_signals( |
| argument_templates=cbr_result.get( |
| "argument_templates", []), |
| drafted_arguments=cbr_result.get( |
| "drafted_arguments_for_user", []), |
| retrieved_hits=tau_hits, |
| outcome_map=(outcome_map or None), |
| ) |
| prob = outcome_signals.omega |
| risk = ("low" if prob >= 0.7 |
| else "high" if prob <= 0.35 |
| else "medium") |
| except Exception: |
| outcome_signals = None |
|
|
| |
| mediation_options = self._suggest_mediation_options( |
| outcome=outcome, factors=factors, domain=domain |
| ) |
|
|
| |
| result = StrategyResult( |
| case_id=case_id, |
| user_facts=user_facts, |
| side=side, |
| detected_domain=domain, |
| detected_case_type=case_type.value, |
| missing_facts=self._infer_missing_facts(user_facts, domain), |
| likely_focus_questions=self._focus_questions_for_domain(domain), |
| arguments_for_user=accepted_for_user[:6], |
| arguments_against_user=accepted_for_other[:5], |
| supporting_cases=[ |
| {"case_id": c.get("case_id"), "title": c.get("title"), |
| "score": c.get("score")} for c in similar_cases[:5] |
| ], |
| harmful_cases=self._detect_harmful_cases(similar_cases, side, user_facts), |
| outcome_probability=prob, |
| risk_level=risk, |
| factors=[f.to_dict() for f in factors], |
| mediation_options=mediation_options, |
| recommended_brief_outline=brief, |
| case_based_arguments=cbr_result, |
| outcome_signals=( |
| outcome_signals.to_dict() if outcome_signals else {} |
| ), |
| confidence={ |
| "domain_detection": 0.85 if domain else 0.4, |
| "argument_extraction": 0.7 if self.arg_clf else 0.5, |
| "outcome_prediction": 0.6, |
| "overall": 0.65, |
| }, |
| ) |
|
|
| |
| if self.polish_with_tau_llm: |
| self._polish_with_tau_llm(result) |
|
|
| return result |
|
|
| |
| OUTCOME_MAP_DEFAULT_PATH = "tau_rag/runtime/outcome_map.json" |
|
|
| def _get_or_build_outcome_map( |
| self, |
| max_docs: Optional[int] = 5000, |
| persist_path: Optional[str] = None, |
| ) -> Dict[str, Optional[str]]: |
| """Lazy-build a doc_id → outcome map from the indexed corpus. |
| |
| Three-level resolution: |
| 1. In-memory member (fastest) |
| 2. Disk JSON at `persist_path` (next-fastest, survives restarts) |
| 3. Build from scratch (slow — runs detect_outcome on every doc) |
| |
| Cost of build-from-scratch: ~10ms per doc (regex on operative |
| section). On 5K-doc cap → ~50 sec. On full 134K-substantive corpus |
| → ~20 min. The persisted JSON saves this cost across restarts. |
| |
| Args: |
| max_docs: cap docs scanned (None = no cap). Default 5000. |
| persist_path: where to read/write the JSON cache. Default |
| tau_rag/runtime/outcome_map.json. |
| """ |
| if self._outcome_map is not None: |
| return self._outcome_map |
|
|
| |
| path = persist_path or self.OUTCOME_MAP_DEFAULT_PATH |
| try: |
| import json as _json |
| import os as _os |
| if _os.path.exists(path): |
| with open(path, "r") as f: |
| data = _json.load(f) |
| if isinstance(data, dict) and "outcome_map" in data: |
| self._outcome_map = data["outcome_map"] |
| return self._outcome_map |
| except Exception: |
| pass |
|
|
| |
| if self._pipeline is None: |
| return {} |
| try: |
| from ..retrieve.stratified import build_outcome_map |
| from ..scripts.build_polarity_lexicon import detect_outcome |
| except Exception: |
| return {} |
| indexed = getattr(self._pipeline, "_indexed_docs", None) or [] |
| if not indexed: |
| return {} |
| if max_docs is not None and len(indexed) > max_docs: |
| sample = indexed[:max_docs] |
| else: |
| sample = indexed |
| self._outcome_map = build_outcome_map( |
| sample, detect_outcome_fn=detect_outcome |
| ) |
| |
| try: |
| import json as _json |
| import os as _os |
| _os.makedirs(_os.path.dirname(path), exist_ok=True) |
| n_acc = sum(1 for v in self._outcome_map.values() |
| if v == "accepted") |
| n_rej = sum(1 for v in self._outcome_map.values() |
| if v == "rejected") |
| with open(path, "w") as f: |
| _json.dump({ |
| "outcome_map": self._outcome_map, |
| "n_total": len(self._outcome_map), |
| "n_accepted": n_acc, |
| "n_rejected": n_rej, |
| "max_docs_scanned": len(sample), |
| }, f, ensure_ascii=False) |
| except Exception: |
| pass |
| return self._outcome_map |
|
|
| def _get_or_build_stratified_retriever(self): |
| """Wrap the CBR retriever in a StratifiedRetriever using the |
| cached outcome_map. Lazy-built on first use.""" |
| if self._stratified_retriever is not None: |
| return self._stratified_retriever |
| outcome_map = self._get_or_build_outcome_map() |
| if not outcome_map: |
| return None |
| try: |
| from ..retrieve.stratified import StratifiedRetriever |
| inner = self.cbr_retriever or self.retriever |
| if inner is None: |
| return None |
| self._stratified_retriever = StratifiedRetriever( |
| inner=inner, outcome_map=outcome_map, |
| pool_factor=12, balance="balanced", |
| ) |
| except Exception: |
| return None |
| return self._stratified_retriever |
|
|
| def reset_outcome_cache(self, delete_disk: bool = True) -> None: |
| """Clear cached outcome_map + stratified retriever. Call after |
| re-indexing the corpus. |
| |
| If delete_disk=True (default), also removes the persisted JSON |
| cache so the next build runs fresh. |
| """ |
| self._outcome_map = None |
| self._stratified_retriever = None |
| if delete_disk: |
| try: |
| import os as _os |
| if _os.path.exists(self.OUTCOME_MAP_DEFAULT_PATH): |
| _os.remove(self.OUTCOME_MAP_DEFAULT_PATH) |
| except Exception: |
| pass |
|
|
| |
| def _run_case_based_extraction( |
| self, user_facts: str, side: str |
| ) -> Dict[str, Any]: |
| """Run the CBR extractor over the same retriever's results. |
| |
| Returns the extractor's structured output, or `{}` if no retriever |
| is wired or extraction failed. We map our `plaintiff` / |
| `defendant` side labels onto the extractor's `claimant` / |
| `respondent` vocabulary at the boundary. |
| """ |
| retriever = self.cbr_retriever |
| if retriever is None: |
| return {} |
| |
| if self.cbr_extractor is None: |
| try: |
| from .case_based_arguments import CaseBasedArgumentExtractor |
| self.cbr_extractor = CaseBasedArgumentExtractor( |
| retriever=retriever, |
| tau_llm_polish=self.polish_with_tau_llm, |
| ) |
| except Exception: |
| return {} |
| |
| side_map = {"plaintiff": "claimant", "defendant": "respondent"} |
| cbr_side = side_map.get(side, side) |
| try: |
| return self.cbr_extractor.extract_and_draft( |
| user_facts=user_facts, |
| side=cbr_side, |
| top_k_cases=10, |
| full_text_loader=self.full_text_loader, |
| ) |
| except Exception as e: |
| return {"error": f"cbr_failed: {e}"} |
|
|
| |
| def _retrieve_similar_cases( |
| self, user_facts: str, domain: Optional[str], top_k: int |
| ) -> List[Dict[str, Any]]: |
| """Use the existing tau-rag MultiRetriever to fetch precedents.""" |
| if self.retriever is None: |
| return [] |
| try: |
| from ..core.types import Query |
| q = Query(text=user_facts) |
| if domain: |
| q.filters = {"domain": domain} |
| hits = self.retriever.search(q, k=top_k) |
| return [{ |
| "case_id": h.chunk.doc_id, |
| "title": (h.chunk.metadata or {}).get("title", h.chunk.doc_id), |
| "text": h.chunk.text, |
| "score": float(h.score), |
| "metadata": h.chunk.metadata or {}, |
| } for h in hits] |
| except Exception as e: |
| return [] |
|
|
| |
| def _extract_argument_patterns( |
| self, similar_cases: List[Dict[str, Any]], user_side: str |
| ): |
| """Extract accepted/rejected argument patterns from similar cases. |
| |
| If `argument_classifier` is provided (fine-tuned model), use it. |
| Otherwise fall back to rule-based pattern matching. |
| """ |
| accepted_for_user: List[Dict[str, Any]] = [] |
| rejected_for_user: List[Dict[str, Any]] = [] |
| accepted_for_other: List[Dict[str, Any]] = [] |
|
|
| for case in similar_cases: |
| text = case.get("text", "") or "" |
| paragraphs = self._split_paragraphs(text) |
| for para in paragraphs: |
| if len(para) < 60: |
| continue |
| pred = self._classify_paragraph(para) |
| if not pred or not pred.get("is_argument"): |
| continue |
| arg_record = { |
| "title": para[:80], |
| "text": para, |
| "type": pred.get("arg_type", "factual"), |
| "strength": pred.get("strength", 0.6), |
| "confidence": pred.get("confidence", 0.6), |
| "supporting_case_ids": [case.get("case_id")], |
| } |
| outcome = pred.get("outcome", "unknown") |
| side_match = pred.get("side", "unknown") == user_side |
| if outcome == "accepted": |
| if side_match: |
| accepted_for_user.append(arg_record) |
| else: |
| accepted_for_other.append(arg_record) |
| elif outcome == "rejected" and side_match: |
| rejected_for_user.append(arg_record) |
|
|
| accepted_for_user.sort(key=lambda x: -x["strength"]) |
| accepted_for_other.sort(key=lambda x: -x["strength"]) |
| return accepted_for_user, rejected_for_user, accepted_for_other |
|
|
| def _classify_paragraph(self, paragraph: str) -> Dict[str, Any]: |
| """Classify a paragraph using the fine-tuned model OR rule fallback.""" |
| if self.arg_clf is not None: |
| try: |
| return self.arg_clf(paragraph) |
| except Exception: |
| pass |
| |
| return self._rule_based_classify(paragraph) |
|
|
| def _rule_based_classify(self, paragraph: str) -> Dict[str, Any]: |
| """Rule-based fallback when no fine-tuned classifier is loaded. |
| |
| Heuristics designed for Hebrew legal text — pattern matching on |
| characteristic phrases. Imperfect but produces useful first cut while |
| labeled data is being collected for the trained classifier. |
| """ |
| accepted_markers = ["נקבע", "אכן", "מקובל עלי", "אני מקבל", "הצדק עם", |
| "יש לקבל", "התביעה מתקבלת"] |
| rejected_markers = ["אין לקבל", "נדחה", "אין מקום", "איני מקבל", |
| "התביעה נדחית", "אין בסיס"] |
| argument_markers = ["טוען", "טענה", "לטעמ", "לעמדת", "נטען", |
| "סבור", "גורס"] |
| plaintiff_markers = ["התובע", "המערער", "העותר"] |
| defendant_markers = ["הנתבע", "המשיב", "הנאשם"] |
|
|
| para_lower = paragraph |
| is_argument = any(m in para_lower for m in argument_markers) |
| if not is_argument: |
| return {"is_argument": False} |
|
|
| outcome = "unknown" |
| if any(m in para_lower for m in accepted_markers): |
| outcome = "accepted" |
| elif any(m in para_lower for m in rejected_markers): |
| outcome = "rejected" |
|
|
| side = "unknown" |
| if any(m in para_lower for m in plaintiff_markers): |
| side = "plaintiff" |
| elif any(m in para_lower for m in defendant_markers): |
| side = "defendant" |
|
|
| |
| arg_type = "factual" |
| if "סעיף" in para_lower or "חוק" in para_lower: |
| arg_type = "legal" |
| if "נסיבות" in para_lower or "צדק" in para_lower: |
| arg_type = "equitable" |
| if "תקנה" in para_lower or "סדר הדין" in para_lower: |
| arg_type = "procedural" |
| if "מדיניות" in para_lower or "אינטרס הציבור" in para_lower: |
| arg_type = "policy" |
|
|
| return { |
| "is_argument": True, |
| "outcome": outcome, |
| "side": side, |
| "arg_type": arg_type, |
| "strength": 0.65 if outcome == "accepted" else 0.45, |
| "confidence": 0.55, |
| } |
|
|
| |
| def _detect_harmful_cases( |
| self, similar_cases: List[Dict[str, Any]], side: str, user_facts: str |
| ) -> List[Dict[str, Any]]: |
| """Find similar precedents where user's side LOST — these are harmful.""" |
| harmful = [] |
| rejection_markers = ["נדחתה", "התביעה נדחית", "אין לקבל", "איני מקבל"] |
| for case in similar_cases: |
| text = (case.get("text") or "") |
| if any(m in text for m in rejection_markers): |
| harmful.append({ |
| "case_id": case.get("case_id"), |
| "title": case.get("title"), |
| "why_harmful": "פסיקה דומה דחתה תביעה דומה", |
| "distinguishing_strategy": ( |
| "להבחין באמצעות תיעוד טוב יותר של עובדות מהותיות" |
| ), |
| }) |
| return harmful[:3] |
|
|
| |
| def _suggest_mediation_options( |
| self, outcome: Dict[str, Any], factors: List, domain: Optional[str] |
| ) -> List[Dict[str, Any]]: |
| prob = outcome.get("probability", 0.5) |
| |
| if 0.5 <= prob <= 0.75: |
| return [ |
| { |
| "type": "partial_compensation", |
| "title": "פיצוי חלקי + תיקון", |
| "best_when": ["אחריות סבירה", "נזק לא ודאי"], |
| "sample_terms": [ |
| "תיקון תוך 30 יום", |
| "פיצוי חלקי במקום פיצוי מלא", |
| "ויתור הדדי לסילוק סופי", |
| ], |
| }, |
| { |
| "type": "expert_evaluation", |
| "title": "מינוי מומחה מוסכם", |
| "best_when": ["חילוקי דעות על היקף נזק"], |
| "sample_terms": [ |
| "מומחה מוסכם להערכת ליקויים", |
| "כל צד נושא במחצית העלות", |
| ], |
| }, |
| ] |
| elif prob > 0.75: |
| return [{ |
| "type": "fast_settlement", |
| "title": "פשרה מהירה לטובת הצד החזק", |
| "best_when": ["אחריות חזקה", "ראיות מוכחות"], |
| "sample_terms": [ |
| "תשלום מלא בפריסה", |
| "התחייבות לאי-חזרה על ההתנהגות", |
| ], |
| }] |
| else: |
| return [{ |
| "type": "low_payment_close", |
| "title": "סילוק נמוך וסיום מהיר", |
| "best_when": ["סיכוי הצלחה נמוך", "עלויות התדיינות גבוהות"], |
| "sample_terms": [ |
| "תשלום סמלי לסיום סופי", |
| "ויתור הדדי על כל טענה", |
| ], |
| }] |
|
|
| |
| def _focus_questions_for_domain(self, domain: Optional[str]) -> List[str]: |
| DOMAIN_QUESTIONS = { |
| "contracts": [ |
| "האם נקבע בהסכם מועד ביצוע מפורש?", |
| "האם נשלחה התראה בכתב לצד השני?", |
| "האם ניתנה הזדמנות לתקן את ההפרה?", |
| "האם הנזק תועד וכומת?", |
| "האם הייתה הסכמה משתמעת לעיכוב?", |
| ], |
| "torts": [ |
| "האם קיימת חובת זהירות בנסיבות העניין?", |
| "האם הופרה חובת הזהירות?", |
| "האם קיים קשר סיבתי בין ההפרה לנזק?", |
| "האם הנזק היה צפוי?", |
| "האם הניזוק תרם לנזק?", |
| ], |
| "property": [ |
| "מי בעל הזכות הרשומה?", |
| "האם קיימת זכות נוגדת?", |
| "האם נעשתה החזקה ארוכת טווח?", |
| "האם הוכחה תום לב?", |
| ], |
| "family": [ |
| "מהן ההסכמות החתומות בין הצדדים?", |
| "מהי טובת הקטין?", |
| "מה היכולת הכלכלית של הצדדים?", |
| "האם ניתנה הסכמה מודעת?", |
| ], |
| "labor": [ |
| "האם קיים חוזה עבודה?", |
| "האם נמסרה הודעה מוקדמת?", |
| "האם בוצעו תשלומי החובה?", |
| "האם ניתנה זכות שימוע?", |
| ], |
| } |
| |
| for key, qs in DOMAIN_QUESTIONS.items(): |
| if key in (domain or "").lower(): |
| return qs |
| |
| if domain == "חוזים": return DOMAIN_QUESTIONS["contracts"] |
| if domain == "נזיקין": return DOMAIN_QUESTIONS["torts"] |
| if domain == "מקרקעין": return DOMAIN_QUESTIONS["property"] |
| if domain == "משפחה": return DOMAIN_QUESTIONS["family"] |
| if domain == "עבודה": return DOMAIN_QUESTIONS["labor"] |
| return [ |
| "מהן העובדות המרכזיות שניתן להוכיח בכתב?", |
| "מהו הנזק הספציפי?", |
| "אילו אסמכתאות פסיקתיות תומכות בעמדה?", |
| "מהם הסיכונים העיקריים?", |
| ] |
|
|
| def _infer_missing_facts( |
| self, user_facts: str, domain: Optional[str] |
| ) -> List[str]: |
| """Heuristic — what facts are typically needed for this domain |
| but absent from the user's input.""" |
| text = (user_facts or "").lower() |
| missing = [] |
| |
| if domain in ("contracts", "חוזים"): |
| if "מועד" not in text and "תאריך" not in text: |
| missing.append("מועד מפורש בהסכם") |
| if "התראה" not in text and "מכתב" not in text: |
| missing.append("התראה בכתב לפני הליך משפטי") |
| if "נזק" not in text and "הפסד" not in text: |
| missing.append("תיעוד וכימות הנזק") |
| elif domain in ("torts", "נזיקין"): |
| if "ראיה" not in text and "עדות" not in text: |
| missing.append("ראיות לקיום החובה ולהפרתה") |
| if "קשר" not in text and "סיבת" not in text: |
| missing.append("קשר סיבתי בין ההתנהגות לנזק") |
| return missing |
|
|
| |
| @staticmethod |
| def _split_paragraphs(text: str) -> List[str]: |
| if not text: |
| return [] |
| |
| paras = [] |
| for chunk in text.split("\n\n"): |
| chunk = chunk.strip() |
| if chunk: |
| paras.append(chunk) |
| return paras |
|
|
| @staticmethod |
| def _map_str_to_argtype(s: str) -> ArgumentType: |
| m = { |
| "factual": ArgumentType.FACTUAL, |
| "legal": ArgumentType.LEGAL, |
| "procedural": ArgumentType.PROCEDURAL, |
| "policy": ArgumentType.POLICY, |
| "equitable": ArgumentType.EQUITABLE, |
| "constitutional": ArgumentType.CONSTITUTIONAL, |
| "substantive": ArgumentType.SUBSTANTIVE, |
| } |
| return m.get(s.lower(), ArgumentType.FACTUAL) |
|
|
| @staticmethod |
| def _map_argtype_to_factortype(s: str) -> FactorType: |
| m = { |
| "factual": FactorType.FACTUAL, |
| "legal": FactorType.STATUTORY, |
| "procedural": FactorType.PROCEDURAL, |
| "policy": FactorType.POLICY, |
| "equitable": FactorType.EQUITABLE, |
| "constitutional": FactorType.CONSTITUTIONAL, |
| "substantive": FactorType.STATUTORY, |
| } |
| return m.get(s.lower(), FactorType.FACTUAL) |
|
|
| |
| def _polish_with_tau_llm(self, result: StrategyResult) -> None: |
| """Optional: pass each argument's text through the local TAU LLM |
| for fluency polishing. ZERO external API calls.""" |
| try: |
| from ..generate.tau_native import TauNativeGenerator |
| tau = TauNativeGenerator() |
| for arg in result.arguments_for_user: |
| if arg.get("text"): |
| polished = tau.complete(arg["text"], max_new_tokens=50) |
| if polished and len(polished) > len(arg["text"]) * 0.8: |
| arg["polished_text"] = polished |
| except Exception: |
| pass |
|
|