GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8
Raw
History Blame
8.31 kB
"""Phase 2 corpus replication — BitTorrent-style knowledge propagation.
When any peer ingests a document, it emits a ``rag.document.ingested`` event
that eventually arrives at every node via gossip (X02 event sync). This module
listens for those events from *other* nodes and pulls the raw blob via
``TransferManager`` (BLAKE3 chunked, content-addressed), then re-ingests it into
the local corpus so the local ``rag.query`` can answer questions about it.
Result: every node ends up with a complete local corpus copy — making
single-best routing (Option A) eventually correct AND making federated
scatter-gather (Option B) redundantly available as a freshness hedge.
Spec: docs/M05-rag.md §10 (corpus replication)
docs/X02-events.md §3.1 (rag.document.ingested)
docs/M07-file-blobs.md §4 (TransferManager)
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
_log = logging.getLogger(__name__)
# Back-off between retry attempts when fetch fails.
_RETRY_DELAY_SECONDS = 15
class CorpusReplicator:
"""Background task that replicates documents from peer nodes.
Constructor args:
bus — CapabilityBus (used to call rag.ingest locally)
event_log — EventLog (subscribed to rag.document.ingested events)
transfer — TransferManager (fetches BLAKE3 blobs from peers)
peers — PeerRegistry (resolves peer URLs from node_id)
local_node_id — this node's full node ID (to skip own events)
corpus_store_fn — callable(corpus:str) → CorpusStore | None; used to
check has_doc before fetching (optional — saves a
round-trip on duplicates we already have)
"""
def __init__(
self,
bus: Any,
event_log: Any,
transfer: Any,
peers: Any,
local_node_id: str,
corpus_store_fn: Any = None,
) -> None:
self._bus = bus
self._event_log = event_log
self._transfer = transfer
self._peers = peers
self._local_node_id = local_node_id
self._corpus_store_fn = corpus_store_fn
self._task: asyncio.Task | None = None
def start(self) -> asyncio.Task:
"""Create and return the background asyncio Task."""
self._task = asyncio.create_task(self.run(), name="corpus-replicator")
return self._task
def stop(self) -> None:
if self._task and not self._task.done():
self._task.cancel()
async def run(self) -> None:
"""Main event loop — never returns until cancelled."""
_log.info("CorpusReplicator started (local_node=%s)", self._local_node_id[:16])
try:
async for event in self._event_log.subscribe(["rag.document.ingested"]):
asyncio.create_task(self._handle_event(event), name="corpus-repl-event")
except asyncio.CancelledError:
_log.info("CorpusReplicator stopped")
raise
except Exception as exc:
_log.error("CorpusReplicator crashed: %s", exc)
# ------------------------------------------------------------------
# Event handler
# ------------------------------------------------------------------
async def _handle_event(self, event: Any) -> None:
"""Process one rag.document.ingested event from a peer."""
try:
# Only replicate events authored by OTHER nodes.
if getattr(event, "author", None) == self._local_node_id:
return
payload = event.payload or {}
corpus: str = payload.get("corpus", "default")
doc_cid: str | None = payload.get("doc_cid")
blob_cid: str | None = payload.get("blob_cid")
title: str = payload.get("title", "Untitled")
author: str = event.author
if not doc_cid:
return
# Idempotency: skip if we already have this document.
if self._corpus_store_fn is not None:
try:
store = self._corpus_store_fn(corpus)
if store is not None and store.has_doc(doc_cid):
_log.debug(
"replicator: already have doc_cid=%s corpus=%s — skip",
doc_cid[:16],
corpus,
)
return
except Exception:
pass
# If no blob_cid we cannot fetch — log and skip.
if not blob_cid:
_log.debug(
"replicator: event from %s has no blob_cid, cannot fetch doc_cid=%s",
author[:16],
doc_cid[:16] if doc_cid else "?",
)
return
# Resolve peer source URLs for this author node.
sources = self._peer_urls_for(author)
if not sources:
_log.debug("replicator: no reachable peer URL for author %s", author[:16])
return
# Fetch the blob via BLAKE3 chunked transfer (M07 TransferManager).
try:
manifest = await self._transfer.fetch(blob_cid, sources)
raw_bytes = self._transfer.store.get(manifest.cid)
text = raw_bytes.decode("utf-8", errors="replace")
except Exception as exc:
_log.warning(
"replicator: fetch failed blob_cid=%s from %s: %s",
blob_cid[:16] if blob_cid else "?",
sources,
exc,
)
# Retry once after a delay (e.g., peer was momentarily unavailable).
await asyncio.sleep(_RETRY_DELAY_SECONDS)
try:
manifest = await self._transfer.fetch(blob_cid, sources)
raw_bytes = self._transfer.store.get(manifest.cid)
text = raw_bytes.decode("utf-8", errors="replace")
except Exception as exc2:
_log.warning("replicator: retry also failed: %s", exc2)
return
# Re-ingest locally via the bus (goes through the normal pipeline,
# honours has_doc idempotency, emits NO new event because event_log
# is only attached to the original RagService which will see this
# call as local — that's correct; the ingest IS local now).
try:
await self._bus.call(
"rag.ingest",
(1, 0),
{
"input": {
"text": text,
"title": title,
"doc_cid": doc_cid,
"corpus": corpus,
},
"params": {"corpus": corpus},
},
)
_log.info(
"replicator: ingested doc_cid=%s corpus=%s from %s",
doc_cid[:16],
corpus,
author[:16],
)
except Exception as exc:
_log.warning("replicator: ingest failed doc_cid=%s: %s", doc_cid[:16], exc)
except Exception as exc:
_log.warning("replicator: unhandled error in _handle_event: %s", exc)
# ------------------------------------------------------------------
# Peer URL resolution
# ------------------------------------------------------------------
def _peer_urls_for(self, node_id: str) -> list[str]:
"""Return HTTP base URLs for a peer node_id from the PeerRegistry."""
try:
for peer in self._peers.all():
if peer.node_id == node_id or node_id.startswith(peer.node_id):
urls = []
for ep in getattr(peer, "endpoints", []):
transport = getattr(ep, "transport", "")
if transport in ("http", ""):
urls.append(f"http://{ep.host}:{ep.port}")
if urls:
return urls
except Exception:
pass
return []