"""retrieval_signals.py — Universal signals over retrieval-time score distributions. Completes the third leg of the τψφξΩ trio: 1. Training-time signals (next_token_trainer.compute_tau_signals) 2. Inference-time signals (outcome_signals.compute_outcome_signals) 3. **Retrieval-time signals** ← this module Why this matters: A query whose top-k retrieval has uniform low scores (everyone scores 0.20) means the corpus has nothing relevant — the user's question is either too narrow or too off-topic. A query whose top-k has one big score and a long tail has good signal. We can MEASURE this gap. With these signals exposed: • UI can show "אמינות אחזור: 72%" next to the answer • System can detect retrieval drift over time (Ω falling = corpus degrading or query distribution shifting) • Pipeline-health endpoint can alert on sustained low Ω ──────────────────────────────────────────────────────────────────────────── Signal definitions (all clipped to [0, 1]) ──────────────────────────────────────────────────────────────────────────── τ (tau, "top-hit strength"): How strong is the best result relative to a baseline? τ = clip01( top_score / strong_threshold ) With strong_threshold ≈ 0.6 (calibrated for HebrewEncoder hybrid scores). τ = 1.0 means the top hit is unambiguously a strong match. ψ (psi, "score concentration"): Is the score mass concentrated in a few top results, or spread thin? Use coefficient-of-variation on top-k: ψ = clip01( std(top-k) / (mean(top-k) + ε) ) High ψ means the top-k differ a lot — that's actually GOOD: the retriever is discriminating well between the top hit and the rest. Low ψ (uniform scores) means the retriever can't tell them apart. φ (phi, "top-k topical agreement"): Do the top-k results agree on a topic? Computed via the centroid: φ = mean cosine of top-k vectors with their own centroid If 5/5 results are about "good faith violation", φ ≈ 0.95. If 5/5 are scattered topics, φ ≈ 0.50. ξ (xi, "score-gap anomaly"): Is there a discontinuity in the score curve, or smooth decay? A "shelf" pattern (scores: 0.85, 0.80, 0.20, 0.15, 0.10) means the top-2 are clearly different — high confidence. A smooth decay (0.85, 0.83, 0.81, 0.79, 0.77) means the boundary between relevant and irrelevant is fuzzy. ξ = 1 - clip01( max_gap / (top_score - bottom_score + ε) ) Low ξ = clear shelf (good), high ξ = smooth decay (uncertain). Ω (omega): Standard geometric mean. Ω = (τ^α · φ^β · ψ^γ · (1−ξ)^δ)^(1/Σ) """ from __future__ import annotations import math from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Tuple def _clip01(x: float) -> float: return float(max(0.0, min(1.0, x))) @dataclass class RetrievalSignals: """Universal signals computed over a single retrieval result list.""" tau: float # top-hit strength psi: float # score concentration (CV-based) phi: float # topical agreement of top-k xi: float # score-gap anomaly omega: float # combined retrieval health n_results: int = 0 top_score: float = 0.0 score_range: float = 0.0 debug: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: return { "tau": round(self.tau, 4), "psi": round(self.psi, 4), "phi": round(self.phi, 4), "xi": round(self.xi, 4), "omega": round(self.omega, 4), "retrieval_health": round(self.omega, 4), "n_results": self.n_results, "top_score": round(self.top_score, 4), "score_range": round(self.score_range, 4), "interpretation": self._interpret(), "debug": self.debug, } def _interpret(self) -> Dict[str, str]: out = {} out["overall"] = ( f"אמינות אחזור: {self.omega*100:.0f}% — " + ( "אחזור איכותי" if self.omega >= 0.65 else "אחזור בינוני" if self.omega >= 0.45 else "אחזור חלש — שקול לנסח את השאילתה שוב" ) ) out["tau"] = ( f"חוזק תוצאה ראשונה (τ={self.tau:.2f}): " + ( "התאמה ברורה" if self.tau >= 0.65 else "התאמה חלקית" if self.tau >= 0.45 else "התאמה חלשה — אין במאגר תוצאה דומה במובהק" ) ) out["psi"] = ( f"הבחנה בין תוצאות (ψ={self.psi:.2f}): " + ( "המנוע מבחין היטב בין רלוונטי ולא" if self.psi >= 0.50 else "ניקוד אחיד — מנוע מתקשה לדרג" ) ) out["phi"] = ( f"לכידות נושאית (φ={self.phi:.2f}): " + ( "התוצאות עוסקות באותו נושא" if self.phi >= 0.65 else "התוצאות מפוזרות על פני נושאים שונים" if self.phi >= 0.45 else "פיזור גבוה — שאילתה רב-משמעית" ) ) out["xi"] = ( f"גבול ברור (ξ={self.xi:.2f}, נמוך=טוב): " + ( "יש 'מדף' ברור בין רלוונטי ללא" if self.xi <= 0.35 else "גבול מטושטש בין תוצאות" ) ) return out def compute_retrieval_signals( hits: List[Any], strong_threshold: float = 0.6, omega_weights: Tuple[float, float, float, float] = (1.0, 1.0, 1.0, 1.0), eps: float = 1e-6, ) -> RetrievalSignals: """Compute τψφξΩ over a retrieval result list. Args: hits: list of `Retrieved` (or any object with `.score` and optionally `.chunk.text` for φ computation). Order is assumed descending by score. strong_threshold: τ = top_score / strong_threshold, clipped to 1. Calibrated to the HebrewEncoder hybrid score distribution. omega_weights: (α_τ, β_φ, γ_ψ, δ_ξ). eps: numerical-stability constant. Returns: RetrievalSignals with all 5 signals + debug breakdown. """ if not hits: return RetrievalSignals( tau=0.0, psi=0.0, phi=0.0, xi=1.0, omega=0.0, n_results=0, top_score=0.0, score_range=0.0, debug={"reason": "no hits"}, ) scores = [float(getattr(h, "score", 0.0)) for h in hits] top_score = scores[0] bottom_score = scores[-1] score_range = top_score - bottom_score # ───────────────────────────────────────────────────────────────── # τ — top-hit strength # ───────────────────────────────────────────────────────────────── tau = _clip01(top_score / max(strong_threshold, eps)) # ───────────────────────────────────────────────────────────────── # ψ — score concentration via CV # ───────────────────────────────────────────────────────────────── if len(scores) >= 2: mean_s = sum(scores) / len(scores) var_s = sum((s - mean_s) ** 2 for s in scores) / len(scores) std_s = math.sqrt(var_s) cv = std_s / (abs(mean_s) + eps) # We WANT high CV → map directly to ψ (capping at 1.0) psi = _clip01(cv) else: psi = 0.5 # ───────────────────────────────────────────────────────────────── # φ — topical agreement among top-k # ───────────────────────────────────────────────────────────────── # Compute pairwise lexical overlap (Jaccard) as a cheap, dependency- # free topic-proxy. If the encoder is available we could use cosine, # but we already know it gives ≥0.93 for any legal-Hebrew pair — # not useful for discrimination here. Token Jaccard works better. import re as _re HEB = _re.compile(r"[א-ת]+") top_k = min(len(hits), 10) token_sets = [] for h in hits[:top_k]: text = getattr(getattr(h, "chunk", None), "text", "") or "" toks = set(t for t in HEB.findall(text) if len(t) >= 3) token_sets.append(toks) if len(token_sets) >= 2: sims = [] for i in range(len(token_sets)): for j in range(i + 1, len(token_sets)): a, b = token_sets[i], token_sets[j] u = a | b if u: sims.append(len(a & b) / len(u)) else: sims.append(0.0) phi = _clip01(sum(sims) / max(len(sims), 1)) # Boost: token Jaccard tends to underestimate topical agreement # on short legal texts; rescale so 0.30 Jaccard ≈ 0.65 φ phi = _clip01(phi * 2.2) else: phi = 0.5 # ───────────────────────────────────────────────────────────────── # ξ — score-gap anomaly (LOW ξ = clear shelf, HIGH ξ = smooth decay) # ───────────────────────────────────────────────────────────────── if len(scores) >= 3 and score_range > eps: # max consecutive gap divided by total range gaps = [scores[i] - scores[i + 1] for i in range(len(scores) - 1)] max_gap = max(gaps) if gaps else 0.0 # If max_gap is large relative to total range → clear shelf → low ξ # If max_gap is small (smooth decay) → high ξ gap_ratio = max_gap / (score_range + eps) # gap_ratio of 1/(N-1) means perfectly uniform → max ξ # gap_ratio of >0.5 means strong shelf → ξ ≈ 0 xi = _clip01(1.0 - gap_ratio) else: xi = 0.5 # ───────────────────────────────────────────────────────────────── # Ω — geometric mean # ───────────────────────────────────────────────────────────────── α, β, γ, δ = omega_weights a = max(tau, eps) ** α b = max(phi, eps) ** β c = max(psi, eps) ** γ d = max(1.0 - xi, eps) ** δ total_weight = α + β + γ + δ omega = _clip01((a * b * c * d) ** (1.0 / total_weight)) return RetrievalSignals( tau=round(tau, 4), psi=round(psi, 4), phi=round(phi, 4), xi=round(xi, 4), omega=round(omega, 4), n_results=len(hits), top_score=top_score, score_range=score_range, debug={ "scores_first_5": [round(s, 3) for s in scores[:5]], "scores_last_5": [round(s, 3) for s in scores[-5:]], "weights": list(omega_weights), "omega_components": { "tau_pow": round(a, 4), "phi_pow": round(b, 4), "psi_pow": round(c, 4), "1-xi_pow": round(d, 4), }, }, ) __all__ = ["RetrievalSignals", "compute_retrieval_signals"]