# M05 - RAG Service

**Spec version:** v1.0
**Depends on:** M03 (bus, for both registration and invoking embed.text), M07 (blobs, for source document storage), X04 (config), X03 (observability), X02 (events, for `rag.document.ingested`), `chromadb`, `pypdf`
**Depended on by:** M08 (UI), other applications that consume retrieved chunks

---

## 1. Responsibility

Implement `rag.query@1.0`, `rag.ingest@1.0`, `rag.list_corpora@1.0`. Maintain per-corpus vector stores. Chunk and embed ingested documents. Store original document blobs via [M07](M07-file-blobs.md).

RAG is **never** the LLM provider: answer generation is a separate hop the caller makes after retrieving chunks. This separation is deliberate because it keeps `rag.query` cacheable and reusable.

---

## 2. File layout

```
hearthnet/services/rag/
├── __init__.py
├── service.py          # RagService
├── chunker.py          # text → chunks
├── ingest.py           # document → chunks → embeddings → store
└── store.py            # ChromaDB wrapper, one collection per corpus
```

---

## 3. Public API

### 3.1 `chunker.py`

```python
# hearthnet/services/rag/chunker.py
@dataclass(frozen=True)
class Chunk:
    text:     str
    metadata: dict      # {doc_cid, doc_title, page, chunk_index, language}

def chunk_text(
    text: str,
    *,
    tokens_per_chunk: int = RAG_CHUNK_TOKENS,        # 1000
    overlap_tokens:   int = RAG_CHUNK_OVERLAP_TOKENS, # 200
    metadata: dict | None = None,
) -> list[Chunk]:
    """Split using a sliding window measured in approximate tokens.
    Respects paragraph boundaries where possible; falls back to sentence then word."""

def chunk_pdf(pdf_bytes: bytes, *, doc_metadata: dict) -> list[Chunk]:
    """Extract text per page using pypdf, then chunk_text per page.
    Each chunk carries page number in metadata."""
```
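
The sliding window can be sketched as follows. This is a minimal illustration, assuming whitespace-separated words as the approximate token unit. The helper name `chunk_words` is hypothetical, and the paragraph and sentence boundary handling that `chunk_text` promises is left out.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class Chunk:
    text: str
    metadata: dict = field(default_factory=dict)


def chunk_words(
    text: str,
    *,
    tokens_per_chunk: int = 1000,
    overlap_tokens: int = 200,
    metadata: Optional[dict] = None,
) -> list[Chunk]:
    """Sliding window over whitespace-separated words (a crude token proxy)."""
    if not 0 <= overlap_tokens < tokens_per_chunk:
        raise ValueError("overlap_tokens must be in [0, tokens_per_chunk)")
    words = text.split()
    step = tokens_per_chunk - overlap_tokens  # how far the window advances
    chunks: list[Chunk] = []
    for index, start in enumerate(range(0, len(words), step)):
        window = words[start:start + tokens_per_chunk]
        chunks.append(Chunk(" ".join(window), {**(metadata or {}), "chunk_index": index}))
        if start + tokens_per_chunk >= len(words):
            break  # this window already reached the end of the text
    return chunks
```

With the defaults, consecutive chunks share 200 words, so a sentence that straddles a window edge still appears whole in at least one chunk.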

### 3.2 `store.py`

```python
# hearthnet/services/rag/store.py
class CorpusStore:
    """One ChromaDB collection per corpus name."""

    def __init__(self, corpora_dir: Path, corpus: str, embedding_dim: int):
        ...

    def add_chunks(self, chunks: list[Chunk], embeddings: list[list[float]]) -> None: ...
    def has_document(self, doc_cid: str) -> bool: ...
    def query(
        self,
        embedding: list[float],
        *,
        k: int,
        filter: dict | None = None,
    ) -> list[ScoredChunk]: ...
    def count(self) -> int: ...
    def size_bytes(self) -> int: ...
    def language_majority(self) -> str | None: ...

@dataclass(frozen=True)
class ScoredChunk:
    chunk:    Chunk
    score:    float    # similarity, higher = better

def list_corpora(corpora_dir: Path) -> list[str]: ...
def corpus_info(corpora_dir: Path, corpus: str) -> dict: ...
```
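
To make the `query` contract concrete (scores where higher is better, optional metadata filter, at most `k` results), here is a small in-memory stand-in. It is not the ChromaDB wrapper: `InMemoryCorpusStore` and `cosine_similarity` are hypothetical names, the similarity measure is assumed to be cosine, and results are returned as plain `(score, text)` tuples rather than `ScoredChunk`.

```python
import math
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


class InMemoryCorpusStore:
    """Hypothetical stand-in for CorpusStore; keeps rows in a Python list."""

    def __init__(self) -> None:
        self._rows: list[tuple[str, dict, list[float]]] = []

    def add_chunk(self, text: str, metadata: dict, embedding: list[float]) -> None:
        self._rows.append((text, metadata, embedding))

    def has_document(self, doc_cid: str) -> bool:
        return any(meta.get("doc_cid") == doc_cid for _, meta, _ in self._rows)

    def query(
        self, embedding: list[float], *, k: int, filter: Optional[dict] = None
    ) -> list[tuple[float, str]]:
        """Return up to k (score, text) pairs, best first; filter is exact-match."""
        wanted = filter or {}
        scored = [
            (cosine_similarity(embedding, emb), text)
            for text, meta, emb in self._rows
            if all(meta.get(key) == value for key, value in wanted.items())
        ]
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return scored[:k]
```

A stand-in like this may also be useful in unit tests that should not depend on a ChromaDB instance.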

### 3.3 `ingest.py`

```python
# hearthnet/services/rag/ingest.py
class IngestPipeline:
    def __init__(
        self,
        bus: CapabilityBus,           # to call embed.text@1.0
        blob_store: BlobStore,        # from M07
        corpora_dir: Path,
        event_log: EventLog,
    ):
        ...

    async def ingest_document(
        self,
        doc_cid: str,
        corpus: str,
        title: str,
        language: str,
        metadata: dict,
        author_kp: KeyPair,
    ) -> IngestResult:
        """1. Fetch blob bytes from blob_store by doc_cid (assumed already stored).
        2. Detect content type (currently: PDF only).
        3. Chunk.
        4. Batch embed via bus.call('embed.text', (1,0), ...).
        5. Write to CorpusStore.
        6. Append rag.document.ingested event via event_log.
        Idempotent on (corpus, doc_cid): re-ingesting is a no-op (logged, returns existing result)."""

@dataclass(frozen=True)
class IngestResult:
    doc_cid:        str
    chunks_indexed: int
    tokens_indexed: int
    ingest_event_id: str
    ms:             int
```

### 3.4 `service.py`

```python
# hearthnet/services/rag/service.py
class RagService:
    name    = "rag"
    version = "1.0"

    def __init__(
        self,
        config: RagConfig,
        bus: CapabilityBus,
        blob_store: BlobStore,
        event_log: EventLog,
        community_manifest_provider: Callable[[], CommunityManifest],
    ):
        self._stores: dict[str, CorpusStore] = {}
        self._ingest = IngestPipeline(bus, blob_store, config.corpora_dir, event_log)

    def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable, ParamsPredicate]]:
        """Registers one entry per existing corpus for rag.query (params include corpus name).
        rag.ingest registered once (corpus is a request param).
        rag.list_corpora registered once."""

    async def start(self) -> None:
        """Discover existing corpora on disk, open ChromaDB collections."""

    async def stop(self) -> None: ...
    def health(self) -> dict: ...

    # --- handlers ---

    async def handle_query(self, req: RouteRequest) -> dict:
        """CONTRACT §4.4.
        1. Embed query via bus.call('embed.text', (1,0), ...).
        2. CorpusStore.query(embedding, k).
        3. Format response."""

    async def handle_ingest(self, req: RouteRequest) -> dict:
        """CONTRACT §4.5.
        Checks caller is at least 'trusted'.
        Delegates to IngestPipeline.ingest_document."""

    async def handle_list_corpora(self, req: RouteRequest) -> dict:
        """CONTRACT §4.6."""
```

### 3.5 Capability descriptors and predicates

```python
# rag.query: registered per corpus
descriptor_query = CapabilityDescriptor(
    name="rag.query", version=(1, 0), stability="stable",
    request_schema={...}, response_schema={...}, stream_schema=None,
    params={"corpus": "<corpus_name>", "embedding_model": "<model>", "k_max": RAG_MAX_K},
    max_concurrent=4,
    trust_required="member",
    timeout_seconds=10,
    idempotent=True,
)

def query_params_compatible(offered: dict, requested: dict) -> bool:
    return requested.get("corpus") == offered.get("corpus")

# rag.ingest: registered once
descriptor_ingest = CapabilityDescriptor(
    name="rag.ingest", version=(1, 0), stability="stable",
    request_schema={...}, response_schema={...}, stream_schema=None,
    params={"corpora_available": "<list of corpus names>"},
    max_concurrent=2,
    trust_required="trusted",
    timeout_seconds=300,
    idempotent=True,
)
```
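
The per-corpus predicate can be exercised on its own. The sketch below assumes the bus filters registrations by calling the predicate with the offered and requested params; `matching_offers`, the corpus names, and the param values are illustrative, not part of the spec.

```python
def query_params_compatible(offered: dict, requested: dict) -> bool:
    return requested.get("corpus") == offered.get("corpus")


def matching_offers(offers: list[dict], requested: dict) -> list[dict]:
    """Keep only the registrations whose params satisfy the predicate."""
    return [offer for offer in offers if query_params_compatible(offer, requested)]


# Hypothetical registrations: one rag.query entry per corpus.
offers = [
    {"corpus": "first-aid", "embedding_model": "model-a", "k_max": 20},
    {"corpus": "repair-manuals", "embedding_model": "model-a", "k_max": 20},
]
```

Because every offer carries a `corpus` value, a request that omits `corpus` matches nothing, which is the behaviour one would want for a per-corpus capability.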

---

## 4. Behaviour

### 4.1 Embedding via the bus, not direct import

`RagService` never imports `EmbeddingService`. It uses `bus.call("embed.text", (1, 0), ...)`. Reasons:
- Embeddings might run on another node (e.g. a GPU anchor) while RAG runs on a CPU hearth
- The bus handles load balancing and quarantine automatically
- Keeps the service module dependency graph honest
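
A sketch of what the bus-only dependency looks like in practice, and how a test can verify it. `FakeBus` is a hypothetical stand-in for `CapabilityBus`, and the request and response shapes (`input.texts`, `embeddings`) are assumptions for illustration; the real shapes are defined by the `embed.text` contract.

```python
import asyncio


class FakeBus:
    """Hypothetical stand-in for CapabilityBus: records calls, returns canned vectors."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, tuple[int, int]]] = []

    async def call(self, name: str, version: tuple[int, int], body: dict) -> dict:
        self.calls.append((name, version))
        # Assumed response shape: one vector per input text.
        return {"embeddings": [[float(len(t)), 1.0] for t in body["input"]["texts"]]}


async def embed_query(bus, query: str) -> list[float]:
    """Embed a query through the bus; no import of any embedding service."""
    response = await bus.call("embed.text", (1, 0), {"input": {"texts": [query]}})
    return response["embeddings"][0]


bus = FakeBus()
vector = asyncio.run(embed_query(bus, "hello"))
```

Since `embed_query` only needs an object with an async `call` method, the same code path works whether the provider is local or on another node.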

### 4.2 Corpus naming

- `[a-z0-9-]+` only, max 64 chars
- One corpus per ChromaDB collection
- Two reserved names: `personal` (per-user, NEVER federated) and `system` (read-only, ships with HearthNet)
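
The naming rules above translate into a short validator. This is a sketch; the function and constant names are hypothetical.

```python
import re

# [a-z0-9-]+ with a maximum length of 64 characters.
_CORPUS_NAME = re.compile(r"[a-z0-9-]{1,64}")
RESERVED_CORPORA = frozenset({"personal", "system"})


def is_valid_corpus_name(name: str) -> bool:
    """True if the name uses only lowercase letters, digits, and hyphens (1 to 64 chars)."""
    return _CORPUS_NAME.fullmatch(name) is not None


def is_reserved_corpus(name: str) -> bool:
    """True for the two names with special semantics."""
    return name in RESERVED_CORPORA
```

`fullmatch` is used rather than `match` so that a trailing invalid character cannot slip through.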

### 4.3 Ingest idempotency

Ingesting a `(corpus, doc_cid)` pair that is already in the store is a no-op. This makes re-ingestion safe across restarts and gossip re-delivery of `rag.document.ingested` events.
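
The idempotency rule can be sketched with a small ledger keyed on `(corpus, doc_cid)`. `IngestLedger` is a hypothetical helper that keeps its state in memory; a real pipeline would more likely consult `CorpusStore.has_document` so that the check survives restarts.

```python
from typing import Callable


class IngestLedger:
    """Remembers the result of each (corpus, doc_cid) ingest."""

    def __init__(self) -> None:
        self._results: dict[tuple[str, str], dict] = {}

    def ingest_once(
        self, corpus: str, doc_cid: str, do_ingest: Callable[[], dict]
    ) -> tuple[dict, bool]:
        """Run do_ingest only for unseen keys; return (result, was_new)."""
        key = (corpus, doc_cid)
        if key in self._results:
            return self._results[key], False  # no-op: return the existing result
        result = do_ingest()
        self._results[key] = result
        return result, True
```

The same `doc_cid` in a different corpus counts as a new key, so one document can be indexed into several corpora.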

### 4.4 Event log integration

After a successful ingest, append a `rag.document.ingested` event ([X02 §3.1](../cross-cutting/X02-events.md), [CONTRACT §7.2](../CAPABILITY_CONTRACT.md)). Other nodes seeing this event MAY pre-fetch the blob (via `file.read`) and ingest into their own RAG corpus, depending on their replication policy. (Replication policy is out of scope for MVP; nodes do not auto-replicate.)

### 4.5 Multi-tenant isolation

Each corpus is opened by the node in read-only or read/write mode. The `personal` corpus is local-only and is NEVER routable from other nodes (the service does not register a `rag.query` capability for it).
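
One way to enforce this at registration time is to filter the corpus list before building `rag.query` descriptors. The names below are hypothetical and the sketch covers only the filtering step.

```python
# Corpora that must never be advertised to other nodes.
LOCAL_ONLY_CORPORA = frozenset({"personal"})


def routable_corpora(corpora: list[str]) -> list[str]:
    """Corpora for which a rag.query capability may be registered."""
    return sorted(name for name in corpora if name not in LOCAL_ONLY_CORPORA)
```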

### 4.6 PDF extraction quality

`pypdf` handles digitally produced PDFs adequately. Scanned PDFs need OCR, which is M-Phase-2 work (`ocr.*` namespace). Ingesting a scanned PDF without OCR yields empty chunks; the service detects this and returns `bad_request` with a hint.
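
The empty-extraction check might look like the sketch below. It operates on already extracted page texts so that it stays independent of the PDF library; `BadRequest`, `require_extractable_text`, and the `min_chars` threshold are assumptions for illustration.

```python
class BadRequest(Exception):
    """Hypothetical exception that the handler maps to the bad_request wire code."""


def require_extractable_text(page_texts: list[str], *, min_chars: int = 1) -> None:
    """Raise BadRequest when the pages contain (almost) no text.

    page_texts would typically hold one string per page, for example the
    result of pypdf's page.extract_text() for each page in the document.
    """
    total = sum(len(text.strip()) for text in page_texts)
    if total < min_chars:
        raise BadRequest(
            "no extractable text found; the PDF may be scanned and require OCR"
        )
```

Running the check before chunking avoids embedding calls for a document that cannot produce any useful chunks.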

### 4.7 Query language detection

Optionally, the service detects the query language and passes it to the store as a metadata filter. In the MVP, detection is skipped and only the caller-supplied filter is applied.

---

## 5. Composition flow (typical user query)

```
UI → bus.call("llm.chat", ..., body containing user message)
         ↓ (handler in LLM service, but UI may also explicitly call rag.query first)
UI → bus.call("rag.query", (1,0), {params: {corpus: ...}, input: {query: ...}})
         ↓
RagService.handle_query
   → bus.call("embed.text", (1,0), ...)       # may go remote
   → CorpusStore.query → list[ScoredChunk]
   → return chunks with metadata
         ↓
UI builds prompt with chunks + question
UI → bus.call("llm.chat", ..., messages including context)
```

The UI orchestrates this in M08. The RAG service does NOT chain into the LLM itself.
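
The "UI builds prompt" step could be sketched as below. The message format (a list of `role`/`content` dicts), the system instruction wording, and the chunk dict shape are assumptions; the actual `llm.chat` request shape is defined elsewhere.

```python
def build_messages(question: str, chunks: list[dict]) -> list[dict]:
    """Assemble chat messages with retrieved chunks as numbered context."""
    context = "\n\n".join(
        f"[{i}] {c['metadata'].get('doc_title', 'untitled')}, "
        f"p. {c['metadata'].get('page', '?')}\n{c['text']}"
        for i, c in enumerate(chunks, start=1)
    )
    return [
        {"role": "system", "content": "Answer using only the context below.\n\n" + context},
        {"role": "user", "content": question},
    ]
```

Numbering the chunks lets the model cite its sources, and the page metadata carried by each chunk makes those citations checkable.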

---

## 6. Errors

| Condition | Wire code |
|-----------|-----------|
| Unknown corpus on query | `not_found` |
| `k > RAG_MAX_K` | `bad_request` |
| Blob not resolvable on ingest | `not_found` |
| Unsupported MIME type on ingest | `bad_request` |
| Caller not trusted for ingest | `unauthorized` |
| Embedding model unavailable (no embed.text providers) | `partition` (bus quarantine state) |
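
As an example of one row of the table, the `k` check might be written as follows. The numeric values of `RAG_DEFAULT_K` and `RAG_MAX_K` are placeholders, `WireError` is a hypothetical exception type, and rejecting `k < 1` is an added assumption that the table does not state.

```python
from typing import Optional

RAG_DEFAULT_K = 5   # placeholder value for illustration only
RAG_MAX_K = 20      # placeholder value for illustration only


class WireError(Exception):
    """Hypothetical error carrying a wire code such as 'bad_request'."""

    def __init__(self, code: str, message: str) -> None:
        super().__init__(message)
        self.code = code


def resolve_k(requested: Optional[int]) -> int:
    """Apply the default and enforce the upper bound on k."""
    if requested is None:
        return RAG_DEFAULT_K
    if requested < 1 or requested > RAG_MAX_K:
        raise WireError("bad_request", f"k must be between 1 and {RAG_MAX_K}")
    return requested
```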

---

## 7. Configuration

From [X04 §3](../cross-cutting/X04-config.md):

```python
config.rag.enabled       # bool
config.rag.corpora_dir   # default <CACHE>/embeddings
```

Constants: `RAG_CHUNK_TOKENS`, `RAG_CHUNK_OVERLAP_TOKENS`, `RAG_DEFAULT_K`, `RAG_MAX_K`.

---

## 8. Tests

### Unit
- `test_chunk_text_respects_paragraph_boundaries`
- `test_chunk_pdf_carries_page_number`
- `test_corpus_store_add_then_query_recovers_chunk`
- `test_ingest_idempotent_on_doc_cid`
- `test_query_handler_calls_embed_via_bus_not_direct_import`
- `test_query_handler_rejects_unknown_corpus`
- `test_personal_corpus_not_registered_as_capability`

### Integration
- `test_demo_corpus_query_returns_relevant_chunks`: load the 6 demo PDFs, query, expect top hit
- `test_ingest_then_other_node_sees_event`: two-node gossip
- `test_query_falls_back_to_remote_when_local_corpus_missing`: two nodes, only one has corpus

---

## 9. Cross-references

| What | Where |
|------|-------|
| `rag.*` wire spec | [CONTRACT §4.4–4.6](../CAPABILITY_CONTRACT.md) |
| Service protocol | [M03 §4](M03-bus.md) |
| Uses embed.text | [M11](M11-embedding.md) |
| Uses blob store | [M07 §3](M07-file-blobs.md) |
| Emits rag.document.ingested | [X02](../cross-cutting/X02-events.md), [CONTRACT §7.2](../CAPABILITY_CONTRACT.md) |
| UI query composition | [M08 §4](M08-ui.md) |

---

## 10. Open questions

1. **Re-embedding when models change**: if the configured embedding model changes, the existing corpora are stale. Decision (MVP): refuse to start with mismatched model; print a `hearthnet rag reindex` hint. Phase 2: auto-reindex.
2. **Federation of corpora**: Phase 2: a corpus may be marked "federated" and queries fan out to other communities. Out of scope here.
3. **Reranking**: Phase 2: a `rerank.text@1.0` capability could be inserted between embedding and final ranking. Reserved namespace.
4. **Hybrid search**: keyword + dense. ChromaDB has limited support. Phase 2.
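
For open question 1, the MVP decision (refuse to start on a model mismatch) could be sketched as a startup check. How the model name is recorded per corpus is not specified here, so `stored_model` is simply passed in; the exception and function names are hypothetical.

```python
class EmbeddingModelMismatch(RuntimeError):
    """Hypothetical startup error for a corpus embedded with a different model."""


def check_embedding_model(corpus: str, stored_model: str, configured_model: str) -> None:
    """Raise if the corpus was embedded with a model other than the configured one."""
    if stored_model != configured_model:
        raise EmbeddingModelMismatch(
            f"corpus {corpus!r} was embedded with {stored_model!r} but "
            f"{configured_model!r} is configured; run `hearthnet rag reindex`"
        )
```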