File size: 2,746 Bytes
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c91b229
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass

# Default chunk size used by chunk_blob when no chunk_size is given:
# 256 * 1024 = 262,144 bytes.
CHUNK_SIZE_BYTES = 256 * 1024  # 256 KiB


class BlobError(Exception):
    """Error raised by blob helpers, tagged with a machine-readable code.

    Attributes:
        code: Short identifier for the failure (for example
            ``"hash_mismatch"``).

    The exception text is ``message`` when one is given and non-empty,
    otherwise the code itself.
    """

    def __init__(self, code: str, message: str = "") -> None:
        self.code = code
        # Fall back to the code so str(exc) is never empty.
        text = message if message else code
        super().__init__(text)


@dataclass(frozen=True)
class ChunkRef:
    """Immutable reference to one chunk of a blob.

    Fields:
        index: Position of the chunk within the blob.
        cid: Content identifier of the chunk bytes.
        size_bytes: Length of the chunk in bytes.
    """

    index: int
    # Formatted as "blake3:<hex>" or "sha256:<hex>".
    cid: str
    size_bytes: int


@dataclass(frozen=True)
class BlobManifest:
    """Description of a chunked blob: its root CID, sizes, and chunk list.

    Fields:
        cid: Root CID computed over the chunk CIDs.
        size_bytes: Total length of the blob in bytes.
        chunk_size_bytes: Chunk size the blob was split with.
        chunks: One ChunkRef per chunk.
        filename: Optional, advisory-only name; defaults to None so callers
            that have no name do not need to pass it.

    Note: although the dataclass is frozen, ``chunks`` is a list, so the list
    itself can still be mutated and instances are not hashable.
    """

    cid: str  # merkle root CID
    size_bytes: int
    chunk_size_bytes: int
    chunks: list[ChunkRef]
    filename: str | None = None  # advisory only


def hash_bytes(data: bytes) -> str:
    """Return a prefixed hex digest ("<algorithm>:<hex>") of ``data``.

    BLAKE3 is used when the optional ``blake3`` module can be imported;
    otherwise the function falls back to SHA-256 from ``hashlib``. The
    result is therefore ``'blake3:<hex>'`` or ``'sha256:<hex>'`` depending
    on the environment the code runs in.
    """
    try:
        import blake3 as _blake3

        digest = _blake3.blake3(data).hexdigest()
        algorithm = "blake3:"
    except ImportError:
        # Optional dependency missing: use the stdlib hash instead.
        digest = hashlib.sha256(data).hexdigest()
        algorithm = "sha256:"
    return algorithm + digest


def chunk_blob(
    data: bytes, *, chunk_size: int = CHUNK_SIZE_BYTES
) -> tuple[BlobManifest, list[bytes]]:
    """Split ``data`` into fixed-size chunks and build the blob's manifest.

    Args:
        data: The blob contents.
        chunk_size: Maximum number of bytes per chunk; must be positive.

    Returns:
        A ``(manifest, chunks)`` pair. ``chunks`` holds the chunk bytes in
        order, and ``manifest.chunks`` holds the matching ChunkRef entries
        (index, per-chunk CID, size). An empty blob still produces exactly
        one empty chunk. ``manifest.filename`` is always None.

    Raises:
        ValueError: If ``chunk_size`` is zero or negative (a non-positive
            size would otherwise never advance through ``data``).
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    # An empty range is falsy, so empty input falls back to a single offset 0
    # and yields one empty chunk.
    offsets = range(0, len(data), chunk_size) or range(1)
    chunks_data: list[bytes] = [data[start : start + chunk_size] for start in offsets]
    chunk_refs: list[ChunkRef] = [
        ChunkRef(index=index, cid=hash_bytes(piece), size_bytes=len(piece))
        for index, piece in enumerate(chunks_data)
    ]

    # The root is a single hash over the newline-joined chunk CIDs, not a
    # tree. The CIDs are sorted first, so the root does not depend on chunk
    # order.
    # NOTE(review): blobs made of the same chunks in a different order get the
    # same root CID. Confirm this is intended; changing it would alter
    # already-issued CIDs.
    merkle_root = hash_bytes(b"\n".join(sorted(ref.cid.encode() for ref in chunk_refs)))

    manifest = BlobManifest(
        cid=merkle_root,
        size_bytes=len(data),
        chunk_size_bytes=chunk_size,
        chunks=chunk_refs,
        filename=None,
    )
    return manifest, chunks_data


def manifest_cid(manifest: BlobManifest) -> str:
    """Return the CID of the manifest's canonical JSON form.

    The hashed document contains only ``chunk_size_bytes``, ``size_bytes``
    and, per chunk, ``cid`` and ``size_bytes``; the manifest's own ``cid``,
    the chunk indexes and ``filename`` are not part of it. Canonical means
    sorted keys, no whitespace between tokens, non-ASCII characters left
    unescaped, UTF-8 encoded.
    """
    chunk_entries = [
        {"cid": ref.cid, "size_bytes": ref.size_bytes} for ref in manifest.chunks
    ]
    document = {
        "size_bytes": manifest.size_bytes,
        "chunk_size_bytes": manifest.chunk_size_bytes,
        "chunks": chunk_entries,
    }
    # sort_keys makes the output independent of dict insertion order.
    canonical = json.dumps(
        document, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hash_bytes(canonical.encode("utf-8"))


def reassemble(chunks: list[bytes]) -> bytes:
    """Join ``chunks`` into one bytes object, in the order given.

    No sorting is done here, so the caller must pass the chunks already
    arranged in index order. An empty list yields ``b""``.
    """
    separator = bytes()
    return separator.join(chunks)


def verify_chunk(data: bytes, expected_cid: str) -> None:
    """Check that ``data`` hashes to ``expected_cid``.

    A ``"sha256:<hex>"`` CID is always checked with SHA-256, so it verifies
    correctly even where hash_bytes would pick BLAKE3. Any other CID is
    compared against hash_bytes(data), as before.

    Args:
        data: The chunk bytes to check.
        expected_cid: The CID the chunk is supposed to have.

    Raises:
        BlobError: With code ``'hash_mismatch'`` if the computed CID differs
            from ``expected_cid``.
    """
    if isinstance(expected_cid, str) and expected_cid.startswith("sha256:"):
        # Use the algorithm named by the CID, not the locally preferred one.
        actual = "sha256:" + hashlib.sha256(data).hexdigest()
    else:
        actual = hash_bytes(data)
    if actual != expected_cid:
        raise BlobError("hash_mismatch", f"Expected {expected_cid}, got {actual}")