Spaces:
Running on Zero
Running on Zero
File size: 8,314 Bytes
"""Phase 2 corpus replication — BitTorrent-style knowledge propagation.

When any peer ingests a document, it emits a ``rag.document.ingested`` event
that eventually arrives at every node via gossip (X02 event sync). This module
listens for those events from *other* nodes, pulls the raw blob via
``TransferManager`` (BLAKE3 chunked, content-addressed), and then re-ingests it
into the local corpus so the local ``rag.query`` can answer questions about it.

Result: every node eventually ends up with a complete local corpus copy (for
documents whose event carries a ``blob_cid``) — making single-best routing
(Option A) eventually correct AND making federated scatter-gather (Option B)
redundantly available as a freshness hedge.

Spec: docs/M05-rag.md §10 (corpus replication)
      docs/X02-events.md §3.1 (rag.document.ingested)
      docs/M07-file-blobs.md §4 (TransferManager)
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
# Logger named after this module.
_log: logging.Logger = logging.getLogger(__name__)

# Seconds to wait before retrying a blob fetch that failed (e.g. the source
# peer was momentarily unreachable).
_RETRY_DELAY_SECONDS: int = 15
class CorpusReplicator:
    """Background task that replicates documents from peer nodes.

    Listens for ``rag.document.ingested`` events authored by other nodes,
    fetches the referenced blob through the TransferManager and re-ingests
    the decoded text locally via ``rag.ingest`` on the capability bus.

    Constructor args:
        bus             — CapabilityBus (used to call rag.ingest locally)
        event_log       — EventLog (subscribed to rag.document.ingested events)
        transfer        — TransferManager (fetches BLAKE3 blobs from peers)
        peers           — PeerRegistry (resolves peer URLs from node_id)
        local_node_id   — this node's full node ID (to skip own events)
        corpus_store_fn — callable(corpus: str) → CorpusStore | None; used to
                          check has_doc before fetching (optional — saves a
                          round-trip on duplicates we already have)
        retry_delay     — seconds to wait between fetch attempts (optional;
                          defaults to the module's ``_RETRY_DELAY_SECONDS``)
    """

    # Total fetch attempts per event: the initial try plus one retry.
    _FETCH_ATTEMPTS = 2

    def __init__(
        self,
        bus: Any,
        event_log: Any,
        transfer: Any,
        peers: Any,
        local_node_id: str,
        corpus_store_fn: Any = None,
        retry_delay: float | None = None,
    ) -> None:
        self._bus = bus
        self._event_log = event_log
        self._transfer = transfer
        self._peers = peers
        self._local_node_id = local_node_id
        self._corpus_store_fn = corpus_store_fn
        # Resolved here (not as a default argument) so the module constant
        # remains the single source of truth for the default.
        self._retry_delay = (
            _RETRY_DELAY_SECONDS if retry_delay is None else retry_delay
        )
        self._task: asyncio.Task | None = None
        # Strong references to in-flight per-event tasks. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._pending: set[asyncio.Task] = set()

    def start(self) -> asyncio.Task:
        """Create and return the background asyncio Task running ``run()``."""
        self._task = asyncio.create_task(self.run(), name="corpus-replicator")
        return self._task

    def stop(self) -> None:
        """Cancel the subscription loop and every in-flight event handler."""
        if self._task and not self._task.done():
            self._task.cancel()
        # Copy first: done-callbacks remove tasks from the set as they finish.
        for task in list(self._pending):
            task.cancel()

    async def run(self) -> None:
        """Consume ``rag.document.ingested`` events until cancelled.

        Each event is handled in its own task so a slow fetch (including the
        retry back-off) does not hold up later events. ``CancelledError`` is
        re-raised; any other exception from the subscription is logged with
        its traceback and ends the loop.
        """
        _log.info("CorpusReplicator started (local_node=%s)", self._local_node_id[:16])
        try:
            async for event in self._event_log.subscribe(["rag.document.ingested"]):
                task = asyncio.create_task(
                    self._handle_event(event), name="corpus-repl-event"
                )
                self._pending.add(task)
                task.add_done_callback(self._pending.discard)
        except asyncio.CancelledError:
            _log.info("CorpusReplicator stopped")
            raise
        except Exception as exc:
            _log.exception("CorpusReplicator crashed: %s", exc)

    # ------------------------------------------------------------------
    # Event handler
    # ------------------------------------------------------------------

    async def _handle_event(self, event: Any) -> None:
        """Process one rag.document.ingested event from a peer.

        Skips events that have no author or were authored by this node,
        events without a ``doc_cid``, documents the local corpus store
        already has, and events without a ``blob_cid``. Otherwise fetches the
        blob from the author's HTTP endpoints and re-ingests it locally.
        Exceptions are logged, not propagated.
        """
        try:
            author: str | None = getattr(event, "author", None)
            # Without an author there is no peer to fetch from.
            if not author:
                _log.debug("replicator: event has no author — skip")
                return
            # Only replicate events authored by OTHER nodes.
            if author == self._local_node_id:
                return

            payload = getattr(event, "payload", None) or {}
            corpus: str = payload.get("corpus", "default")
            doc_cid: str | None = payload.get("doc_cid")
            blob_cid: str | None = payload.get("blob_cid")
            title: str = payload.get("title", "Untitled")
            if not doc_cid:
                return

            # Idempotency: skip if we already have this document.
            if self._already_have(corpus, doc_cid):
                _log.debug(
                    "replicator: already have doc_cid=%s corpus=%s — skip",
                    doc_cid[:16],
                    corpus,
                )
                return

            # If no blob_cid we cannot fetch — log and skip.
            if not blob_cid:
                _log.debug(
                    "replicator: event from %s has no blob_cid, cannot fetch doc_cid=%s",
                    author[:16],
                    doc_cid[:16],
                )
                return

            # Resolve peer source URLs for this author node.
            sources = self._peer_urls_for(author)
            if not sources:
                _log.debug("replicator: no reachable peer URL for author %s", author[:16])
                return

            text = await self._fetch_text(blob_cid, sources)
            if text is None:
                return

            await self._ingest(text, title, doc_cid, corpus, author)
        except Exception as exc:
            _log.warning(
                "replicator: unhandled error in _handle_event: %s", exc, exc_info=True
            )

    def _already_have(self, corpus: str, doc_cid: str) -> bool:
        """Return True if the local store for *corpus* reports *doc_cid* present.

        Best-effort: when no ``corpus_store_fn`` was given, when it returns
        None, or when the lookup raises, the document counts as absent and
        will be fetched.
        """
        if self._corpus_store_fn is None:
            return False
        try:
            store = self._corpus_store_fn(corpus)
            return store is not None and bool(store.has_doc(doc_cid))
        except Exception as exc:
            _log.debug(
                "replicator: has_doc check failed corpus=%s doc_cid=%.16s: %s",
                corpus,
                doc_cid,
                exc,
            )
            return False

    async def _fetch_text(self, blob_cid: str, sources: list[str]) -> str | None:
        """Fetch *blob_cid* from *sources* and decode it as UTF-8 text.

        Makes up to ``_FETCH_ATTEMPTS`` attempts, sleeping ``retry_delay``
        seconds after each failure except the last. Returns None when every
        attempt fails.
        """
        for attempt in range(1, self._FETCH_ATTEMPTS + 1):
            try:
                # Fetch via BLAKE3 chunked transfer (M07 TransferManager), then
                # read the assembled bytes back from the transfer's store.
                manifest = await self._transfer.fetch(blob_cid, sources)
                raw_bytes = self._transfer.store.get(manifest.cid)
                if raw_bytes is None:
                    raise LookupError(
                        f"blob {manifest.cid} missing from store after fetch"
                    )
                # NOTE(review): undecodable bytes are replaced, so non-UTF-8
                # (e.g. binary) documents are ingested lossily — confirm intended.
                return raw_bytes.decode("utf-8", errors="replace")
            except Exception as exc:
                if attempt < self._FETCH_ATTEMPTS:
                    _log.warning(
                        "replicator: fetch failed blob_cid=%s from %s: %s",
                        blob_cid[:16],
                        sources,
                        exc,
                    )
                    await asyncio.sleep(self._retry_delay)
                else:
                    _log.warning("replicator: retry also failed: %s", exc)
        return None

    async def _ingest(
        self, text: str, title: str, doc_cid: str, corpus: str, author: str
    ) -> None:
        """Re-ingest fetched text locally through ``rag.ingest`` on the bus.

        Failures are logged as warnings and not propagated.
        """
        # The call goes through the normal local rag.ingest pipeline; (1, 0) is
        # presumably the capability version. The original author noted that
        # this pipeline honours has_doc idempotency and emits no new
        # rag.document.ingested event for replicated ingests.
        # NOTE(review): neither property is visible from this module — confirm
        # against RagService. If it does emit, the new event is authored by
        # this node, and other replicators' has_doc checks should skip it.
        try:
            await self._bus.call(
                "rag.ingest",
                (1, 0),
                {
                    "input": {
                        "text": text,
                        "title": title,
                        "doc_cid": doc_cid,
                        "corpus": corpus,
                    },
                    "params": {"corpus": corpus},
                },
            )
            _log.info(
                "replicator: ingested doc_cid=%s corpus=%s from %s",
                doc_cid[:16],
                corpus,
                author[:16],
            )
        except Exception as exc:
            _log.warning("replicator: ingest failed doc_cid=%s: %s", doc_cid[:16], exc)

    # ------------------------------------------------------------------
    # Peer URL resolution
    # ------------------------------------------------------------------

    def _peer_urls_for(self, node_id: str) -> list[str]:
        """Return HTTP base URLs for a peer node_id from the PeerRegistry.

        A registry entry matches when its ``node_id`` is a non-empty prefix
        of *node_id*; this includes exact equality. Presumably the registry
        may hold truncated IDs — verify. The first matching peer with at
        least one endpoint whose transport is ``"http"`` or empty wins.
        Returns ``[]`` when nothing matches or the registry lookup raises.
        """
        try:
            for peer in self._peers.all():
                peer_id = getattr(peer, "node_id", None)
                # An empty peer_id is a prefix of every ID and would match any
                # author, so it must never count as a match.
                if not peer_id or not node_id.startswith(peer_id):
                    continue
                urls = [
                    self._http_base_url(ep.host, ep.port)
                    for ep in (getattr(peer, "endpoints", None) or [])
                    if getattr(ep, "transport", "") in ("http", "")
                ]
                if urls:
                    return urls
        except Exception as exc:
            _log.debug("replicator: peer lookup failed for %.16s: %s", node_id, exc)
        return []

    @staticmethod
    def _http_base_url(host: Any, port: Any) -> str:
        """Build ``http://host:port``, bracketing IPv6 literal hosts."""
        host_text = str(host)
        # Hostnames and IPv4 addresses never contain ':'; IPv6 literals must
        # be wrapped in [...] inside a URL authority.
        if ":" in host_text and not host_text.startswith("["):
            host_text = f"[{host_text}]"
        return f"http://{host_text}:{port}"
|