"""Citation network — the dependency graph between judgments and statutes. Walks the live corpus once on demand, extracts every citation from every doc using `judgment_structurer.CASE_CITE_RE` + `STATUTE_CITE_RE`, then builds three indexes: • cites[doc_id] → list of citation strings appearing in the doc • cited_by[citation_string] → list of doc_ids that contain this citation • doc_for_citation[citation_string] → doc_id (if a doc IN the corpus IS the thing being cited; resolved by matching the citation string against doc metadata) This lets us answer: • "what does this judgment cite?" → cites[doc_id] • "who cites this judgment?" → cited_by[doc.citation] • "which precedents are most influential?" → sort cited_by by len descending • "what should I read after this?" → docs that cite the same things (co-citation overlap) Caching: the index is built once on first access and stored on the pipeline's `_citation_network_cache` attr. It's invalidated when the caller passes `force_rebuild=True` (after a community promote, etc). """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Set, Tuple from .judgment_structurer import CASE_CITE_RE, STATUTE_CITE_RE # ============================================================================ # Public dataclass-style container # ============================================================================ @dataclass class CitationNetwork: cites: Dict[str, List[Dict[str, str]]] = field(default_factory=dict) cited_by: Dict[str, List[str]] = field(default_factory=dict) doc_for_citation: Dict[str, str] = field(default_factory=dict) citation_meta: Dict[str, Dict[str, Any]] = field(default_factory=dict) n_docs: int = 0 n_edges: int = 0 # ============================================================================ # Building the index # ============================================================================ def _normalize_case_cite(kind: str, num: str) -> str: """Canonicalize an Israeli case citation so 'ע\"א 1234/22' and 'ע\"א1234 / 22' collide on the same key.""" return f"{kind.replace(chr(0x05F4), chr(0x0022))} {num.replace(' ', '')}" def _extract_citations_from_text(text: str) -> List[Dict[str, str]]: """Quick variant of judgment_structurer._extract_citations that keeps duplicates collapsed but doesn't strip kind detail.""" seen: Set[str] = set() out: List[Dict[str, str]] = [] for m in CASE_CITE_RE.finditer(text or ""): s = _normalize_case_cite(m.group("kind"), m.group("num")) if s in seen: continue seen.add(s) out.append({"text": s, "kind": "caselaw"}) for m in STATUTE_CITE_RE.finditer(text or ""): s = m.group(0).strip().rstrip(",.;") # Same gates as the structurer import re as _re has_year = bool(_re.search( r"ה?תש[א-ת][\"׳״]?[א-ת]?[-־]\d{4}", s)) if (len(s) < 12 and not has_year) or s in seen: continue if s.count("(") != s.count(")") or s.count("[") != s.count("]"): continue seen.add(s) out.append({"text": s, "kind": "statute"}) return out _CASE_PREFIX_RE = __import__("re").compile( r'(?:בג"?ץ|ע"?א|ע"?פ|רע"?א|רע"?פ|דנ"?א|דנ"?פ)\s*\d+\s*[/\-]\s*\d+' ) def _build_citation_resolver(docs: List) -> Dict[str, str]: """One-shot inverse index: {normalized_self_citation: doc_id}. Each doc contributes UP TO TWO keys: the full normalized metadata.citation string, and (if it begins with a Hebrew case prefix) the bare "kind+number" prefix. Lookup is O(1) per citation afterwards. Replaces the per-citation O(N) corpus scan that scaled poorly past a few hundred docs. """ table: Dict[str, str] = {} for d in docs: did = getattr(d, "id", None) if not did: continue meta = (getattr(d, "metadata", None) or {}) own_cite = meta.get("citation", "") if not own_cite: continue own_norm = own_cite.replace(" ", "").lower() # First write wins so the earliest doc with a citation owns the # mapping (matches the original linear-scan behavior). table.setdefault(own_norm, did) m_self = _CASE_PREFIX_RE.match(own_cite) if m_self: self_norm = m_self.group(0).replace(" ", "").lower() if self_norm and self_norm != own_norm: table.setdefault(self_norm, did) return table def _resolve_doc_for_citation_via( table: Dict[str, str], citation: str, ) -> Optional[str]: """Resolve a citation against a precomputed table from `_build_citation_resolver`. O(1) plus a couple of fallback probes. The original linear scan tried both `cit_norm in own_norm` AND `own_norm in cit_norm` (substring match in either direction). The table covers both: if either side normalizes to a key in the table we hit; for the rest we fall back to scanning table keys. """ cit_norm = citation.replace(" ", "").lower() if not cit_norm: return None # Direct hit on the full or prefix-form citation hit = table.get(cit_norm) if hit is not None: return hit # Substring fallback — only run when no direct hit. Bounded by the # number of *unique* doc citations (much smaller than `len(docs)`). for own_norm, did in table.items(): if cit_norm in own_norm or own_norm in cit_norm: return did return None def _resolve_doc_for_citation(docs: List, citation: str) -> Optional[str]: """Backwards-compatible single-shot resolver. Builds the index on every call — fine for one-off lookups, **don't use in a loop**; use `_build_citation_resolver` once and `_resolve_doc_for_citation_via` per lookup instead. """ return _resolve_doc_for_citation_via( _build_citation_resolver(docs), citation, ) def build_citation_network(docs: List) -> CitationNetwork: """One-shot builder. Pass the live pipeline's _indexed_docs (or any list of objects with .id, .text, .metadata). Lazy-mode aware: if metadata.cites_extracted is present (set by Pipeline.add_documents during ingest), uses it directly — avoiding the need to access doc.text, which may be empty under TAU_RAG_LAZY_TEXT=1. Falls back to scanning text for docs without precomputed citations.""" cn = CitationNetwork() edges = 0 for d in docs: doc_id = getattr(d, "id", None) or getattr(d, "doc_id", None) if not doc_id: continue # Prefer precomputed citations (lazy-text-safe) meta = getattr(d, "metadata", None) or {} cites_list = meta.get("cites_extracted") if cites_list is None: text = getattr(d, "text", None) or "" cites_list = _extract_citations_from_text(text) cn.cites[doc_id] = cites_list for c in cites_list: citation = c["text"] cn.cited_by.setdefault(citation, []).append(doc_id) edges += 1 # Build citation metadata (kind, label) as we encounter them if citation not in cn.citation_meta: cn.citation_meta[citation] = { "text": citation, "kind": c["kind"], } cn.n_docs = len(cn.cites) cn.n_edges = edges # Resolve which corpus docs ARE the cited things (forward + reverse). # Build the {citation_norm: doc_id} lookup ONCE, then resolve each # cited citation in O(1). Old code re-scanned the entire corpus for # every citation (1.7B comparisons on a 17K-doc corpus → 30+ min). resolver = _build_citation_resolver(docs) for citation in cn.cited_by.keys(): target_doc = _resolve_doc_for_citation_via(resolver, citation) if target_doc: cn.doc_for_citation[citation] = target_doc cn.citation_meta[citation]["resolved_doc_id"] = target_doc # Co-citation cache — built lazily in `cocitations_for` cn._cocite_cache = {} # type: ignore return cn # ============================================================================ # Public lookup API # ============================================================================ def get_or_build(pipe, force_rebuild: bool = False) -> CitationNetwork: """Return the cached citation network for the given pipeline. Builds it if absent or invalidated. The pipeline holds the cache so multiple endpoints can share. Build is O(n_docs * avg_text_len) which on a 600-doc corpus takes ~1s — fine for cold-start, cached after. """ if pipe is None: return CitationNetwork() if not force_rebuild and getattr(pipe, "_citation_network_cache", None) is not None: return pipe._citation_network_cache docs = (getattr(pipe, "_indexed_docs", None) or getattr(pipe, "_docs", None) or []) cn = build_citation_network(docs) pipe._citation_network_cache = cn return cn def cocitations_for(cn: CitationNetwork, doc_id: str, min_overlap: int = 1) -> List[Tuple[str, int]]: """Find docs that share citations with the given doc. Returns a list of (other_doc_id, overlap_count) tuples sorted by overlap descending. A high overlap suggests substantive relevance — "you might also want to read these". """ cache = getattr(cn, "_cocite_cache", None) if cache is None: cn._cocite_cache = {} cache = cn._cocite_cache if doc_id in cache: return cache[doc_id] my_cites = {c["text"] for c in cn.cites.get(doc_id, [])} if not my_cites: cache[doc_id] = [] return [] counts: Dict[str, int] = {} for cite in my_cites: for other_doc in cn.cited_by.get(cite, []): if other_doc == doc_id: continue counts[other_doc] = counts.get(other_doc, 0) + 1 out = sorted( ((d, n) for d, n in counts.items() if n >= min_overlap), key=lambda x: (-x[1], x[0]), ) cache[doc_id] = out return out def popular_citations(cn: CitationNetwork, top_k: int = 25, kind: Optional[str] = None) -> List[Dict[str, Any]]: """Most-cited entries in the corpus — the precedents/statutes your corpus relies on most heavily. `kind=None` mixes caselaw + statute. Pass `"caselaw"` or `"statute"` to filter. """ rows: List[Dict[str, Any]] = [] for cite, doc_ids in cn.cited_by.items(): meta = cn.citation_meta.get(cite, {}) if kind and meta.get("kind") != kind: continue rows.append({ "citation": cite, "kind": meta.get("kind", "unknown"), "n_citers": len(doc_ids), "resolved_doc_id": meta.get("resolved_doc_id"), }) rows.sort(key=lambda r: -r["n_citers"]) return rows[:top_k] def network_for_doc(cn: CitationNetwork, doc_id: str, max_recommendations: int = 5) -> Dict[str, Any]: """Build the per-doc network payload returned by /v1/judgments/.../network. Includes: cites: list of {citation, kind, resolved_doc_id?} cited_by: list of doc_ids that cite docs/citations matching this doc cocited: list of {doc_id, overlap, sample_shared} — top-N co-citation recommendations stats: {n_cites, n_cited_by, n_cocited} Note: cited_by here means: any other corpus doc whose text contains a citation of THIS doc (matched via metadata.citation). It is NOT the same as "any doc that mentions this doc_id literally". """ raw_cites = cn.cites.get(doc_id, []) cites = [] for c in raw_cites: cites.append({ **c, "resolved_doc_id": cn.citation_meta.get(c["text"], {}).get("resolved_doc_id"), }) # cited_by: find every citation that resolves to this doc, then collect # all docs that cite it. incoming_citations = [ c for c, target in cn.doc_for_citation.items() if target == doc_id ] cited_by_ids: List[str] = [] seen_set: Set[str] = set() for c in incoming_citations: for d in cn.cited_by.get(c, []): if d == doc_id or d in seen_set: continue seen_set.add(d) cited_by_ids.append(d) # Co-citation recommendations coc = cocitations_for(cn, doc_id)[:max_recommendations] cocited = [] for other_id, overlap in coc: # Pick up to 3 sample shared citations to display my_cites = {c["text"] for c in cn.cites.get(doc_id, [])} their_cites = {c["text"] for c in cn.cites.get(other_id, [])} shared = list(my_cites & their_cites)[:3] cocited.append({ "doc_id": other_id, "overlap": overlap, "shared_citations": shared, }) return { "doc_id": doc_id, "cites": cites, "cited_by": cited_by_ids, "cocited": cocited, "stats": { "n_cites": len(cites), "n_cited_by": len(cited_by_ids), "n_cocited": len(cocited), }, } __all__ = [ "CitationNetwork", "build_citation_network", "get_or_build", "cocitations_for", "popular_citations", "network_for_doc", ]