File size: 8,314 Bytes
6c38d43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3f78ea8
6c38d43
 
 
78cc96f
6c38d43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78cc96f
6c38d43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78cc96f
6c38d43
 
 
 
 
 
 
 
 
 
 
 
78cc96f
6c38d43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78cc96f
6c38d43
 
78cc96f
6c38d43
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
78cc96f
6c38d43
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
"""Phase 2 corpus replication β€” BitTorrent-style knowledge propagation.

When any peer ingests a document, it emits a ``rag.document.ingested`` event
that eventually arrives at every node via gossip (X02 event sync).  This module
listens for those events from *other* nodes and pulls the raw blob via
``TransferManager`` (BLAKE3 chunked, content-addressed), then re-ingests it into
the local corpus so the local ``rag.query`` can answer questions about it.

Result: every node ends up with a complete local corpus copy β€” making
single-best routing (Option A) eventually correct AND making federated
scatter-gather (Option B) redundantly available as a freshness hedge.

Spec:  docs/M05-rag.md Β§10 (corpus replication)
       docs/X02-events.md Β§3.1 (rag.document.ingested)
       docs/M07-file-blobs.md Β§4 (TransferManager)
"""

from __future__ import annotations

import asyncio
import logging
from typing import Any

_log = logging.getLogger(__name__)

# Back-off between retry attempts when fetch fails.
_RETRY_DELAY_SECONDS = 15


class CorpusReplicator:
    """Background task that replicates documents from peer nodes.

    Constructor args:
        bus            β€” CapabilityBus (used to call rag.ingest locally)
        event_log      β€” EventLog (subscribed to rag.document.ingested events)
        transfer       β€” TransferManager (fetches BLAKE3 blobs from peers)
        peers          β€” PeerRegistry (resolves peer URLs from node_id)
        local_node_id  β€” this node's full node ID (to skip own events)
        corpus_store_fn β€” callable(corpus:str) β†’ CorpusStore | None; used to
                          check has_doc before fetching (optional β€” saves a
                          round-trip on duplicates we already have)
    """

    def __init__(
        self,
        bus: Any,
        event_log: Any,
        transfer: Any,
        peers: Any,
        local_node_id: str,
        corpus_store_fn: Any = None,
    ) -> None:
        self._bus = bus
        self._event_log = event_log
        self._transfer = transfer
        self._peers = peers
        self._local_node_id = local_node_id
        self._corpus_store_fn = corpus_store_fn
        self._task: asyncio.Task | None = None

    def start(self) -> asyncio.Task:
        """Create and return the background asyncio Task."""
        self._task = asyncio.create_task(self.run(), name="corpus-replicator")
        return self._task

    def stop(self) -> None:
        if self._task and not self._task.done():
            self._task.cancel()

    async def run(self) -> None:
        """Main event loop β€” never returns until cancelled."""
        _log.info("CorpusReplicator started (local_node=%s)", self._local_node_id[:16])
        try:
            async for event in self._event_log.subscribe(["rag.document.ingested"]):
                asyncio.create_task(self._handle_event(event), name="corpus-repl-event")
        except asyncio.CancelledError:
            _log.info("CorpusReplicator stopped")
            raise
        except Exception as exc:
            _log.error("CorpusReplicator crashed: %s", exc)

    # ------------------------------------------------------------------
    # Event handler
    # ------------------------------------------------------------------

    async def _handle_event(self, event: Any) -> None:
        """Process one rag.document.ingested event from a peer."""
        try:
            # Only replicate events authored by OTHER nodes.
            if getattr(event, "author", None) == self._local_node_id:
                return

            payload = event.payload or {}
            corpus: str = payload.get("corpus", "default")
            doc_cid: str | None = payload.get("doc_cid")
            blob_cid: str | None = payload.get("blob_cid")
            title: str = payload.get("title", "Untitled")
            author: str = event.author

            if not doc_cid:
                return

            # Idempotency: skip if we already have this document.
            if self._corpus_store_fn is not None:
                try:
                    store = self._corpus_store_fn(corpus)
                    if store is not None and store.has_doc(doc_cid):
                        _log.debug(
                            "replicator: already have doc_cid=%s corpus=%s β€” skip",
                            doc_cid[:16],
                            corpus,
                        )
                        return
                except Exception:
                    pass

            # If no blob_cid we cannot fetch β€” log and skip.
            if not blob_cid:
                _log.debug(
                    "replicator: event from %s has no blob_cid, cannot fetch doc_cid=%s",
                    author[:16],
                    doc_cid[:16] if doc_cid else "?",
                )
                return

            # Resolve peer source URLs for this author node.
            sources = self._peer_urls_for(author)
            if not sources:
                _log.debug("replicator: no reachable peer URL for author %s", author[:16])
                return

            # Fetch the blob via BLAKE3 chunked transfer (M07 TransferManager).
            try:
                manifest = await self._transfer.fetch(blob_cid, sources)
                raw_bytes = self._transfer.store.get(manifest.cid)
                text = raw_bytes.decode("utf-8", errors="replace")
            except Exception as exc:
                _log.warning(
                    "replicator: fetch failed blob_cid=%s from %s: %s",
                    blob_cid[:16] if blob_cid else "?",
                    sources,
                    exc,
                )
                # Retry once after a delay (e.g., peer was momentarily unavailable).
                await asyncio.sleep(_RETRY_DELAY_SECONDS)
                try:
                    manifest = await self._transfer.fetch(blob_cid, sources)
                    raw_bytes = self._transfer.store.get(manifest.cid)
                    text = raw_bytes.decode("utf-8", errors="replace")
                except Exception as exc2:
                    _log.warning("replicator: retry also failed: %s", exc2)
                    return

            # Re-ingest locally via the bus (goes through the normal pipeline,
            # honours has_doc idempotency, emits NO new event because event_log
            # is only attached to the original RagService which will see this
            # call as local β€” that's correct; the ingest IS local now).
            try:
                await self._bus.call(
                    "rag.ingest",
                    (1, 0),
                    {
                        "input": {
                            "text": text,
                            "title": title,
                            "doc_cid": doc_cid,
                            "corpus": corpus,
                        },
                        "params": {"corpus": corpus},
                    },
                )
                _log.info(
                    "replicator: ingested doc_cid=%s corpus=%s from %s",
                    doc_cid[:16],
                    corpus,
                    author[:16],
                )
            except Exception as exc:
                _log.warning("replicator: ingest failed doc_cid=%s: %s", doc_cid[:16], exc)

        except Exception as exc:
            _log.warning("replicator: unhandled error in _handle_event: %s", exc)

    # ------------------------------------------------------------------
    # Peer URL resolution
    # ------------------------------------------------------------------

    def _peer_urls_for(self, node_id: str) -> list[str]:
        """Return HTTP base URLs for a peer node_id from the PeerRegistry."""
        try:
            for peer in self._peers.all():
                if peer.node_id == node_id or node_id.startswith(peer.node_id):
                    urls = []
                    for ep in getattr(peer, "endpoints", []):
                        transport = getattr(ep, "transport", "")
                        if transport in ("http", ""):
                            urls.append(f"http://{ep.host}:{ep.port}")
                    if urls:
                        return urls
        except Exception:
            pass
        return []