Spaces:
Running on Zero
Running on Zero
GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80 | """File-chunk envelope encryption for HearthNet blobs (M23 / M07 extension).""" | |
| from __future__ import annotations | |
| import hashlib | |
| import hmac | |
| from dataclasses import dataclass | |
| from hearthnet.crypto import CryptoError | |
| try: | |
| import nacl.secret | |
| import nacl.utils | |
| _NACL_AVAILABLE = True | |
| except ImportError: # pragma: no cover | |
| _NACL_AVAILABLE = False | |
| def _require_nacl() -> None: | |
| if not _NACL_AVAILABLE: | |
| raise ImportError( | |
| "PyNaCl is required for envelope encryption. Install it with: pip install pynacl" | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # HKDF helper (local copy to keep this module self-contained) | |
| # --------------------------------------------------------------------------- | |
| def _hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int) -> bytes: | |
| if not salt: | |
| salt = b"\x00" * 32 | |
| prk = hmac.new(salt, ikm, hashlib.sha256).digest() | |
| t = b"" | |
| okm = b"" | |
| i = 0 | |
| while len(okm) < length: | |
| i += 1 | |
| t = hmac.new(prk, t + info + bytes([i]), hashlib.sha256).digest() | |
| okm += t | |
| return okm[:length] | |
| # --------------------------------------------------------------------------- | |
| # EncryptedEnvelope | |
| # --------------------------------------------------------------------------- | |
| class EncryptedEnvelope: | |
| """An encrypted blob chunk with metadata.""" | |
| ciphertext: bytes | |
| nonce: bytes # 24 bytes (XSalsa20 nonce) | |
| key_id: str # identifies which key was used (e.g., recipient node_id or blob CID) | |
| # --------------------------------------------------------------------------- | |
| # Envelope encrypt / decrypt | |
| # --------------------------------------------------------------------------- | |
| def envelope_encrypt(plaintext: bytes, key: bytes) -> EncryptedEnvelope: | |
| """Encrypt plaintext with XSalsa20-Poly1305 using the given 32-byte key.""" | |
| _require_nacl() | |
| if len(key) != nacl.secret.SecretBox.KEY_SIZE: | |
| raise CryptoError(f"Key must be {nacl.secret.SecretBox.KEY_SIZE} bytes, got {len(key)}") | |
| box = nacl.secret.SecretBox(key) | |
| nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE) | |
| ciphertext = bytes(box.encrypt(plaintext, nonce).ciphertext) | |
| return EncryptedEnvelope(ciphertext=ciphertext, nonce=nonce, key_id="") | |
| def envelope_decrypt(envelope: EncryptedEnvelope, key: bytes) -> bytes: | |
| """Decrypt an EncryptedEnvelope using the given 32-byte key.""" | |
| _require_nacl() | |
| if len(key) != nacl.secret.SecretBox.KEY_SIZE: | |
| raise CryptoError(f"Key must be {nacl.secret.SecretBox.KEY_SIZE} bytes, got {len(key)}") | |
| box = nacl.secret.SecretBox(key) | |
| try: | |
| return bytes(box.decrypt(envelope.ciphertext, envelope.nonce)) | |
| except Exception as exc: | |
| raise CryptoError(f"Envelope decryption failed: {exc}") from exc | |
| # --------------------------------------------------------------------------- | |
| # Per-recipient key derivation | |
| # --------------------------------------------------------------------------- | |
| def per_recipient_key(shared_secret: bytes, recipient_id: str, blob_cid: str) -> bytes: | |
| """Derive a 32-byte per-recipient encryption key via HKDF-SHA256. | |
| Binds the key to the recipient identity and the specific blob CID so that | |
| a key derived for one blob cannot decrypt another. | |
| """ | |
| info = f"HearthNet_BlobKey_v1:{recipient_id}:{blob_cid}".encode() | |
| return _hkdf_sha256(shared_secret, salt=b"HearthNet_envelope", info=info, length=32) | |