"""Reciprocal Rank Fusion. score(d) = Σ_L 1 / ( k + rank_L(d) ) with k=60 by default. Ports the idea from tau_platform_v4/knowledge_graph/rag_pipeline.py::_merge_results. """ from __future__ import annotations from typing import Dict, List from ..core.types import Retrieved class RRFFuser: """Fuse several ranked lists into one global ranking.""" name = "rrf" def __init__(self, k: int = 60) -> None: self.k = k def fuse( self, per_retriever_results: List[List[Retrieved]], top_n: int = 20, ) -> List[Retrieved]: scores: Dict[str, float] = {} best: Dict[str, Retrieved] = {} for retrieved_list in per_retriever_results: for rank0, r in enumerate(retrieved_list): cid = r.chunk.chunk_id contrib = 1.0 / (self.k + rank0 + 1) scores[cid] = scores.get(cid, 0.0) + contrib cur = best.get(cid) if cur is None or r.score > cur.score: best[cid] = r fused: List[Retrieved] = [] for cid, s in sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_n]: base = best[cid] # Preserve original provenance (e.g. via_reference_from, # reference_label, ref_depth) so downstream stages can see them. merged_extra = dict(base.extra or {}) merged_extra.setdefault("origin_retriever", base.retriever) merged_extra.setdefault("origin_score", base.score) fused.append(Retrieved( chunk=base.chunk, score=s, retriever="rrf", rank=len(fused) + 1, extra=merged_extra, )) return fused