"""Structural NLI — deterministic faithfulness without an LLM. Idea: a claim is *structurally entailed* by a source iff (a) every deontic cue type present in the claim is also present in the source, and (b) the claim's content tokens are a subset of (some source's content tokens ∪ a small margin). We check per-sentence and aggregate. This gives a real faithfulness score that's more meaningful than Jaccard for legal/deontic text, and it's fast, interpretable, and LLM-free. """ from __future__ import annotations import re from typing import Dict, List, Optional, Sequence, Set from ..core.types import Query, Retrieved, VerificationAlert, VerificationResult from ..verify.structure import CUES, signature_of_text _SENT = re.compile(r"(?<=[\.\!\?…])\s+|\n+") _TOKEN = re.compile(r"\w+", re.UNICODE) def _toks(s: str) -> Set[str]: return {t.lower() for t in _TOKEN.findall(s)} def _deontic_of(text: str) -> Set[str]: lo = (text or "").lower() return {k for k, cues in CUES.items() if any(c in lo or c in text for c in cues)} def _strip_citation_marks(sent: str) -> str: return re.sub(r"\[\d+\]", "", sent).strip() class StructuralNLIChecker: """Per-sentence structural entailment. For each claim sentence in the answer: * Let D_c = deontic cues in the sentence * Let T_c = content tokens of the sentence * Find best supporting source s* = argmax overlap(T_c, T_s) * Entailed iff D_c ⊆ D_{s*} AND overlap ≥ `min_overlap` """ name = "structural_nli" def __init__( self, min_overlap: float = 0.2, min_pass_rate: float = 0.7, ignore_citations: bool = True, ) -> None: self.min_overlap = min_overlap self.min_pass_rate = min_pass_rate self.ignore_citations = ignore_citations # ---------------------------------------------------------- core def _score_sentence( self, sentence: str, sources: Sequence[Retrieved], ) -> Dict[str, object]: if self.ignore_citations: sentence = _strip_citation_marks(sentence) s_toks = _toks(sentence) s_deo = _deontic_of(sentence) if not s_toks: return {"entailed": True, "best_overlap": 1.0, "deontic_ok": True, "best_doc": None} best_overlap = 0.0 best_doc = None deontic_ok = False for r in sources: d_toks = _toks(r.chunk.text) if not d_toks: continue overlap = len(s_toks & d_toks) / max(1, len(s_toks)) if overlap > best_overlap: best_overlap = overlap best_doc = r.chunk.doc_id # deontic subset check against ANY source (weakest form: at least # one source supports all deontic cues in the claim) d_deo = _deontic_of(r.chunk.text) if s_deo.issubset(d_deo): deontic_ok = True entailed = bool( best_overlap >= self.min_overlap and (deontic_ok or not s_deo) ) return {"entailed": entailed, "best_overlap": best_overlap, "deontic_ok": deontic_ok, "best_doc": best_doc} # ------------------------------------------------------- Verifier API def verify_answer( self, answer: str, sources: Sequence[Retrieved], ) -> VerificationResult: if not answer.strip() or not sources: return VerificationResult(passed=True, faithfulness=1.0, structure_match=True) sents = [s.strip() for s in _SENT.split(answer) if s.strip()] if not sents: return VerificationResult(passed=True, faithfulness=1.0) # v2.x — skip disclaimer/footer sentences from NLI scoring. They # carry legal boilerplate that won't match any retrieved source and # would falsely trigger "unsupported_claim" alerts. _SKIP_MARKERS = ( "אין באמור ייעוץ משפטי", "disclaimer:", "— — —", "—\u00a0—\u00a0—", ) sents = [s for s in sents if not any(m in s or m in s.lower() for m in _SKIP_MARKERS)] if not sents: return VerificationResult(passed=True, faithfulness=1.0) scored = [self._score_sentence(s, sources) for s in sents] pass_rate = sum(1 for r in scored if r["entailed"]) / len(scored) faithfulness = pass_rate alerts: List[VerificationAlert] = [] for sent, info in zip(sents, scored): if not info["entailed"]: alerts.append(VerificationAlert( type="unsupported_claim", risk="MEDIUM", impact=(f"Sentence not supported by any source " f"(overlap={info['best_overlap']:.2f}, " f"deontic_ok={info['deontic_ok']})."), detail={"sentence": sent[:200], "best_doc": info["best_doc"]}, )) passed = pass_rate >= self.min_pass_rate return VerificationResult( passed=passed, alerts=alerts, citation_coverage=0.0, faithfulness=faithfulness, structure_match=True, ) def verify( self, query: Query, answer: str, context: List[Retrieved], ) -> VerificationResult: return self.verify_answer(answer, context)