| """retrieval_signals.py — Universal signals over retrieval-time score |
| distributions. |
| |
| Completes the third leg of the τψφξΩ trio: |
| 1. Training-time signals (next_token_trainer.compute_tau_signals) |
| 2. Inference-time signals (outcome_signals.compute_outcome_signals) |
| 3. **Retrieval-time signals** ← this module |
| |
| Why this matters: |
| A query whose top-k retrieval has uniform low scores (everyone scores |
| 0.20) means the corpus has nothing relevant — the user's question is |
| either too narrow or too off-topic. A query whose top-k has one big |
| score and a long tail has good signal. We can MEASURE this gap. |
| |
| With these signals exposed: |
| • UI can show "אמינות אחזור: 72%" next to the answer |
| • System can detect retrieval drift over time (Ω falling = corpus |
| degrading or query distribution shifting) |
| • Pipeline-health endpoint can alert on sustained low Ω |
| |
| ──────────────────────────────────────────────────────────────────────────── |
| Signal definitions (all clipped to [0, 1]) |
| ──────────────────────────────────────────────────────────────────────────── |
| |
| τ (tau, "top-hit strength"): |
| How strong is the best result relative to a baseline? |
| τ = clip01( top_score / strong_threshold ) |
| With strong_threshold ≈ 0.6 (calibrated for HebrewEncoder hybrid |
| scores). τ = 1.0 means the top hit is unambiguously a strong match. |
| |
| ψ (psi, "score concentration"): |
| Is the score mass concentrated in a few top results, or spread |
| thin? Use coefficient-of-variation on top-k: |
| ψ = clip01( std(top-k) / (mean(top-k) + ε) ) |
| High ψ means the top-k differ a lot — that's actually GOOD: the |
| retriever is discriminating well between the top hit and the rest. |
| Low ψ (uniform scores) means the retriever can't tell them apart. |
| |
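| φ (phi, "top-k topical agreement"): |
| Do the top-k results agree on a topic? Computed lexically (no |
| embeddings are used): take the set of Hebrew words of at least 3 |
| letters from each of the first 10 results, then |
| φ = clip01( 2.2 · mean pairwise Jaccard overlap of those word sets ) |
| If 5/5 results are about "good faith violation", their vocabularies |
| overlap heavily and φ approaches 1.0. If 5/5 are scattered topics, |
| the overlap is small and φ falls toward 0. |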
| |
| ξ (xi, "score-gap anomaly"): |
| Is there a discontinuity in the score curve, or smooth decay? |
| A "shelf" pattern (scores: 0.85, 0.80, 0.20, 0.15, 0.10) means |
| the top-2 are clearly different — high confidence. A smooth |
| decay (0.85, 0.83, 0.81, 0.79, 0.77) means the boundary between |
| relevant and irrelevant is fuzzy. |
| ξ = 1 - clip01( max_gap / (top_score - bottom_score + ε) ) |
| Low ξ = clear shelf (good), high ξ = smooth decay (uncertain). |
| |
| Ω (omega): |
| Standard geometric mean. |
| Ω = (τ^α · φ^β · ψ^γ · (1−ξ)^δ)^(1/Σ) |
| """ |
| from __future__ import annotations |
|
|
| import math |
| from dataclasses import dataclass, field |
| from typing import Any, Dict, List, Optional, Tuple |
|
|
|
|
def _clip01(x: float) -> float:
    """Clamp *x* to the closed interval [0.0, 1.0] and return it as a float.

    NaN is mapped to 0.0.  A plain ``max(0.0, min(1.0, x))`` would turn NaN
    into 1.0, because every ordering comparison against NaN is false and
    ``min`` therefore keeps its first argument; a corrupted signal must not
    read as a perfect one.
    """
    if math.isnan(x):
        return 0.0
    return float(max(0.0, min(1.0, x)))
|
|
|
|
@dataclass
class RetrievalSignals:
    """Signal bundle describing the quality of one retrieval result list."""

    # The five signals.
    tau: float    # top-hit strength
    psi: float    # score concentration
    phi: float    # top-k topical agreement
    xi: float     # score-gap anomaly (lower is better)
    omega: float  # combined retrieval-health score

    # Context about the result list the signals were derived from.
    n_results: int = 0
    top_score: float = 0.0
    score_range: float = 0.0
    debug: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the signals.

        Numeric values are rounded to 4 decimals, ``retrieval_health``
        repeats the rounded Ω, and ``interpretation`` carries the Hebrew
        explanations from ``_interpret``.  ``debug`` is passed through as
        the same dict object held by the instance (no copy is made).
        """
        payload: Dict[str, Any] = {
            name: round(getattr(self, name), 4)
            for name in ("tau", "psi", "phi", "xi", "omega")
        }
        payload["retrieval_health"] = payload["omega"]
        payload["n_results"] = self.n_results
        payload["top_score"] = round(self.top_score, 4)
        payload["score_range"] = round(self.score_range, 4)
        payload["interpretation"] = self._interpret()
        payload["debug"] = self.debug
        return payload

    def _interpret(self) -> Dict[str, str]:
        """Build one Hebrew explanation per signal, plus an overall verdict.

        Each signal is bucketed by fixed thresholds.  ξ is the only signal
        where a lower value selects the favourable wording.
        """
        if self.omega >= 0.65:
            overall_label = "אחזור איכותי"
        elif self.omega >= 0.45:
            overall_label = "אחזור בינוני"
        else:
            overall_label = "אחזור חלש — שקול לנסח את השאילתה שוב"

        if self.tau >= 0.65:
            tau_label = "התאמה ברורה"
        elif self.tau >= 0.45:
            tau_label = "התאמה חלקית"
        else:
            tau_label = "התאמה חלשה — אין במאגר תוצאה דומה במובהק"

        if self.psi >= 0.50:
            psi_label = "המנוע מבחין היטב בין רלוונטי ולא"
        else:
            psi_label = "ניקוד אחיד — מנוע מתקשה לדרג"

        if self.phi >= 0.65:
            phi_label = "התוצאות עוסקות באותו נושא"
        elif self.phi >= 0.45:
            phi_label = "התוצאות מפוזרות על פני נושאים שונים"
        else:
            phi_label = "פיזור גבוה — שאילתה רב-משמעית"

        if self.xi <= 0.35:
            xi_label = "יש 'מדף' ברור בין רלוונטי ללא"
        else:
            xi_label = "גבול מטושטש בין תוצאות"

        return {
            "overall": f"אמינות אחזור: {self.omega*100:.0f}% — " + overall_label,
            "tau": f"חוזק תוצאה ראשונה (τ={self.tau:.2f}): " + tau_label,
            "psi": f"הבחנה בין תוצאות (ψ={self.psi:.2f}): " + psi_label,
            "phi": f"לכידות נושאית (φ={self.phi:.2f}): " + phi_label,
            "xi": f"גבול ברור (ξ={self.xi:.2f}, נמוך=טוב): " + xi_label,
        }
|
|
|
|
def compute_retrieval_signals(
    hits: List[Any],
    strong_threshold: float = 0.6,
    omega_weights: Tuple[float, float, float, float] = (1.0, 1.0, 1.0, 1.0),
    eps: float = 1e-6,
) -> RetrievalSignals:
    """Compute the τ/ψ/φ/ξ/Ω signals for one retrieval result list.

    Args:
        hits: Retrieval results, best first.  Each hit's ``score`` attribute
            is read (a missing attribute counts as 0.0); ``chunk.text`` is
            read when present and feeds only φ.  The list is not re-sorted,
            so τ and ξ are only meaningful when scores are descending.
        strong_threshold: Score at which the top hit counts as a
            full-strength match: τ = clip01(top_score / strong_threshold).
            The original author's note describes the 0.6 default as
            calibrated to the HebrewEncoder hybrid score distribution.
        omega_weights: Exponents ``(w_τ, w_φ, w_ψ, w_ξ)`` of the weighted
            geometric mean Ω.
        eps: Small positive constant used as a floor for denominators and
            for the factors of Ω, so a single zero signal does not produce
            a division by zero or an exactly-zero product.

    Returns:
        A ``RetrievalSignals`` whose five signals are rounded to 4 decimals.
        ``top_score`` and ``score_range`` are left unrounded, and ``debug``
        holds the leading/trailing scores, the weights and the four factors
        of Ω.  An empty ``hits`` yields all-zero signals with ξ = 1.0.

    Raises:
        ValueError: If ``omega_weights`` does not unpack into four values.
        ZeroDivisionError: If the four weights sum to zero.
    """
    # Function-scope imports: this block needs them and the module's import
    # header does not provide them.
    import re
    from itertools import combinations

    if not hits:
        return RetrievalSignals(
            tau=0.0, psi=0.0, phi=0.0, xi=1.0, omega=0.0,
            n_results=0, top_score=0.0, score_range=0.0,
            debug={"reason": "no hits"},
        )

    scores = [float(getattr(h, "score", 0.0)) for h in hits]
    top_score = scores[0]
    score_range = top_score - scores[-1]

    # τ — top-hit strength relative to the "strong match" baseline.
    tau = _clip01(top_score / max(strong_threshold, eps))

    # ψ — score concentration: coefficient of variation (population std
    # over |mean|).  A single score has no spread, so it gets the neutral 0.5.
    if len(scores) >= 2:
        mean_score = sum(scores) / len(scores)
        variance = sum((s - mean_score) ** 2 for s in scores) / len(scores)
        psi = _clip01(math.sqrt(variance) / (abs(mean_score) + eps))
    else:
        psi = 0.5

    # φ — topical agreement: mean pairwise Jaccard overlap between the Hebrew
    # word sets (words of >= 3 letters) of the first 10 hits.
    hebrew_word = re.compile(r"[א-ת]+")
    token_sets = []
    for hit in hits[:10]:
        text = getattr(getattr(hit, "chunk", None), "text", "") or ""
        token_sets.append({w for w in hebrew_word.findall(text) if len(w) >= 3})

    if len(token_sets) >= 2 and any(token_sets):
        overlaps = []
        for left, right in combinations(token_sets, 2):
            union = left | right
            overlaps.append(len(left & right) / len(union) if union else 0.0)
        phi = _clip01(sum(overlaps) / len(overlaps))
        # Raw Jaccard values are stretched by a fixed gain of 2.2 before the
        # final clip, so moderate lexical overlap already reads as agreement.
        phi = _clip01(phi * 2.2)
    else:
        # Either a single hit, or no hit exposes any Hebrew text (chunk text
        # is optional).  Agreement is then unknown, not zero: scoring it 0
        # would floor φ at eps and collapse Ω no matter how good the scores
        # are, so use the same neutral value as the single-hit case.
        phi = 0.5

    # ξ — score-gap anomaly: 1 minus the share of the total score range
    # covered by the largest adjacent drop.  Needs at least three scores and
    # a non-flat range; otherwise it stays neutral.
    if len(scores) >= 3 and score_range > eps:
        max_gap = max(hi - lo for hi, lo in zip(scores, scores[1:]))
        xi = _clip01(1.0 - max_gap / (score_range + eps))
    else:
        xi = 0.5

    # Ω — weighted geometric mean of τ, φ, ψ and (1 − ξ).  Each factor is
    # floored at eps before being raised to its weight.
    w_tau, w_phi, w_psi, w_xi = omega_weights
    tau_pow = max(tau, eps) ** w_tau
    phi_pow = max(phi, eps) ** w_phi
    psi_pow = max(psi, eps) ** w_psi
    shelf_pow = max(1.0 - xi, eps) ** w_xi
    total_weight = w_tau + w_phi + w_psi + w_xi
    omega = _clip01(
        (tau_pow * phi_pow * psi_pow * shelf_pow) ** (1.0 / total_weight)
    )

    return RetrievalSignals(
        tau=round(tau, 4),
        psi=round(psi, 4),
        phi=round(phi, 4),
        xi=round(xi, 4),
        omega=round(omega, 4),
        n_results=len(hits),
        top_score=top_score,
        score_range=score_range,
        debug={
            "scores_first_5": [round(s, 3) for s in scores[:5]],
            "scores_last_5": [round(s, 3) for s in scores[-5:]],
            "weights": list(omega_weights),
            "omega_components": {
                "tau_pow": round(tau_pow, 4),
                "phi_pow": round(phi_pow, 4),
                "psi_pow": round(psi_pow, 4),
                "1-xi_pow": round(shelf_pow, 4),
            },
        },
    )
|
|
|
|
# Public API: the names exported by a star-import of this module.
__all__ = [
    "RetrievalSignals",
    "compute_retrieval_signals",
]
|
|