HearthNet-Nemotron / tests /test_phase2_modules.py
GitHub Actions
feat: Phase 2 (M14-M25, X05-X07) + Phase 3 experimental (M26-M31) + E2E tests + docs
4cd8837
Raw
History Blame
16.6 kB
"""Phase 2 functional tests — M14-M25, X05-X07.
Covers:
- M14 Federation manifests and peering
- M16 Capability tokens
- M17 OCR service (backend health, graceful unavailable)
- M18 Translation service
- M19 STT/TTS services
- M20 Vision/Image services
- M21 Tool calls (ToolExecutor)
- M23 E2E encryption (X3DH + ratchet + envelope)
- M24 Reranking service
- M25 Group chat threads
- X05 DHT Kademlia node
- X06 WebSocket pubsub
- X07 Federated metrics
- Relay client (M15)
"""
from __future__ import annotations
from hearthnet.bus.capability import RouteRequest
def _req(body: dict, cap: str = "test", caller: str = "test") -> RouteRequest:
"""Wrap a body dict as a RouteRequest for direct service handler calls."""
return RouteRequest(
capability=cap, version_req="1.0", body=body,
caller=caller, trace_id="test-trace",
)
import asyncio
import base64
import time
import nacl.signing
import pytest
from hearthnet.identity.keys import KeyPair, full_node_id, generate
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_keypair() -> KeyPair:
return generate()
# ===========================================================================
# M23 — E2E Encryption
# ===========================================================================
def test_e2e_x3dh_and_ratchet():
"""X3DH key agreement + Double Ratchet encrypt/decrypt round-trip."""
from hearthnet.crypto.kem import (
build_prekey_bundle,
derive_identity_x25519_from_ed25519,
x25519_generate,
x3dh_initiator,
x3dh_responder,
)
from hearthnet.crypto.ratchet import decrypt, encrypt, init_from_shared_secret
kp_a = _make_keypair()
kp_b = _make_keypair()
bundle_b, spk_b, otp_b = build_prekey_bundle(kp_b, num_one_time=3)
eph_a = x25519_generate()
id_x_a = derive_identity_x25519_from_ed25519(kp_a)
id_x_b = derive_identity_x25519_from_ed25519(kp_b)
ss_init, init_msg = x3dh_initiator(id_x_a, eph_a, bundle_b)
def _b64d(s: str) -> bytes:
pad = 4 - len(s) % 4
return base64.urlsafe_b64decode(s + "=" * (pad % 4))
otp_used = otp_b[init_msg["used_otp_index"]] if init_msg["used_otp_index"] is not None else None
ss_resp = x3dh_responder(
id_x_b, spk_b, otp_used,
_b64d(init_msg["ephemeral_pub"]),
_b64d(init_msg["identity_pub"]),
)
assert ss_init == ss_resp, "X3DH shared secrets must match"
# Double Ratchet
s_alice = init_from_shared_secret(ss_init, is_initiator=True)
s_bob = init_from_shared_secret(ss_resp, is_initiator=False)
messages = [b"hello", b"world"] + [b"test message " + str(i).encode() for i in range(5)]
for i, msg in enumerate(messages):
ct, hdr = encrypt(s_alice, msg)
pt = decrypt(s_bob, ct, hdr)
assert pt == msg, f"Message {i} decryption mismatch"
def test_e2e_envelope():
"""File chunk envelope encrypt/decrypt."""
from hearthnet.crypto.envelope import envelope_decrypt, envelope_encrypt, per_recipient_key
shared = b"\xab" * 32
key = per_recipient_key(shared, "bob_node", "blake3:abc123")
assert len(key) == 32
plaintext = b"This is a test blob chunk."
env = envelope_encrypt(plaintext, key)
assert env.ciphertext != plaintext
assert envelope_decrypt(env, key) == plaintext
def test_e2e_prekeys():
"""PrekeyStore stores, loads, and consumes one-time prekeys."""
from hearthnet.crypto.kem import build_prekey_bundle
from hearthnet.crypto.prekeys import PrekeyStore
kp = _make_keypair()
bundle, spk, otp_list = build_prekey_bundle(kp, num_one_time=4)
store = PrekeyStore()
store.store_bundle(bundle, spk, otp_list)
loaded_bundle, _ = store.load_bundle()
assert len(loaded_bundle.one_time_prekeys) == 4
# Consume one — should succeed and then be gone
kp_otp = store.consume_one_time_prekey(loaded_bundle.one_time_prekeys[0])
assert kp_otp is not None
assert store.consume_one_time_prekey(loaded_bundle.one_time_prekeys[0]) is None
# ===========================================================================
# M16 — Capability Tokens
# ===========================================================================
def test_token_issue_verify():
"""Issue a token, decode it, and verify it."""
from hearthnet.identity.tokens import TokenScope, decode_token, issue_token, verify_token
kp = _make_keypair()
scope = TokenScope(capabilities=["rag.query@1.0", "embed.text@1.0"])
tok, encoded = issue_token(kp, "some-node-id", "", scope, ttl_seconds=3600)
assert encoded.startswith("hntoken://v1/")
decoded = decode_token(encoded)
assert decoded.jti == tok.jti
assert "rag.query@1.0" in decoded.scope.capabilities
# Should not raise
verify_token(decoded)
def test_token_expired():
"""Verify that an expired token raises TokenError."""
from hearthnet.identity.tokens import TokenScope, TokenError, decode_token, issue_token, verify_token
kp = _make_keypair()
scope = TokenScope(capabilities=["llm.chat@1.0"])
_, encoded = issue_token(kp, "node-x", "", scope, ttl_seconds=-1)
decoded = decode_token(encoded)
with pytest.raises((ValueError, TokenError)):
verify_token(decoded, now=time.time() + 100)
def test_auth_service():
"""AuthService issues and verifies tokens via bus capabilities."""
from hearthnet.services.auth.service import AuthService
kp = _make_keypair()
svc = AuthService(keypair=kp)
result = svc._handle_issue({
"subject": "node-abc",
"audience": "",
"capabilities": ["rag.query@1.0"],
"ttl_seconds": 3600,
"issued_via": "manual",
})
assert "token" in result
assert result["token"].startswith("hntoken://v1/")
verify_result = svc._handle_verify({"token": result["token"]})
assert verify_result["valid"] is True
assert "rag.query@1.0" in verify_result["capabilities"]
# ===========================================================================
# M14 — Federation
# ===========================================================================
def test_federation_manifest_build():
"""Build, co-sign, and finalize a federation manifest."""
from hearthnet.federation.manifest import (
FederationManifest,
FederationScope,
build_federation_proposal,
co_sign_federation,
finalize_federation_manifest,
)
kp_a = _make_keypair()
kp_b = _make_keypair()
class MockManifest:
community_id = "comm-alpha"
community_name = "Alpha"
scope_a = FederationScope(capabilities=["rag.query@1.0"], data_visibility="public_corpora_only")
scope_b = FederationScope(capabilities=["embed.text@1.0"], data_visibility="members_only")
proposal = build_federation_proposal(
MockManifest(), kp_a,
their_community_id="comm-beta",
their_community_name="Beta",
scope_we_grant=scope_a,
scope_they_grant=scope_b,
bootstrap_endpoints=["http://alpha:7080"],
)
assert proposal.community_a == "comm-alpha"
assert proposal.community_b == "comm-beta"
co_sig = co_sign_federation(proposal, kp_b, role="anchor_b")
assert "signature" in co_sig
manifest = finalize_federation_manifest(
proposal, proposal.proposer_sig, co_sig["signature"], "Alpha", "Beta"
)
assert isinstance(manifest, FederationManifest)
assert manifest.community_a_id == "comm-alpha"
assert manifest.community_b_id == "comm-beta"
assert not manifest.is_expired()
def test_federation_service():
"""FederationService lists empty peers at startup."""
from hearthnet.federation.service import FederationService
kp = _make_keypair()
svc = FederationService(keypair=kp)
result = svc._handle_list({})
assert "peers" in result
assert isinstance(result["peers"], list)
# ===========================================================================
# M17 — OCR Service
# ===========================================================================
def test_ocr_service_health():
"""OcrService initialises and reports health (backends may be unavailable)."""
from hearthnet.services.ocr.service import OcrService
svc = OcrService()
# OcrService exposes backends list; check it's iterable
assert hasattr(svc, '_backends')
@pytest.mark.asyncio
async def test_ocr_service_unavailable_graceful():
"""OCR call with no installed backends returns error dict, not exception."""
from hearthnet.services.ocr.service import OcrService
import inspect
svc = OcrService(backends=[]) # no backends
result = await svc._handle_image({
"image_b64": base64.b64encode(b"fake image").decode(),
"languages": ["de"],
})
assert "error" in result
# ===========================================================================
# M18 — Translation Service
# ===========================================================================
def test_translation_service_health():
"""TranslationService initialises cleanly."""
from hearthnet.services.translation.service import TranslationService
svc = TranslationService()
assert hasattr(svc, '_backends')
@pytest.mark.asyncio
async def test_translation_too_long():
"""Text over 4000 chars returns bad_request."""
from hearthnet.services.translation.service import TranslationService
svc = TranslationService(backends=[])
result = await svc._handle_translate({
"text": "x" * 5000,
"to_lang": "en",
})
assert result.get("error") == "bad_request"
# ===========================================================================
# M19 — STT / TTS Services
# ===========================================================================
def test_stt_service_health():
from hearthnet.services.speech.stt_service import SttService
svc = SttService()
assert hasattr(svc, '_backends')
def test_tts_service_health():
from hearthnet.services.speech.tts_service import TtsService
svc = TtsService()
assert hasattr(svc, '_backends')
# ===========================================================================
# M20 — Vision Services
# ===========================================================================
def test_image_describe_service_health():
from hearthnet.services.image.describe_service import ImageDescribeService
svc = ImageDescribeService()
health = svc.health()
assert isinstance(health, dict)
@pytest.mark.asyncio
async def test_image_generate_service_unavailable():
from hearthnet.services.image.generate_service import ImageGenerateService
svc = ImageGenerateService(backends=[])
result = await svc.generate({"prompt": "a cat"})
assert "error" in result
# ===========================================================================
# M24 — Reranking Service
# ===========================================================================
def test_rerank_service_health():
from hearthnet.services.rerank.service import RerankService
svc = RerankService()
health = svc.health()
assert isinstance(health, dict)
@pytest.mark.asyncio
async def test_rerank_too_many_docs():
"""Over RERANK_MAX_DOCS returns bad_request."""
from hearthnet.services.rerank.service import RerankService
from hearthnet.constants import RERANK_MAX_DOCS
svc = RerankService(backends=[])
docs = [{"id": str(i), "text": f"doc {i}"} for i in range(RERANK_MAX_DOCS + 1)]
result = await svc.rerank_text(_req({"query": "test", "docs": docs}))
assert result.get("error") == "bad_request"
# ===========================================================================
# M25 — Group Chat Threads
# ===========================================================================
@pytest.mark.asyncio
async def test_group_chat_create_and_send():
"""Create a thread and send a message."""
from hearthnet.services.chat.thread_service import ThreadService
svc = ThreadService(node_id="node-alice")
# ThreadService (and other generated services) expect params under req.body["input"]
create_result = await svc.create_thread(_req({
"input": {"name": "Planning", "members": ["node-alice", "node-bob"], "e2e_enabled": False}
}))
out = create_result.get("output", create_result)
assert "thread_id" in out
tid = out["thread_id"]
send_result = await svc.send_message(_req({
"input": {"thread_id": tid, "content": "Hello group!"}
}))
send_out = send_result.get("output", send_result)
assert "event_id" in send_out
history_result = await svc.get_history(_req({"input": {"thread_id": tid}}))
history_out = history_result.get("output", history_result)
msgs = history_out.get("messages", [])
assert len(msgs) == 1
assert msgs[0]["content"] == "Hello group!"
# ===========================================================================
# X05 — DHT Kademlia
# ===========================================================================
def test_dht_kademlia_store_find():
"""KademliaNode stores and retrieves values, computes XOR distances."""
from hearthnet.dht.kademlia import KademliaNode, DhtContact
import hashlib, time
node = KademliaNode(node_id="test-node-001")
key = hashlib.sha256(b"community:alpha").digest()
node.store(key, {"endpoint": "http://alpha:7080"}, ttl=3600)
val = node.find_value(key)
assert val is not None
assert val.payload["endpoint"] == "http://alpha:7080"
# Expire stale values (this one is not stale)
expired = node.expire_stale()
assert expired == 0
# Contact routing
contact = DhtContact(
node_key=hashlib.sha256(b"peer-1").digest(),
endpoint="http://peer1:7080",
node_id="peer-1",
last_seen=time.time(),
)
node.update_contact(contact)
closest = node.find_closest(key, k=5)
assert len(closest) <= 5
# ===========================================================================
# X06 — WebSocket PubSub
# ===========================================================================
@pytest.mark.asyncio
async def test_websocket_pubsub():
"""WebsocketPubSub delivers messages to subscribed sessions."""
from hearthnet.transport.websocket import WebsocketPubSub
received: list[dict] = []
class FakeSession:
session_id = "fake-001"
_closed = False
async def send_event(self, event: str, data: dict, seq=None):
received.append({"event": event, "data": data})
pubsub = WebsocketPubSub()
session = FakeSession()
pubsub.subscribe("test-topic", session)
count = await pubsub.publish("test-topic", "chat.message", {"text": "hi"})
assert count == 1
assert len(received) == 1
assert received[0]["data"]["text"] == "hi"
pubsub.unsubscribe("test-topic", session)
count2 = await pubsub.publish("test-topic", "chat.message", {"text": "bye"})
assert count2 == 0
# ===========================================================================
# X07 — Federated Metrics
# ===========================================================================
def test_federated_metrics_collect():
"""FederatedMetricsExporter collects a NodeMetricsTick."""
from hearthnet.observability.federated import FederatedMetricsExporter, MetricsAggregator
exporter = FederatedMetricsExporter(node_id="node-metrics-test", community_id="comm-x")
tick = exporter.collect_tick()
assert tick.node_id == "node-metrics-test"
assert tick.community_id == "comm-x"
assert tick.cpu_percent >= 0
assert tick.memory_mb > 0
agg = MetricsAggregator(community_id="comm-x")
agg.apply_tick(tick)
snapshot = agg.community_snapshot()
assert snapshot.community_id == "comm-x"
assert snapshot.member_count >= 1
fed_snap = agg.federated_snapshot("peer-community")
assert "band" in fed_snap.member_count_band or fed_snap.member_count_band # non-empty
# ===========================================================================
# M15 — Relay Client
# ===========================================================================
def test_relay_client_init():
"""RelayClient initialises without error."""
from hearthnet.relay.client import RelayClient
client = RelayClient(relay_url="http://relay.hearthnet.de:7080")
assert client._relay_url == "http://relay.hearthnet.de:7080"