Spaces:
Running on Zero
A newer version of the Gradio SDK is available: 6.19.0
M07 β File & Blobs
Spec version: v1.0
Depends on: M01 (identity, for signing advertise events), M03 (bus), X01 (transport, for streamed chunk transfer), X02 (events), X04 (config), blake3
Depended on by: M05 (RAG stores source PDFs as blobs), M10 (chat attachments), M08 (UI file browser)
1. Responsibility
Two coupled concerns sharing one module:
- Blob store (
hearthnet.blobs.*): on-disk content-addressed store, chunking, hash verification - File service (
hearthnet.services.file.*): exposesfile.read,file.list,file.advertise,file.putcapabilities to the bus
The split exists because the blob store is also used by M05 (for storing source PDFs) without going through the bus.
2. File layout
hearthnet/blobs/
βββ __init__.py
βββ store.py # BlobStore: filesystem-backed CID store
βββ chunker.py # split / reassemble + BLAKE3
βββ transfer.py # parallel chunk fetch across multiple sources
hearthnet/services/file/
βββ __init__.py
βββ service.py # FileService: registers file.* capabilities
3. Blob store (hearthnet.blobs.*)
3.1 chunker.py
# hearthnet/blobs/chunker.py
from dataclasses import dataclass
@dataclass(frozen=True)
class ChunkRef:
index: int
cid: str # "blake3:<hex>" of this chunk's bytes
size_bytes: int
@dataclass(frozen=True)
class BlobManifest:
cid: str # "blake3:<hex>" β the merkle root, derived from chunks
size_bytes: int
chunk_size_bytes: int # CHUNK_SIZE_BYTES = 262144
chunks: list[ChunkRef]
mime_type: str | None
filename: str | None # advisory only, not part of the CID
def hash_bytes(data: bytes) -> str:
"""BLAKE3 β 'blake3:<hex>'."""
def chunk_blob(data: bytes, *, chunk_size: int = CHUNK_SIZE_BYTES) -> tuple[BlobManifest, list[bytes]]:
"""Split bytes into 256KB chunks. Compute per-chunk CID and the merkle-root CID for the manifest."""
def manifest_cid(manifest: BlobManifest) -> str:
"""Compute the manifest's CID: BLAKE3 over canonical-JSON of {chunks[].cid, size_bytes, chunk_size_bytes}."""
def reassemble(chunks: list[bytes]) -> bytes:
"""Concat chunks in index order. Caller is responsible for verifying each chunk's CID first."""
def verify_chunk(data: bytes, expected_cid: str) -> None:
"""Raises BlobError('hash_mismatch') if data's BLAKE3 != expected_cid."""
3.2 store.py
# hearthnet/blobs/store.py
class BlobStore:
"""Sharded filesystem store at <DATA>/blobs/<aa>/<bb...>.bin"""
def __init__(self, dir_path: Path, gc_threshold: float = BLOB_GC_DISK_THRESHOLD):
...
# -- single-chunk ops --
def has(self, cid: str) -> bool: ...
def read_chunk(self, cid: str) -> bytes:
"""Raises BlobError('not_found') if absent. Verifies hash on read."""
def write_chunk(self, cid: str, data: bytes) -> None:
"""Verifies hash before writing; idempotent if already present."""
def delete_chunk(self, cid: str) -> bool: ...
# -- blob (manifest + chunks) ops --
def has_blob(self, manifest_cid: str) -> bool:
"""True iff the manifest exists AND all referenced chunks exist."""
def read_manifest(self, manifest_cid: str) -> BlobManifest: ...
def write_blob(self, manifest: BlobManifest, chunks: list[bytes]) -> None:
"""Atomically write manifest + chunks. Hash-verifies each chunk."""
def read_blob_bytes(self, manifest_cid: str) -> bytes:
"""Reassemble whole blob into memory. For small blobs only (< 100 MB)."""
async def read_blob_stream(self, manifest_cid: str) -> AsyncIterator[tuple[ChunkRef, bytes]]:
"""Stream chunks for large blobs."""
# -- introspection / GC --
def list_cids(self, prefix: str | None = None) -> list[str]: ...
def total_bytes(self) -> int: ...
def pin(self, cid: str) -> None: ...
def unpin(self, cid: str) -> None: ...
def is_pinned(self, cid: str) -> bool: ...
def gc(self, target_fraction: float = 0.7) -> int:
"""Run LRU eviction of unpinned blobs until disk usage < target_fraction.
Returns bytes freed."""
class BlobError(Exception):
"""code in {'not_found','hash_mismatch','io_error','disk_full','manifest_invalid'}"""
code: str
3.3 transfer.py
# hearthnet/blobs/transfer.py
class TransferManager:
"""Coordinates parallel chunk fetch from multiple peer sources."""
def __init__(self, store: BlobStore, bus: CapabilityBus, concurrency: int = 4):
...
async def fetch_blob(
self,
manifest_cid: str,
*,
sources: list[str] | None = None, # NodeIDs known to hold; if None, ask via bus
) -> BlobManifest:
"""1. Fetch manifest (via bus.call('file.read', ...)).
2. Determine missing chunks.
3. For each missing chunk, pick a source (round-robin, load-aware via bus), fetch via stream.
4. Verify each chunk, write to store.
5. Return manifest once complete.
Resumable: re-running after partial completion skips already-present chunks."""
async def advertise(self, cids: list[str]) -> None:
"""Tell other nodes we hold these CIDs. Emits file.cid.advertised events
and direct file.advertise calls to known peers (best-effort)."""
4. File service (hearthnet.services.file.*)
4.1 service.py
# hearthnet/services/file/service.py
class FileService:
name = "file"
version = "1.0"
def __init__(self, config: FileConfig, store: BlobStore, event_log: EventLog):
...
def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable, ParamsPredicate]]:
"""Registers: file.read, file.list, file.advertise, file.put."""
async def start(self) -> None: ...
async def stop(self) -> None: ...
def health(self) -> dict: ...
# --- handlers ---
async def handle_read(self, req: RouteRequest) -> AsyncIterator[dict] | dict:
"""CONTRACT Β§4.7.
If CID is a chunk β return single chunk JSON (non-stream).
If CID is a manifest β stream: 'manifest' frame, then chunks, then 'done'."""
async def handle_list(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.8."""
async def handle_advertise(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.9.
Records the caller as a source for the given CIDs in an in-memory index.
Optionally emits file.cid.advertised event to gossip."""
async def handle_put(self, req: RouteRequest) -> AsyncIterator[dict]:
"""CONTRACT Β§4.10.
Client streams chunks; server verifies and stores.
Requires trust 'trusted' (disk-poisoning risk)."""
4.2 Capability descriptors
descriptor_read = CapabilityDescriptor(
name="file.read", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...},
stream_schema={...}, # manifest + chunk + done frames
params={},
max_concurrent=8,
trust_required="member",
timeout_seconds=300,
idempotent=True,
)
descriptor_list = CapabilityDescriptor(
name="file.list", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=4,
trust_required="member", timeout_seconds=5, idempotent=True,
)
descriptor_advertise = CapabilityDescriptor(
name="file.advertise", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=16,
trust_required="member", timeout_seconds=5, idempotent=True,
)
descriptor_put = CapabilityDescriptor(
name="file.put", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...},
stream_schema={...}, # ready + chunk + done
params={}, max_concurrent=2,
trust_required="trusted", timeout_seconds=600, idempotent=True,
)
4.3 params_compatible
All file.* capabilities use the default lambda offered, requested: True. Any node with the capability can serve any CID it has; the bus query later checks has(cid) and returns not_found if absent.
5. Behaviour
5.1 Storage layout on disk
<DATA>/blobs/
βββ ab/
β βββ c123def456...bin # chunk
β βββ c123def456...manifest.json
βββ cd/
β βββ ...
βββ pinned.txt # newline-separated CIDs that must not be GC'd
Sharded by first 2 hex chars of CID (256 directories, ~uniform).
5.2 Hash verification is always-on
Every read and every write verifies the chunk's BLAKE3 against its CID. Cost is small (BLAKE3 is fast); benefit is bit-rot detection and protection from a malicious advertiser sending bad bytes.
5.3 Pinning
- The
personalRAG corpus pins its source documents - Snapshot blobs are pinned by X02
- The community manifest blob is pinned
- User-pinned via UI
Anything else is GC-eligible.
5.4 GC
When disk usage exceeds BLOB_GC_DISK_THRESHOLD (0.80), evict LRU unpinned blobs until below 0.7 Γ disk_capacity. Tracked by file mtime (read updates mtime; common Linux mount option).
5.5 Source discovery
When a node needs a CID it doesn't have:
- Ask the bus:
file.list@1.0withprefix=<short CID>against known peers - Aggregate the responders into a sources list
- Optionally also consult
file.cid.advertisedevents in the log - Pass sources to
TransferManager.fetch_blob
Phase 2: a DHT-like source index. MVP: per-request fan-out.
5.6 Concurrent fetches
TransferManager issues N concurrent chunk fetches across sources. Each chunk request goes via the bus (so health/quarantine work). On per-chunk failure, retry from a different source; after 3 attempts on a chunk, fail the whole blob with partition.
5.7 Backpressure on PUT
When a node receives file.put, it inspects free disk before accepting. If below 1 GB after putative add, refuse with capacity_exceeded.
6. Errors
| Condition | Wire code |
|---|---|
| CID not present | not_found |
| Hash verification failed | bad_request (caller sent corrupted bytes) |
| Out of disk | capacity_exceeded |
| Caller not trusted for put | unauthorized |
| Chunk size mismatch | bad_request |
7. Configuration
From X04 Β§3:
config.file.blobs_dir # default <DATA>/blobs
config.file.gc_threshold # default 0.8
Constants: CHUNK_SIZE_BYTES, BLOB_GC_DISK_THRESHOLD.
8. Tests
Unit
test_chunk_then_reassemble_roundtriptest_manifest_cid_deterministictest_verify_chunk_detects_one_bit_fliptest_store_write_then_read_returns_same_bytestest_store_rejects_wrong_cid_on_writetest_pinning_prevents_gctest_gc_evicts_lru_unpinned
Integration
test_two_node_blob_fetchβ node A has blob, node B requests via bus, succeedstest_three_source_parallel_fetchβ three nodes hold the blob, B fetches in parallel, distribution observedtest_chunk_corruption_falls_over_to_alt_sourceβ one source returns bad chunk; fetcher retries from anothertest_put_requires_trusted_caller
9. Cross-references
| What | Where |
|---|---|
file.* wire |
CONTRACT Β§4.7β4.10 |
| BLAKE3 + CID format | GLOSSARY.md, CONTRACT Β§1.4 |
| Used by RAG ingest | M05 Β§3.3 |
| Used by chat attachments | M10 Β§3.3 |
file.cid.advertised event |
CONTRACT Β§7.2, X02 |
| Snapshot pinning | X02 Β§5 |
10. Open questions
- DHT source index β Phase 2 nice-to-have.
- Background replication β Phase 2: a node may auto-replicate blobs of high "interest" (referenced by recent events). Out of scope MVP.
- Encrypted blobs β for personal corpus, blobs are stored in cleartext on local disk. Add at-rest encryption Phase 2.
- Resumable PUT mid-transfer crash β Phase 1.5: server keeps a partial-transfer index keyed by client_id + manifest_cid; client can resume.