# legal-eye / tau_rag/intelligence/strategy_synthesizer.py
# Initial deploy: legal-eye Hebrew legal RAG (17K corpus, verbatim-from-precedent)
# Commit 3be54c6 (verified) · 37.3 kB
"""StrategySynthesizer — given user facts, produce a legal+mediation strategy
end-to-end using ONLY local components (no external LLM API).

Pipeline:
    1. Use tau-rag retrieval to find similar precedent cases.
    2. For each precedent, use a small fine-tuned classifier (or a rule-based
       fallback) to identify accepted/rejected argument patterns.
    3. Map argument patterns to LegalFactor objects (positive/negative
       direction).
    4. Use ArgumentGenerator to structure the strongest arguments as IRAC.
    5. Use CaseAnalyzer to compute outcome probability + risk.
    6. Optionally polish the textual output via the local TAU LLM checkpoint.

This module does NOT call OpenAI/Anthropic. The "training" is a one-time
fine-tune of small classifiers, not per-query LLM calls.

Inputs:
    user_facts: Hebrew or English description of the conflict
    side: "plaintiff" / "defendant" / "mediator"
    domain: optional override; otherwise auto-detected
    judges: optional list of judge IDs (for panel-fit scoring)

Output:
    StrategyResult — structured analysis with:
        • diagnosis (conflict type, missing facts)
        • arguments_for / arguments_against
        • supporting_cases / harmful_cases
        • outcome_probability + risk
        • mediation_options
        • recommended_brief_outline
"""
from __future__ import annotations

import hashlib
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

