GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80
Raw
History Blame
3.52 kB
"""File-chunk envelope encryption for HearthNet blobs (M23 / M07 extension)."""
from __future__ import annotations
import hashlib
import hmac
from dataclasses import dataclass
from hearthnet.crypto import CryptoError
try:
import nacl.secret
import nacl.utils
_NACL_AVAILABLE = True
except ImportError: # pragma: no cover
_NACL_AVAILABLE = False
def _require_nacl() -> None:
if not _NACL_AVAILABLE:
raise ImportError(
"PyNaCl is required for envelope encryption. Install it with: pip install pynacl"
)
# ---------------------------------------------------------------------------
# HKDF helper (local copy to keep this module self-contained)
# ---------------------------------------------------------------------------
def _hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int) -> bytes:
if not salt:
salt = b"\x00" * 32
prk = hmac.new(salt, ikm, hashlib.sha256).digest()
t = b""
okm = b""
i = 0
while len(okm) < length:
i += 1
t = hmac.new(prk, t + info + bytes([i]), hashlib.sha256).digest()
okm += t
return okm[:length]
# ---------------------------------------------------------------------------
# EncryptedEnvelope
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class EncryptedEnvelope:
"""An encrypted blob chunk with metadata."""
ciphertext: bytes
nonce: bytes # 24 bytes (XSalsa20 nonce)
key_id: str # identifies which key was used (e.g., recipient node_id or blob CID)
# ---------------------------------------------------------------------------
# Envelope encrypt / decrypt
# ---------------------------------------------------------------------------
def envelope_encrypt(plaintext: bytes, key: bytes) -> EncryptedEnvelope:
"""Encrypt plaintext with XSalsa20-Poly1305 using the given 32-byte key."""
_require_nacl()
if len(key) != nacl.secret.SecretBox.KEY_SIZE:
raise CryptoError(f"Key must be {nacl.secret.SecretBox.KEY_SIZE} bytes, got {len(key)}")
box = nacl.secret.SecretBox(key)
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
ciphertext = bytes(box.encrypt(plaintext, nonce).ciphertext)
return EncryptedEnvelope(ciphertext=ciphertext, nonce=nonce, key_id="")
def envelope_decrypt(envelope: EncryptedEnvelope, key: bytes) -> bytes:
"""Decrypt an EncryptedEnvelope using the given 32-byte key."""
_require_nacl()
if len(key) != nacl.secret.SecretBox.KEY_SIZE:
raise CryptoError(f"Key must be {nacl.secret.SecretBox.KEY_SIZE} bytes, got {len(key)}")
box = nacl.secret.SecretBox(key)
try:
return bytes(box.decrypt(envelope.ciphertext, envelope.nonce))
except Exception as exc:
raise CryptoError(f"Envelope decryption failed: {exc}") from exc
# ---------------------------------------------------------------------------
# Per-recipient key derivation
# ---------------------------------------------------------------------------
def per_recipient_key(shared_secret: bytes, recipient_id: str, blob_cid: str) -> bytes:
"""Derive a 32-byte per-recipient encryption key via HKDF-SHA256.
Binds the key to the recipient identity and the specific blob CID so that
a key derived for one blob cannot decrypt another.
"""
info = f"HearthNet_BlobKey_v1:{recipient_id}:{blob_cid}".encode()
return _hkdf_sha256(shared_secret, salt=b"HearthNet_envelope", info=info, length=32)