Spaces:
Running on Zero
Running on Zero
GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8 | """Phase 2 corpus replication — BitTorrent-style knowledge propagation. | |
| When any peer ingests a document, it emits a ``rag.document.ingested`` event | |
| that eventually arrives at every node via gossip (X02 event sync). This module | |
| listens for those events from *other* nodes and pulls the raw blob via | |
| ``TransferManager`` (BLAKE3 chunked, content-addressed), then re-ingests it into | |
| the local corpus so the local ``rag.query`` can answer questions about it. | |
| Result: every node ends up with a complete local corpus copy — making | |
| single-best routing (Option A) eventually correct AND making federated | |
| scatter-gather (Option B) redundantly available as a freshness hedge. | |
| Spec: docs/M05-rag.md §10 (corpus replication) | |
| docs/X02-events.md §3.1 (rag.document.ingested) | |
| docs/M07-file-blobs.md §4 (TransferManager) | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| from typing import Any | |
| _log = logging.getLogger(__name__) | |
| # Back-off between retry attempts when fetch fails. | |
| _RETRY_DELAY_SECONDS = 15 | |
| class CorpusReplicator: | |
| """Background task that replicates documents from peer nodes. | |
| Constructor args: | |
| bus — CapabilityBus (used to call rag.ingest locally) | |
| event_log — EventLog (subscribed to rag.document.ingested events) | |
| transfer — TransferManager (fetches BLAKE3 blobs from peers) | |
| peers — PeerRegistry (resolves peer URLs from node_id) | |
| local_node_id — this node's full node ID (to skip own events) | |
| corpus_store_fn — callable(corpus:str) → CorpusStore | None; used to | |
| check has_doc before fetching (optional — saves a | |
| round-trip on duplicates we already have) | |
| """ | |
| def __init__( | |
| self, | |
| bus: Any, | |
| event_log: Any, | |
| transfer: Any, | |
| peers: Any, | |
| local_node_id: str, | |
| corpus_store_fn: Any = None, | |
| ) -> None: | |
| self._bus = bus | |
| self._event_log = event_log | |
| self._transfer = transfer | |
| self._peers = peers | |
| self._local_node_id = local_node_id | |
| self._corpus_store_fn = corpus_store_fn | |
| self._task: asyncio.Task | None = None | |
| def start(self) -> asyncio.Task: | |
| """Create and return the background asyncio Task.""" | |
| self._task = asyncio.create_task(self.run(), name="corpus-replicator") | |
| return self._task | |
| def stop(self) -> None: | |
| if self._task and not self._task.done(): | |
| self._task.cancel() | |
| async def run(self) -> None: | |
| """Main event loop — never returns until cancelled.""" | |
| _log.info("CorpusReplicator started (local_node=%s)", self._local_node_id[:16]) | |
| try: | |
| async for event in self._event_log.subscribe(["rag.document.ingested"]): | |
| asyncio.create_task(self._handle_event(event), name="corpus-repl-event") | |
| except asyncio.CancelledError: | |
| _log.info("CorpusReplicator stopped") | |
| raise | |
| except Exception as exc: | |
| _log.error("CorpusReplicator crashed: %s", exc) | |
| # ------------------------------------------------------------------ | |
| # Event handler | |
| # ------------------------------------------------------------------ | |
| async def _handle_event(self, event: Any) -> None: | |
| """Process one rag.document.ingested event from a peer.""" | |
| try: | |
| # Only replicate events authored by OTHER nodes. | |
| if getattr(event, "author", None) == self._local_node_id: | |
| return | |
| payload = event.payload or {} | |
| corpus: str = payload.get("corpus", "default") | |
| doc_cid: str | None = payload.get("doc_cid") | |
| blob_cid: str | None = payload.get("blob_cid") | |
| title: str = payload.get("title", "Untitled") | |
| author: str = event.author | |
| if not doc_cid: | |
| return | |
| # Idempotency: skip if we already have this document. | |
| if self._corpus_store_fn is not None: | |
| try: | |
| store = self._corpus_store_fn(corpus) | |
| if store is not None and store.has_doc(doc_cid): | |
| _log.debug( | |
| "replicator: already have doc_cid=%s corpus=%s — skip", | |
| doc_cid[:16], | |
| corpus, | |
| ) | |
| return | |
| except Exception: | |
| pass | |
| # If no blob_cid we cannot fetch — log and skip. | |
| if not blob_cid: | |
| _log.debug( | |
| "replicator: event from %s has no blob_cid, cannot fetch doc_cid=%s", | |
| author[:16], | |
| doc_cid[:16] if doc_cid else "?", | |
| ) | |
| return | |
| # Resolve peer source URLs for this author node. | |
| sources = self._peer_urls_for(author) | |
| if not sources: | |
| _log.debug("replicator: no reachable peer URL for author %s", author[:16]) | |
| return | |
| # Fetch the blob via BLAKE3 chunked transfer (M07 TransferManager). | |
| try: | |
| manifest = await self._transfer.fetch(blob_cid, sources) | |
| raw_bytes = self._transfer.store.get(manifest.cid) | |
| text = raw_bytes.decode("utf-8", errors="replace") | |
| except Exception as exc: | |
| _log.warning( | |
| "replicator: fetch failed blob_cid=%s from %s: %s", | |
| blob_cid[:16] if blob_cid else "?", | |
| sources, | |
| exc, | |
| ) | |
| # Retry once after a delay (e.g., peer was momentarily unavailable). | |
| await asyncio.sleep(_RETRY_DELAY_SECONDS) | |
| try: | |
| manifest = await self._transfer.fetch(blob_cid, sources) | |
| raw_bytes = self._transfer.store.get(manifest.cid) | |
| text = raw_bytes.decode("utf-8", errors="replace") | |
| except Exception as exc2: | |
| _log.warning("replicator: retry also failed: %s", exc2) | |
| return | |
| # Re-ingest locally via the bus (goes through the normal pipeline, | |
| # honours has_doc idempotency, emits NO new event because event_log | |
| # is only attached to the original RagService which will see this | |
| # call as local — that's correct; the ingest IS local now). | |
| try: | |
| await self._bus.call( | |
| "rag.ingest", | |
| (1, 0), | |
| { | |
| "input": { | |
| "text": text, | |
| "title": title, | |
| "doc_cid": doc_cid, | |
| "corpus": corpus, | |
| }, | |
| "params": {"corpus": corpus}, | |
| }, | |
| ) | |
| _log.info( | |
| "replicator: ingested doc_cid=%s corpus=%s from %s", | |
| doc_cid[:16], | |
| corpus, | |
| author[:16], | |
| ) | |
| except Exception as exc: | |
| _log.warning("replicator: ingest failed doc_cid=%s: %s", doc_cid[:16], exc) | |
| except Exception as exc: | |
| _log.warning("replicator: unhandled error in _handle_event: %s", exc) | |
| # ------------------------------------------------------------------ | |
| # Peer URL resolution | |
| # ------------------------------------------------------------------ | |
| def _peer_urls_for(self, node_id: str) -> list[str]: | |
| """Return HTTP base URLs for a peer node_id from the PeerRegistry.""" | |
| try: | |
| for peer in self._peers.all(): | |
| if peer.node_id == node_id or node_id.startswith(peer.node_id): | |
| urls = [] | |
| for ep in getattr(peer, "endpoints", []): | |
| transport = getattr(ep, "transport", "") | |
| if transport in ("http", ""): | |
| urls.append(f"http://{ep.host}:{ep.port}") | |
| if urls: | |
| return urls | |
| except Exception: | |
| pass | |
| return [] | |