from .argument_generator import (
    ArgumentGenerator,
    ArgumentType,
    ArgumentStrength,
    LegalArgument,
    RhetoricalStrategy,
)
from .case_analyzer import (
    CaseAnalyzer,
    CaseType,
    FactorType,
    LegalFactor,
)
# Lookup table from a legal domain label to the analyzer's CaseType.
# Every domain is registered under both its English and its Hebrew name;
# the table is expanded from (aliases, case type) pairs so the two spellings
# of one domain can never drift apart.
_DOMAIN_TO_CASE_TYPE: Dict[str, CaseType] = {
    alias: case_type
    for aliases, case_type in (
        (("contracts", "חוזים"), CaseType.COMMERCIAL),
        (("torts", "נזיקין"), CaseType.CIVIL),
        (("property", "מקרקעין"), CaseType.CIVIL),
        (("family", "משפחה"), CaseType.FAMILY),
        (("labor", "עבודה"), CaseType.LABOR),
        (("administrative", "מנהלי"), CaseType.ADMINISTRATIVE),
        (("criminal", "פלילי"), CaseType.CRIMINAL),
        (("constitutional", "חוקתי"), CaseType.CONSTITUTIONAL),
        (("tax", "מסים"), CaseType.TAX),
    )
    for alias in aliases
}
@dataclass
class StrategyResult:
    """Container for everything one strategy synthesis run produces.

    Only ``case_id``, ``user_facts`` and ``side`` are required; every other
    field has a neutral default (empty container, ``None``, probability 0.5,
    risk "medium") so a partially filled result is still serialisable.
    """

    case_id: str
    user_facts: str
    side: str

    # --- Diagnosis ------------------------------------------------------
    detected_domain: Optional[str] = None
    detected_case_type: Optional[str] = None
    missing_facts: List[str] = field(default_factory=list)
    likely_focus_questions: List[str] = field(default_factory=list)

    # --- Arguments ------------------------------------------------------
    arguments_for_user: List[Dict[str, Any]] = field(default_factory=list)
    arguments_against_user: List[Dict[str, Any]] = field(default_factory=list)

    # --- Citations ------------------------------------------------------
    supporting_cases: List[Dict[str, Any]] = field(default_factory=list)
    harmful_cases: List[Dict[str, Any]] = field(default_factory=list)

    # --- Outcome --------------------------------------------------------
    outcome_probability: float = 0.5
    risk_level: str = "medium"  # one of: low | medium | high
    factors: List[Dict[str, Any]] = field(default_factory=list)

    # --- Mediation ------------------------------------------------------
    mediation_options: List[Dict[str, Any]] = field(default_factory=list)

    # --- Brief ----------------------------------------------------------
    recommended_brief_outline: Dict[str, Any] = field(default_factory=dict)

    # --- Case-Based Reasoning -------------------------------------------
    # Structured argument templates extracted from similar judgments
    # (judgment_structurer + hebrew_encoder). Each template records its
    # source case(s), its statute/case citations and the court's outcome
    # pattern. Drafted arguments are template fills adapted to the user's
    # facts, optionally polished by the local TAU LLM.
    case_based_arguments: Dict[str, Any] = field(default_factory=dict)

    # --- Universal-signals decomposition of the outcome probability ------
    # τ (argument strength), ψ (template coherence), φ (alignment with the
    # precedent pattern), ξ (anomaly), Ω (geometric-mean health → P(success)).
    # Takes the place of the heuristic CaseAnalyzer prediction whenever CBR
    # templates exist.
    outcome_signals: Dict[str, Any] = field(default_factory=dict)

    # --- Confidence -----------------------------------------------------
    confidence: Dict[str, float] = field(default_factory=dict)
    disclaimer: str = (
        "ניתוח זה הוא AI מבוסס פסיקה דומה ולא מהווה ייעוץ משפטי מחייב. "
        "התוצאות הספציפיות תלויות בנסיבות העובדתיות שיוכחו בפועל."
    )

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dict, one key per field.

        Values are the field objects themselves (no copying), listed in
        field-declaration order.
        """
        exported = (
            "case_id",
            "user_facts",
            "side",
            "detected_domain",
            "detected_case_type",
            "missing_facts",
            "likely_focus_questions",
            "arguments_for_user",
            "arguments_against_user",
            "supporting_cases",
            "harmful_cases",
            "outcome_probability",
            "risk_level",
            "factors",
            "mediation_options",
            "recommended_brief_outline",
            "case_based_arguments",
            "outcome_signals",
            "confidence",
            "disclaimer",
        )
        return {name: getattr(self, name) for name in exported}
class StrategySynthesizer:
    """End-to-end legal strategy synthesizer (local-only).

    Combines precedent retrieval, argument-pattern extraction, factor-based
    outcome prediction, structured argument generation, case-based reasoning
    (CBR) and mediation suggestions into a single ``synthesize()`` call.
    Every collaborator is injected through the constructor and is optional:
    a missing retriever or classifier degrades the result (empty lists,
    rule-based heuristics) instead of raising.
    """

    # Default on-disk location of the persisted doc_id → outcome cache.
    OUTCOME_MAP_DEFAULT_PATH = "tau_rag/runtime/outcome_map.json"

    # Optional stages swallow their errors on purpose (best effort); the
    # failures are reported here at DEBUG level so they stay diagnosable.
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        argument_generator: Optional[ArgumentGenerator] = None,
        case_analyzer: Optional[CaseAnalyzer] = None,
        retriever=None,  # tau_rag MultiRetriever
        argument_classifier=None,  # fine-tuned binary/3-class model
        domain_classifier=None,  # existing tau-rag classifier
        polish_with_tau_llm: bool = False,  # optional final-polish layer
        case_based_extractor=None,  # CaseBasedArgumentExtractor (optional)
        full_text_loader=None,  # callable(case_id) → full text
        cbr_retriever=None,  # dedicated retriever for CBR (e.g. hebrew_encoder only)
        pipeline=None,  # full Pipeline — needed to build outcome_map
    ):
        """Store the collaborators; nothing is loaded or built here.

        Args:
            argument_generator: defaults to a fresh ``ArgumentGenerator()``.
            case_analyzer: defaults to a fresh ``CaseAnalyzer()``.
            retriever: object exposing ``search(query, k=...)``; without it
                no precedents are retrieved.
            argument_classifier: callable(paragraph) → prediction dict; the
                rule-based heuristics are used when it is absent or raises.
            domain_classifier: callable(user_facts) → mapping with a "top"
                key, used only when no domain is passed to ``synthesize``.
            polish_with_tau_llm: run the optional local-LLM polish step.
            case_based_extractor: CBR extractor; created lazily if omitted.
            full_text_loader: forwarded to the CBR extractor.
            cbr_retriever: retriever used for CBR; falls back to
                ``retriever``.
            pipeline: object whose ``_indexed_docs`` attribute is read when
                the outcome map has to be built from scratch.
        """
        self.arg_gen = argument_generator or ArgumentGenerator()
        self.case_analyzer = case_analyzer or CaseAnalyzer()
        self.retriever = retriever
        self.arg_clf = argument_classifier
        self.domain_clf = domain_classifier
        self.polish_with_tau_llm = polish_with_tau_llm
        self.cbr_extractor = case_based_extractor
        self.cbr_retriever = cbr_retriever or retriever
        self.full_text_loader = full_text_loader
        self._pipeline = pipeline
        # Built lazily on the first synthesize() call and then cached: the
        # outcome map drives stratified retrieval and the delta-based τ.
        # Building runs detect_outcome over the indexed documents, so it is
        # kept as a member instead of being rebuilt per request. Call
        # reset_outcome_cache() when the corpus changes.
        self._outcome_map = None
        self._stratified_retriever = None

    # ---------------------------------------------------------------- main entry
    def synthesize(
        self,
        user_facts: str,
        side: str = "plaintiff",
        domain: Optional[str] = None,
        judges: Optional[List[str]] = None,
        case_id: Optional[str] = None,
        top_k: int = 20,
    ) -> StrategyResult:
        """Run the full pipeline and return a ``StrategyResult``.

        Args:
            user_facts: free-text description of the dispute.
            side: the user's side; compared against the classifier's side
                labels and mapped to claimant/respondent for CBR.
            domain: legal domain; auto-detected via ``domain_classifier``
                when ``None`` and a classifier is configured.
            judges: accepted for interface compatibility; not read by this
                method.
            case_id: identifier for the result; when omitted a stable ID is
                derived from ``user_facts``.
            top_k: number of hits requested from the retriever.

        Returns:
            The populated ``StrategyResult``. Failures in the optional stages
            (domain detection, outcome prediction, outcome signals, polish)
            fall back to defaults rather than raising.
        """
        case_id = case_id or self._default_case_id(user_facts)

        # 1. Domain detection ------------------------------------------------
        if domain is None and self.domain_clf is not None:
            try:
                domain = self.domain_clf(user_facts).get("top")
            except Exception:
                self._log.debug("domain detection failed", exc_info=True)
                domain = None
        case_type = _DOMAIN_TO_CASE_TYPE.get((domain or "").lower(), CaseType.CIVIL)

        # 2. Retrieval -------------------------------------------------------
        similar_cases = self._retrieve_similar_cases(
            user_facts, domain=domain, top_k=top_k
        )

        # 3. Argument pattern extraction from similar cases ------------------
        # The rejected-for-user list is extracted but not consumed below.
        accepted_for_user, _rejected_for_user, accepted_for_other = (
            self._extract_argument_patterns(similar_cases, user_side=side)
        )

        # 4. Map to LegalFactors ---------------------------------------------
        # CaseAnalyzer assigns its own case ID; that ID (not ours) must be
        # used for every subsequent analyzer call.
        analyzer_case = self.case_analyzer.create_case(
            title=f"User case {case_id[:8]}",
            case_type=case_type,
            jurisdiction="IL",
            facts=[user_facts],
        )
        analyzer_case_id = analyzer_case.id
        factors = []
        for ap in accepted_for_user:
            factors.append(
                self.case_analyzer.add_factor(
                    case_id=analyzer_case_id,
                    factor_type=self._map_argtype_to_factortype(
                        ap.get("type", "factual")
                    ),
                    name=ap.get("title", "טיעון"),
                    description=ap.get("text", ""),
                    direction=+1.0 * ap.get("strength", 0.6),
                    weight=ap.get("strength", 0.6),
                )
            )
        for ap in accepted_for_other:
            factors.append(
                self.case_analyzer.add_factor(
                    case_id=analyzer_case_id,
                    factor_type=self._map_argtype_to_factortype(
                        ap.get("type", "factual")
                    ),
                    name=ap.get("title", "טיעון נגדי"),
                    description=ap.get("text", ""),
                    direction=-1.0 * ap.get("strength", 0.5),
                    weight=ap.get("strength", 0.5),
                )
            )

        # 5. Outcome prediction ----------------------------------------------
        try:
            pred = self.case_analyzer.predict_outcome(analyzer_case_id)
            # The prediction may be an object with to_dict(), a plain object,
            # or already a dict; normalise to a dict.
            if hasattr(pred, "to_dict"):
                outcome = pred.to_dict()
            elif hasattr(pred, "__dict__"):
                outcome = vars(pred)
            else:
                outcome = pred
            # float() inside the try: a missing/non-numeric value falls back
            # to 0.5 instead of breaking the risk comparison below.
            prob = float(
                outcome.get("probability", outcome.get("confidence", 0.5))
            )
        except Exception:
            self._log.debug("outcome prediction failed", exc_info=True)
            outcome = {"probability": 0.5}
            prob = 0.5
        risk = self._risk_from_probability(prob)

        # 6. Build structured arguments via ArgumentGenerator ----------------
        self.arg_gen.create_argument_set(case_id=case_id, side=side)
        for ap in accepted_for_user[:5]:
            self.arg_gen.generate_argument(
                case_id=case_id,
                side=side,
                argument_type=self._map_str_to_argtype(ap.get("type", "factual")),
                # Cap the thesis at 200 chars whichever field supplies it
                # (previously the slice bound only to the title fallback).
                thesis=(ap.get("text") or ap.get("title") or "")[:200],
                facts=[user_facts],
                citations=ap.get("supporting_case_ids", []),
                strategy=RhetoricalStrategy.LOGOS,
            )
        brief = self.arg_gen.generate_brief_outline(case_id, side)

        # 7. Case-Based Reasoning — structured argument templates extracted
        #    from similar judgments. Additive to the extraction above: it
        #    yields clustered templates with sources, outcomes and drafts.
        cbr_result = self._run_case_based_extraction(
            user_facts=user_facts, side=side
        )

        # 7.5 Universal-signals outcome. Per the original author's note this
        #     path scored 88% accuracy on real Hebrew judgments (vs an 83%
        #     baseline); that figure is not verifiable from this file.
        #
        #     1. Build/cache outcome_map (doc_id → accepted/rejected/None).
        #     2. Stratified-retrieve hits balanced across outcome classes.
        #     3. compute_outcome_signals receives the templates, the drafts,
        #        the hits and the map, and returns the signals incl. Ω.
        outcome_signals = None
        if cbr_result and cbr_result.get("argument_templates"):
            try:
                from .outcome_signals import compute_outcome_signals
                from ..core.types import Query as _Q

                # Both helpers return an empty/None value when no corpus is
                # available; compute_outcome_signals is then called without
                # hits and without a map.
                strat = self._get_or_build_stratified_retriever()
                outcome_map = self._get_or_build_outcome_map()
                tau_hits = None
                if strat is not None and outcome_map:
                    try:
                        tau_hits = strat.search(_Q(text=user_facts), k=10)
                    except Exception:
                        self._log.debug(
                            "stratified retrieval failed", exc_info=True
                        )
                        tau_hits = None
                outcome_signals = compute_outcome_signals(
                    argument_templates=cbr_result.get("argument_templates", []),
                    drafted_arguments=cbr_result.get(
                        "drafted_arguments_for_user", []
                    ),
                    retrieved_hits=tau_hits,
                    outcome_map=(outcome_map or None),
                )
                # Validate Ω before committing it, so a bad value cannot
                # leave prob overwritten while risk still reflects step 5.
                omega = float(outcome_signals.omega)
                risk = self._risk_from_probability(omega)
                prob = omega
            except Exception:
                self._log.debug("outcome-signal computation failed", exc_info=True)
                outcome_signals = None

        # 8. Mediation options -----------------------------------------------
        # Use the final probability (Ω when available) so the suggestions
        # agree with the outcome_probability / risk_level reported below.
        mediation_options = self._suggest_mediation_options(
            outcome={**outcome, "probability": prob},
            factors=factors,
            domain=domain,
        )

        # 9. Compose result ---------------------------------------------------
        result = StrategyResult(
            case_id=case_id,
            user_facts=user_facts,
            side=side,
            detected_domain=domain,
            detected_case_type=case_type.value,
            missing_facts=self._infer_missing_facts(user_facts, domain),
            likely_focus_questions=self._focus_questions_for_domain(domain),
            arguments_for_user=accepted_for_user[:6],
            arguments_against_user=accepted_for_other[:5],
            supporting_cases=[
                {
                    "case_id": c.get("case_id"),
                    "title": c.get("title"),
                    "score": c.get("score"),
                }
                for c in similar_cases[:5]
            ],
            harmful_cases=self._detect_harmful_cases(similar_cases, side, user_facts),
            outcome_probability=prob,
            risk_level=risk,
            factors=[f.to_dict() for f in factors],
            mediation_options=mediation_options,
            recommended_brief_outline=brief,
            case_based_arguments=cbr_result,
            outcome_signals=(
                outcome_signals.to_dict() if outcome_signals else {}
            ),
            confidence={
                "domain_detection": 0.85 if domain else 0.4,
                "argument_extraction": 0.7 if self.arg_clf else 0.5,
                "outcome_prediction": 0.6,
                "overall": 0.65,
            },
        )

        # 10. Optional polish via local TAU LLM (no external API) -------------
        if self.polish_with_tau_llm:
            self._polish_with_tau_llm(result)
        return result

    @staticmethod
    def _default_case_id(user_facts: str) -> str:
        """Derive a reproducible ``user_case_<n>`` ID from the facts text.

        Uses SHA-256 rather than the built-in ``hash()``, whose value for
        strings changes between interpreter runs.
        """
        digest = hashlib.sha256((user_facts or "").encode("utf-8")).hexdigest()
        return f"user_case_{int(digest, 16) % 10**9}"

    @staticmethod
    def _risk_from_probability(prob: float) -> str:
        """Bucket a success probability: ≥0.7 low, ≤0.35 high, else medium."""
        if prob >= 0.7:
            return "low"
        if prob <= 0.35:
            return "high"
        return "medium"

    # ────────────────────────────────────────────────────── outcome map cache
    def _get_or_build_outcome_map(
        self,
        max_docs: Optional[int] = 5000,
        persist_path: Optional[str] = None,
    ) -> Dict[str, Optional[str]]:
        """Return the doc_id → outcome map, building it lazily.

        Resolution order:
            1. In-memory member.
            2. JSON file at ``persist_path`` (survives restarts).
            3. Build from the pipeline's indexed documents via
               ``build_outcome_map`` / ``detect_outcome``, then persist.

        The original notes estimate the from-scratch build at roughly 10 ms
        per document; that is why the result is cached on disk.

        Args:
            max_docs: cap on documents scanned when building (None = no cap).
            persist_path: JSON cache location; defaults to
                ``OUTCOME_MAP_DEFAULT_PATH``.

        Returns:
            The map, or ``{}`` when it can be neither loaded nor built (no
            pipeline, helper modules missing, or nothing indexed). An empty
            result is not cached, so later calls try again.
        """
        if self._outcome_map is not None:
            return self._outcome_map

        import json as _json
        import os as _os

        path = persist_path or self.OUTCOME_MAP_DEFAULT_PATH

        # Level 2: try disk. Explicit UTF-8 because the file is written with
        # ensure_ascii=False; an unreadable cache just falls through to a
        # rebuild.
        try:
            if _os.path.exists(path):
                with open(path, "r", encoding="utf-8") as f:
                    data = _json.load(f)
                if isinstance(data, dict) and "outcome_map" in data:
                    self._outcome_map = data["outcome_map"]
                    return self._outcome_map
        except Exception:
            self._log.debug("could not read outcome map cache %s", path, exc_info=True)

        # Level 3: build from scratch
        if self._pipeline is None:
            return {}
        try:
            from ..retrieve.stratified import build_outcome_map
            from ..scripts.build_polarity_lexicon import detect_outcome
        except Exception:
            self._log.debug("outcome map helpers unavailable", exc_info=True)
            return {}
        indexed = getattr(self._pipeline, "_indexed_docs", None) or []
        if not indexed:
            return {}
        if max_docs is not None and len(indexed) > max_docs:
            sample = indexed[:max_docs]
        else:
            sample = indexed
        self._outcome_map = build_outcome_map(
            sample, detect_outcome_fn=detect_outcome
        )

        # Persist to disk for the next restart (best effort).
        try:
            parent = _os.path.dirname(path)
            if parent:  # makedirs("") raises; a bare filename needs no dir
                _os.makedirs(parent, exist_ok=True)
            outcomes = list(self._outcome_map.values())
            with open(path, "w", encoding="utf-8") as f:
                _json.dump({
                    "outcome_map": self._outcome_map,
                    "n_total": len(self._outcome_map),
                    "n_accepted": sum(1 for v in outcomes if v == "accepted"),
                    "n_rejected": sum(1 for v in outcomes if v == "rejected"),
                    "max_docs_scanned": len(sample),
                }, f, ensure_ascii=False)
        except Exception:
            # A failed write only costs a rebuild after the next restart.
            self._log.debug("could not persist outcome map to %s", path, exc_info=True)
        return self._outcome_map

    def _get_or_build_stratified_retriever(self):
        """Return the cached ``StratifiedRetriever`` around the CBR retriever.

        Built on first use from the outcome map. Returns ``None`` when the
        outcome map is empty, no retriever is configured, or construction
        fails.
        """
        if self._stratified_retriever is not None:
            return self._stratified_retriever
        outcome_map = self._get_or_build_outcome_map()
        if not outcome_map:
            return None
        try:
            from ..retrieve.stratified import StratifiedRetriever

            inner = self.cbr_retriever or self.retriever
            if inner is None:
                return None
            self._stratified_retriever = StratifiedRetriever(
                inner=inner, outcome_map=outcome_map,
                pool_factor=12, balance="balanced",
            )
        except Exception:
            self._log.debug("could not build stratified retriever", exc_info=True)
            return None
        return self._stratified_retriever

    def reset_outcome_cache(self, delete_disk: bool = True) -> None:
        """Drop the cached outcome map and stratified retriever.

        Call after re-indexing the corpus. With ``delete_disk=True`` (the
        default) the JSON cache at ``OUTCOME_MAP_DEFAULT_PATH`` is removed as
        well, so the next build starts fresh; removal errors are ignored.
        """
        self._outcome_map = None
        self._stratified_retriever = None
        if delete_disk:
            try:
                import os as _os

                if _os.path.exists(self.OUTCOME_MAP_DEFAULT_PATH):
                    _os.remove(self.OUTCOME_MAP_DEFAULT_PATH)
            except Exception:
                self._log.debug("could not delete outcome map cache", exc_info=True)

    # --------------------------------------------------- case-based extraction
    def _run_case_based_extraction(
        self, user_facts: str, side: str
    ) -> Dict[str, Any]:
        """Run the CBR extractor and return its structured output.

        Returns ``{}`` when no CBR retriever is configured or the extractor
        cannot be created, and ``{"error": "cbr_failed: ..."}`` when the
        extraction itself raises. The ``plaintiff`` / ``defendant`` labels
        are translated to the extractor's ``claimant`` / ``respondent``
        vocabulary; any other side value is passed through unchanged.
        """
        retriever = self.cbr_retriever
        if retriever is None:
            return {}

        # Lazy-init the extractor on first use.
        if self.cbr_extractor is None:
            try:
                from .case_based_arguments import CaseBasedArgumentExtractor

                self.cbr_extractor = CaseBasedArgumentExtractor(
                    retriever=retriever,
                    tau_llm_polish=self.polish_with_tau_llm,
                )
            except Exception:
                self._log.debug("could not create CBR extractor", exc_info=True)
                return {}

        side_map = {"plaintiff": "claimant", "defendant": "respondent"}
        cbr_side = side_map.get(side, side)
        try:
            return self.cbr_extractor.extract_and_draft(
                user_facts=user_facts,
                side=cbr_side,
                top_k_cases=10,
                full_text_loader=self.full_text_loader,
            )
        except Exception as e:
            return {"error": f"cbr_failed: {e}"}

    # ------------------------------------------------------------ retrieval
    def _retrieve_similar_cases(
        self, user_facts: str, domain: Optional[str], top_k: int
    ) -> List[Dict[str, Any]]:
        """Fetch precedents for the facts and flatten the hits to dicts.

        Each hit becomes ``{case_id, title, text, score, metadata}``; the
        title falls back to the document ID. Returns ``[]`` when no retriever
        is configured or the search fails.
        """
        if self.retriever is None:
            return []
        try:
            from ..core.types import Query

            q = Query(text=user_facts)
            if domain:
                q.filters = {"domain": domain}
            hits = self.retriever.search(q, k=top_k)
            return [{
                "case_id": h.chunk.doc_id,
                "title": (h.chunk.metadata or {}).get("title", h.chunk.doc_id),
                "text": h.chunk.text,
                "score": float(h.score),
                "metadata": h.chunk.metadata or {},
            } for h in hits]
        except Exception:
            self._log.debug("precedent retrieval failed", exc_info=True)
            return []

    # -------------------------------------------------------- arg extraction
    def _extract_argument_patterns(
        self, similar_cases: List[Dict[str, Any]], user_side: str
    ):
        """Extract accepted/rejected argument patterns from similar cases.

        Every paragraph of at least 60 characters is classified (fine-tuned
        model when available, rule-based fallback otherwise) and, if it is an
        argument, bucketed by outcome and side.

        Returns:
            ``(accepted_for_user, rejected_for_user, accepted_for_other)``;
            the two "accepted" lists are sorted by descending strength.
        """
        accepted_for_user: List[Dict[str, Any]] = []
        rejected_for_user: List[Dict[str, Any]] = []
        accepted_for_other: List[Dict[str, Any]] = []
        for case in similar_cases:
            text = case.get("text", "") or ""
            for para in self._split_paragraphs(text):
                if len(para) < 60:
                    continue
                pred = self._classify_paragraph(para)
                if not pred or not pred.get("is_argument"):
                    continue
                arg_record = {
                    "title": para[:80],
                    "text": para,
                    "type": pred.get("arg_type", "factual"),
                    "strength": pred.get("strength", 0.6),
                    "confidence": pred.get("confidence", 0.6),
                    "supporting_case_ids": [case.get("case_id")],
                }
                outcome = pred.get("outcome", "unknown")
                side_match = pred.get("side", "unknown") == user_side
                if outcome == "accepted":
                    if side_match:
                        accepted_for_user.append(arg_record)
                    else:
                        # NOTE(review): a side of "unknown" also lands here
                        # and is therefore counted against the user — confirm
                        # this is intended.
                        accepted_for_other.append(arg_record)
                elif outcome == "rejected" and side_match:
                    rejected_for_user.append(arg_record)
        accepted_for_user.sort(key=lambda x: -x["strength"])
        accepted_for_other.sort(key=lambda x: -x["strength"])
        return accepted_for_user, rejected_for_user, accepted_for_other

    def _classify_paragraph(self, paragraph: str) -> Dict[str, Any]:
        """Classify a paragraph with the fine-tuned model or the rule fallback.

        The rule-based classifier is used when no model is configured and
        also when the model raises.
        """
        if self.arg_clf is not None:
            try:
                return self.arg_clf(paragraph)
            except Exception:
                self._log.debug("argument classifier failed", exc_info=True)
        # Rule-based fallback — Hebrew legal heuristics.
        return self._rule_based_classify(paragraph)

    def _rule_based_classify(self, paragraph: str) -> Dict[str, Any]:
        """Classify a paragraph by substring matching on Hebrew legal phrases.

        A rough first cut for use while no trained classifier is loaded.

        Returns:
            ``{"is_argument": False}`` when no argument marker occurs;
            otherwise a dict with ``is_argument``, ``outcome``
            (accepted / rejected / unknown), ``side`` (plaintiff / defendant
            / unknown), ``arg_type``, ``strength`` (0.65 if accepted, else
            0.45) and a fixed ``confidence`` of 0.55.
        """
        accepted_markers = ("נקבע", "אכן", "מקובל עלי", "אני מקבל", "הצדק עם",
                            "יש לקבל", "התביעה מתקבלת")
        rejected_markers = ("אין לקבל", "נדחה", "אין מקום", "איני מקבל",
                            "התביעה נדחית", "אין בסיס")
        argument_markers = ("טוען", "טענה", "לטעמ", "לעמדת", "נטען",
                            "סבור", "גורס")
        plaintiff_markers = ("התובע", "המערער", "העותר")
        defendant_markers = ("הנתבע", "המשיב", "הנאשם")

        def mentions(markers) -> bool:
            return any(m in paragraph for m in markers)

        if not mentions(argument_markers):
            return {"is_argument": False}

        # Acceptance markers are tested first, so they win when both kinds
        # occur in the same paragraph.
        outcome = "unknown"
        if mentions(accepted_markers):
            outcome = "accepted"
        elif mentions(rejected_markers):
            outcome = "rejected"

        # Likewise, plaintiff markers win over defendant markers.
        side = "unknown"
        if mentions(plaintiff_markers):
            side = "plaintiff"
        elif mentions(defendant_markers):
            side = "defendant"

        # Type heuristic: independent checks, the last matching one wins
        # (policy > procedural > equitable > legal > factual).
        arg_type = "factual"
        if "סעיף" in paragraph or "חוק" in paragraph:
            arg_type = "legal"
        if "נסיבות" in paragraph or "צדק" in paragraph:
            arg_type = "equitable"
        if "תקנה" in paragraph or "סדר הדין" in paragraph:
            arg_type = "procedural"
        if "מדיניות" in paragraph or "אינטרס הציבור" in paragraph:
            arg_type = "policy"

        return {
            "is_argument": True,
            "outcome": outcome,
            "side": side,
            "arg_type": arg_type,
            "strength": 0.65 if outcome == "accepted" else 0.45,
            "confidence": 0.55,  # low confidence — rule-based only
        }

    # ------------------------------------------------------- harmful detection
    def _detect_harmful_cases(
        self, similar_cases: List[Dict[str, Any]], side: str, user_facts: str
    ) -> List[Dict[str, Any]]:
        """Flag up to three retrieved precedents whose text signals a rejection.

        Detection is a substring match on rejection phrases; ``side`` and
        ``user_facts`` are accepted for interface compatibility but not read.
        Retrieval hits are chunks, so one case can appear several times in
        ``similar_cases``; each ``case_id`` is reported at most once.
        """
        rejection_markers = ("נדחתה", "התביעה נדחית", "אין לקבל", "איני מקבל")
        harmful: List[Dict[str, Any]] = []
        seen_ids = set()
        for case in similar_cases:
            text = case.get("text") or ""
            if not any(m in text for m in rejection_markers):
                continue
            cid = case.get("case_id")
            if cid is not None:
                if cid in seen_ids:
                    continue
                seen_ids.add(cid)
            harmful.append({
                "case_id": cid,
                "title": case.get("title"),
                "why_harmful": "פסיקה דומה דחתה תביעה דומה",
                "distinguishing_strategy": (
                    "להבחין באמצעות תיעוד טוב יותר של עובדות מהותיות"
                ),
            })
        return harmful[:3]

    # ----------------------------------------------------------- mediation
    def _suggest_mediation_options(
        self, outcome: Dict[str, Any], factors: List, domain: Optional[str]
    ) -> List[Dict[str, Any]]:
        """Pick mediation templates from ``outcome["probability"]``.

        0.5–0.75 → partial compensation + agreed expert; above 0.75 → fast
        settlement; below 0.5 → low-payment close. A missing probability is
        treated as 0.5. ``factors`` and ``domain`` are not read.
        """
        prob = outcome.get("probability", 0.5)
        # Pattern: high liability + uncertain damages → partial settlement
        if 0.5 <= prob <= 0.75:
            return [
                {
                    "type": "partial_compensation",
                    "title": "פיצוי חלקי + תיקון",
                    "best_when": ["אחריות סבירה", "נזק לא ודאי"],
                    "sample_terms": [
                        "תיקון תוך 30 יום",
                        "פיצוי חלקי במקום פיצוי מלא",
                        "ויתור הדדי לסילוק סופי",
                    ],
                },
                {
                    "type": "expert_evaluation",
                    "title": "מינוי מומחה מוסכם",
                    "best_when": ["חילוקי דעות על היקף נזק"],
                    "sample_terms": [
                        "מומחה מוסכם להערכת ליקויים",
                        "כל צד נושא במחצית העלות",
                    ],
                },
            ]
        if prob > 0.75:
            return [{
                "type": "fast_settlement",
                "title": "פשרה מהירה לטובת הצד החזק",
                "best_when": ["אחריות חזקה", "ראיות מוכחות"],
                "sample_terms": [
                    "תשלום מלא בפריסה",
                    "התחייבות לאי-חזרה על ההתנהגות",
                ],
            }]
        return [{
            "type": "low_payment_close",
            "title": "סילוק נמוך וסיום מהיר",
            "best_when": ["סיכוי הצלחה נמוך", "עלויות התדיינות גבוהות"],
            "sample_terms": [
                "תשלום סמלי לסיום סופי",
                "ויתור הדדי על כל טענה",
            ],
        }]

    # ---------------------------------------------------- domain-specific aids
    def _focus_questions_for_domain(self, domain: Optional[str]) -> List[str]:
        """Return the questions a court is likely to focus on for the domain.

        English domain keys match as case-insensitive substrings of
        ``domain``; Hebrew names match exactly. Unknown or missing domains
        get a generic four-question list.
        """
        domain_questions = {
            "contracts": [
                "האם נקבע בהסכם מועד ביצוע מפורש?",
                "האם נשלחה התראה בכתב לצד השני?",
                "האם ניתנה הזדמנות לתקן את ההפרה?",
                "האם הנזק תועד וכומת?",
                "האם הייתה הסכמה משתמעת לעיכוב?",
            ],
            "torts": [
                "האם קיימת חובת זהירות בנסיבות העניין?",
                "האם הופרה חובת הזהירות?",
                "האם קיים קשר סיבתי בין ההפרה לנזק?",
                "האם הנזק היה צפוי?",
                "האם הניזוק תרם לנזק?",
            ],
            "property": [
                "מי בעל הזכות הרשומה?",
                "האם קיימת זכות נוגדת?",
                "האם נעשתה החזקה ארוכת טווח?",
                "האם הוכחה תום לב?",
            ],
            "family": [
                "מהן ההסכמות החתומות בין הצדדים?",
                "מהי טובת הקטין?",
                "מה היכולת הכלכלית של הצדדים?",
                "האם ניתנה הסכמה מודעת?",
            ],
            "labor": [
                "האם קיים חוזה עבודה?",
                "האם נמסרה הודעה מוקדמת?",
                "האם בוצעו תשלומי החובה?",
                "האם ניתנה זכות שימוע?",
            ],
        }
        # English keys: substring match on the lower-cased domain.
        lowered = (domain or "").lower()
        for key, questions in domain_questions.items():
            if key in lowered:
                return questions

        # Hebrew names: exact match.
        hebrew_aliases = {
            "חוזים": "contracts",
            "נזיקין": "torts",
            "מקרקעין": "property",
            "משפחה": "family",
            "עבודה": "labor",
        }
        if isinstance(domain, str) and domain in hebrew_aliases:
            return domain_questions[hebrew_aliases[domain]]

        return [
            "מהן העובדות המרכזיות שניתן להוכיח בכתב?",
            "מהו הנזק הספציפי?",
            "אילו אסמכתאות פסיקתיות תומכות בעמדה?",
            "מהם הסיכונים העיקריים?",
        ]

    def _infer_missing_facts(
        self, user_facts: str, domain: Optional[str]
    ) -> List[str]:
        """List facts typically needed for the domain but absent from the input.

        Keyword heuristic, implemented for contracts and torts only; other
        domains yield ``[]``. The domain is lower-cased first so that it is
        matched the same way as in ``synthesize`` (e.g. "Contracts").
        """
        text = (user_facts or "").lower()
        domain_key = (domain or "").lower()
        missing: List[str] = []
        if domain_key in ("contracts", "חוזים"):
            if "מועד" not in text and "תאריך" not in text:
                missing.append("מועד מפורש בהסכם")
            if "התראה" not in text and "מכתב" not in text:
                missing.append("התראה בכתב לפני הליך משפטי")
            if "נזק" not in text and "הפסד" not in text:
                missing.append("תיעוד וכימות הנזק")
        elif domain_key in ("torts", "נזיקין"):
            if "ראיה" not in text and "עדות" not in text:
                missing.append("ראיות לקיום החובה ולהפרתה")
            if "קשר" not in text and "סיבת" not in text:
                missing.append("קשר סיבתי בין ההתנהגות לנזק")
        return missing

    # ----------------------------------------------------------- helpers
    @staticmethod
    def _split_paragraphs(text: str) -> List[str]:
        """Split text on blank lines; return the stripped, non-empty parts."""
        if not text:
            return []
        # Only double newlines delimit paragraphs; sentences are not split.
        return [part.strip() for part in text.split("\n\n") if part.strip()]

    @staticmethod
    def _map_str_to_argtype(s: str) -> ArgumentType:
        """Map an argument-type label to ``ArgumentType``.

        Case-insensitive; unknown, empty or ``None`` labels map to FACTUAL.
        """
        m = {
            "factual": ArgumentType.FACTUAL,
            "legal": ArgumentType.LEGAL,
            "procedural": ArgumentType.PROCEDURAL,
            "policy": ArgumentType.POLICY,
            "equitable": ArgumentType.EQUITABLE,
            "constitutional": ArgumentType.CONSTITUTIONAL,
            "substantive": ArgumentType.SUBSTANTIVE,
        }
        return m.get(str(s or "").lower(), ArgumentType.FACTUAL)

    @staticmethod
    def _map_argtype_to_factortype(s: str) -> FactorType:
        """Map an argument-type label to the analyzer's ``FactorType``.

        "legal" and "substantive" both map to STATUTORY. Case-insensitive;
        unknown, empty or ``None`` labels map to FACTUAL.
        """
        m = {
            "factual": FactorType.FACTUAL,
            "legal": FactorType.STATUTORY,
            "procedural": FactorType.PROCEDURAL,
            "policy": FactorType.POLICY,
            "equitable": FactorType.EQUITABLE,
            "constitutional": FactorType.CONSTITUTIONAL,
            "substantive": FactorType.STATUTORY,
        }
        return m.get(str(s or "").lower(), FactorType.FACTUAL)

    # ------------------------------------------- optional TAU LLM polish
    def _polish_with_tau_llm(self, result: StrategyResult) -> None:
        """Optionally polish each user argument with the local TAU LLM.

        Mutates ``result`` in place: an argument gains ``polished_text`` when
        the completion is longer than 80% of the original text. Any failure
        (including a missing generator module) leaves the result untouched.
        """
        try:
            from ..generate.tau_native import TauNativeGenerator

            tau = TauNativeGenerator()
            for arg in result.arguments_for_user:
                if arg.get("text"):
                    polished = tau.complete(arg["text"], max_new_tokens=50)
                    if polished and len(polished) > len(arg["text"]) * 0.8:
                        arg["polished_text"] = polished
        except Exception:
            # Polish is optional — fall back to the unpolished text.
            self._log.debug("TAU LLM polish skipped", exc_info=True)