legal-eye / tau_rag /intelligence /retrieval_signals.py
Legal-i's picture
Initial deploy: legal-eye Hebrew legal RAG (17K corpus, verbatim-from-precedent)
3be54c6 verified
"""retrieval_signals.py — Universal signals over retrieval-time score
distributions.
Completes the third leg of the τψφξΩ trio:
1. Training-time signals (next_token_trainer.compute_tau_signals)
2. Inference-time signals (outcome_signals.compute_outcome_signals)
3. **Retrieval-time signals** ← this module
Why this matters:
A query whose top-k retrieval has uniform low scores (everyone scores
0.20) means the corpus has nothing relevant — the user's question is
either too narrow or too off-topic. A query whose top-k has one big
score and a long tail has good signal. We can MEASURE this gap.
With these signals exposed:
• UI can show "אמינות אחזור: 72%" next to the answer
• System can detect retrieval drift over time (Ω falling = corpus
degrading or query distribution shifting)
• Pipeline-health endpoint can alert on sustained low Ω
────────────────────────────────────────────────────────────────────────────
Signal definitions (all clipped to [0, 1])
────────────────────────────────────────────────────────────────────────────
τ (tau, "top-hit strength"):
How strong is the best result relative to a baseline?
τ = clip01( top_score / strong_threshold )
With strong_threshold ≈ 0.6 (calibrated for HebrewEncoder hybrid
scores). τ = 1.0 means the top hit is unambiguously a strong match.
ψ (psi, "score concentration"):
Is the score mass concentrated in a few top results, or spread
thin? Use coefficient-of-variation on top-k:
ψ = clip01( std(top-k) / (mean(top-k) + ε) )
High ψ means the top-k differ a lot — that's actually GOOD: the
retriever is discriminating well between the top hit and the rest.
Low ψ (uniform scores) means the retriever can't tell them apart.
φ (phi, "top-k topical agreement"):
Do the top-k results agree on a topic? Computed via the centroid:
φ = mean cosine of top-k vectors with their own centroid
If 5/5 results are about "good faith violation", φ ≈ 0.95. If
5/5 are scattered topics, φ ≈ 0.50.
ξ (xi, "score-gap anomaly"):
Is there a discontinuity in the score curve, or smooth decay?
A "shelf" pattern (scores: 0.85, 0.80, 0.20, 0.15, 0.10) means
the top-2 are clearly different — high confidence. A smooth
decay (0.85, 0.83, 0.81, 0.79, 0.77) means the boundary between
relevant and irrelevant is fuzzy.
ξ = 1 - clip01( max_gap / (top_score - bottom_score + ε) )
Low ξ = clear shelf (good), high ξ = smooth decay (uncertain).
Ω (omega):
Standard geometric mean.
Ω = (τ^α · φ^β · ψ^γ · (1−ξ)^δ)^(1/Σ)
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
def _clip01(x: float) -> float:
return float(max(0.0, min(1.0, x)))
@dataclass
class RetrievalSignals:
"""Universal signals computed over a single retrieval result list."""
tau: float # top-hit strength
psi: float # score concentration (CV-based)
phi: float # topical agreement of top-k
xi: float # score-gap anomaly
omega: float # combined retrieval health
n_results: int = 0
top_score: float = 0.0
score_range: float = 0.0
debug: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"tau": round(self.tau, 4),
"psi": round(self.psi, 4),
"phi": round(self.phi, 4),
"xi": round(self.xi, 4),
"omega": round(self.omega, 4),
"retrieval_health": round(self.omega, 4),
"n_results": self.n_results,
"top_score": round(self.top_score, 4),
"score_range": round(self.score_range, 4),
"interpretation": self._interpret(),
"debug": self.debug,
}
def _interpret(self) -> Dict[str, str]:
out = {}
out["overall"] = (
f"אמינות אחזור: {self.omega*100:.0f}% — " + (
"אחזור איכותי" if self.omega >= 0.65 else
"אחזור בינוני" if self.omega >= 0.45 else
"אחזור חלש — שקול לנסח את השאילתה שוב"
)
)
out["tau"] = (
f"חוזק תוצאה ראשונה (τ={self.tau:.2f}): " + (
"התאמה ברורה" if self.tau >= 0.65 else
"התאמה חלקית" if self.tau >= 0.45 else
"התאמה חלשה — אין במאגר תוצאה דומה במובהק"
)
)
out["psi"] = (
f"הבחנה בין תוצאות (ψ={self.psi:.2f}): " + (
"המנוע מבחין היטב בין רלוונטי ולא" if self.psi >= 0.50 else
"ניקוד אחיד — מנוע מתקשה לדרג"
)
)
out["phi"] = (
f"לכידות נושאית (φ={self.phi:.2f}): " + (
"התוצאות עוסקות באותו נושא" if self.phi >= 0.65 else
"התוצאות מפוזרות על פני נושאים שונים" if self.phi >= 0.45 else
"פיזור גבוה — שאילתה רב-משמעית"
)
)
out["xi"] = (
f"גבול ברור (ξ={self.xi:.2f}, נמוך=טוב): " + (
"יש 'מדף' ברור בין רלוונטי ללא" if self.xi <= 0.35 else
"גבול מטושטש בין תוצאות"
)
)
return out
def compute_retrieval_signals(
hits: List[Any],
strong_threshold: float = 0.6,
omega_weights: Tuple[float, float, float, float] = (1.0, 1.0, 1.0, 1.0),
eps: float = 1e-6,
) -> RetrievalSignals:
"""Compute τψφξΩ over a retrieval result list.
Args:
hits: list of `Retrieved` (or any object with `.score` and
optionally `.chunk.text` for φ computation). Order is
assumed descending by score.
strong_threshold: τ = top_score / strong_threshold, clipped to 1.
Calibrated to the HebrewEncoder hybrid score distribution.
omega_weights: (α_τ, β_φ, γ_ψ, δ_ξ).
eps: numerical-stability constant.
Returns: RetrievalSignals with all 5 signals + debug breakdown.
"""
if not hits:
return RetrievalSignals(
tau=0.0, psi=0.0, phi=0.0, xi=1.0, omega=0.0,
n_results=0, top_score=0.0, score_range=0.0,
debug={"reason": "no hits"},
)
scores = [float(getattr(h, "score", 0.0)) for h in hits]
top_score = scores[0]
bottom_score = scores[-1]
score_range = top_score - bottom_score
# ─────────────────────────────────────────────────────────────────
# τ — top-hit strength
# ─────────────────────────────────────────────────────────────────
tau = _clip01(top_score / max(strong_threshold, eps))
# ─────────────────────────────────────────────────────────────────
# ψ — score concentration via CV
# ─────────────────────────────────────────────────────────────────
if len(scores) >= 2:
mean_s = sum(scores) / len(scores)
var_s = sum((s - mean_s) ** 2 for s in scores) / len(scores)
std_s = math.sqrt(var_s)
cv = std_s / (abs(mean_s) + eps)
# We WANT high CV → map directly to ψ (capping at 1.0)
psi = _clip01(cv)
else:
psi = 0.5
# ─────────────────────────────────────────────────────────────────
# φ — topical agreement among top-k
# ─────────────────────────────────────────────────────────────────
# Compute pairwise lexical overlap (Jaccard) as a cheap, dependency-
# free topic-proxy. If the encoder is available we could use cosine,
# but we already know it gives ≥0.93 for any legal-Hebrew pair —
# not useful for discrimination here. Token Jaccard works better.
import re as _re
HEB = _re.compile(r"[א-ת]+")
top_k = min(len(hits), 10)
token_sets = []
for h in hits[:top_k]:
text = getattr(getattr(h, "chunk", None), "text", "") or ""
toks = set(t for t in HEB.findall(text) if len(t) >= 3)
token_sets.append(toks)
if len(token_sets) >= 2:
sims = []
for i in range(len(token_sets)):
for j in range(i + 1, len(token_sets)):
a, b = token_sets[i], token_sets[j]
u = a | b
if u:
sims.append(len(a & b) / len(u))
else:
sims.append(0.0)
phi = _clip01(sum(sims) / max(len(sims), 1))
# Boost: token Jaccard tends to underestimate topical agreement
# on short legal texts; rescale so 0.30 Jaccard ≈ 0.65 φ
phi = _clip01(phi * 2.2)
else:
phi = 0.5
# ─────────────────────────────────────────────────────────────────
# ξ — score-gap anomaly (LOW ξ = clear shelf, HIGH ξ = smooth decay)
# ─────────────────────────────────────────────────────────────────
if len(scores) >= 3 and score_range > eps:
# max consecutive gap divided by total range
gaps = [scores[i] - scores[i + 1] for i in range(len(scores) - 1)]
max_gap = max(gaps) if gaps else 0.0
# If max_gap is large relative to total range → clear shelf → low ξ
# If max_gap is small (smooth decay) → high ξ
gap_ratio = max_gap / (score_range + eps)
# gap_ratio of 1/(N-1) means perfectly uniform → max ξ
# gap_ratio of >0.5 means strong shelf → ξ ≈ 0
xi = _clip01(1.0 - gap_ratio)
else:
xi = 0.5
# ─────────────────────────────────────────────────────────────────
# Ω — geometric mean
# ─────────────────────────────────────────────────────────────────
α, β, γ, δ = omega_weights
a = max(tau, eps) ** α
b = max(phi, eps) ** β
c = max(psi, eps) ** γ
d = max(1.0 - xi, eps) ** δ
total_weight = α + β + γ + δ
omega = _clip01((a * b * c * d) ** (1.0 / total_weight))
return RetrievalSignals(
tau=round(tau, 4),
psi=round(psi, 4),
phi=round(phi, 4),
xi=round(xi, 4),
omega=round(omega, 4),
n_results=len(hits),
top_score=top_score,
score_range=score_range,
debug={
"scores_first_5": [round(s, 3) for s in scores[:5]],
"scores_last_5": [round(s, 3) for s in scores[-5:]],
"weights": list(omega_weights),
"omega_components": {
"tau_pow": round(a, 4),
"phi_pow": round(b, 4),
"psi_pow": round(c, 4),
"1-xi_pow": round(d, 4),
},
},
)
__all__ = ["RetrievalSignals", "compute_retrieval_signals"]