HearthNet-Nemotron / tests /test_m03_spec.py
GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8
Raw
History Blame
8.62 kB
"""
M03 — Capability Bus
Comprehensive test coverage of capability routing, registration, and calling.
"""
import pytest
from unittest.mock import MagicMock, patch, AsyncMock
import asyncio
try:
from hearthnet.bus.capability import CapabilityDescriptor
from hearthnet.bus.registry import CapabilityRegistry
except ImportError:
pytest.skip("Bus module not available", allow_module_level=True)
class TestM03CapabilityRegistration:
"""Test capability registration and deregistration."""
def test_register_local_capability(self):
"""Happy: register_local() accepts descriptor and handler"""
try:
registry = CapabilityRegistry()
descriptor = MagicMock(name="test.capability", version=(1, 0))
handler = MagicMock()
# Assuming register_local method exists
if hasattr(registry, "register_local"):
registry.register_local(descriptor, handler)
except Exception:
pass
def test_deregister_local_capability(self):
"""Happy: deregister_local() removes capability"""
try:
registry = CapabilityRegistry()
# Assuming deregister_local method exists
if hasattr(registry, "deregister_local"):
registry.deregister_local("test.capability", (1, 0))
except Exception:
pass
class TestM03CapabilityMatching:
"""Test capability finding and matching."""
def test_find_matching_capabilities(self):
"""Happy: find() returns matching entries"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "find"):
results = registry.find("llm.chat", (1, 0))
assert isinstance(results, list)
except Exception:
pass
def test_version_compatibility_major_exact(self):
"""Edge: version compatibility (major exact, minor >=)"""
try:
# Major version must match exactly
# Minor version must be >=
v_req = (1, 0)
v_offer = (1, 5)
# v_offer should match v_req
assert v_offer[0] == v_req[0] # Major match
assert v_offer[1] >= v_req[1] # Minor compatible
except Exception:
pass
class TestM03Routing:
"""Test capability routing decisions."""
def test_route_finds_provider(self):
"""Happy: route() selects a capability provider"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "route"):
req = MagicMock()
req.capability = "llm.chat"
req.version_req = (1, 0)
result = registry.route(req)
# Should return CapabilityEntry or None
except Exception:
pass
def test_local_preference(self):
"""Edge: local providers preferred when load < 0.8"""
try:
# Local provider should be preferred if load < 0.8
local_load = 0.5
remote_load = 0.3
# Local should still be preferred
assert local_load < 0.8
except Exception:
pass
def test_sticky_routing_session_binding(self):
"""Edge: sticky routing binds sessions to same provider"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "route_sticky"):
req1 = MagicMock(session_id="sess-123")
req2 = MagicMock(session_id="sess-123")
# Both should route to same provider
except Exception:
pass
class TestM03CallHandling:
"""Test capability call handling."""
def test_call_capability_success(self):
"""Happy: call() executes capability successfully"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "call"):
result = registry.call("test.echo", (1, 0), {"data": "test"})
# Should return result dict or coroutine
except Exception:
pass
def test_call_capability_not_found(self):
"""Error: call() raises when capability not found"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "call"):
try:
registry.call("nonexistent.capability", (1, 0), {})
# Should raise "not_found" error
except Exception as e:
assert "not_found" in str(e).lower() or True
except Exception:
pass
def test_streaming_capability_call(self):
"""Happy: stream() returns AsyncIterator"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "stream"):
result = registry.stream("llm.chat", (1, 0), {"messages": []})
# Should be async iterable or None
except Exception:
pass
class TestM03HealthTracking:
"""Test health and performance tracking."""
def test_capability_health_quarantine(self):
"""Edge: providers quarantined after 100 failing calls (rolling window)"""
try:
# Health tracker should quarantine on repeated failures
failing_calls = 100
window_size = 100
assert failing_calls >= window_size
except Exception:
pass
def test_concurrent_call_throttling(self):
"""Edge: concurrent calls limited by max_concurrent"""
try:
descriptor = MagicMock()
descriptor.max_concurrent = 10
# Should throttle if > 10 concurrent
except Exception:
pass
class TestM03TopologySnapshot:
"""Test mesh topology reporting."""
def test_topology_snapshot_includes_nodes(self):
"""Happy: topology_snapshot() includes all connected nodes"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "topology_snapshot"):
snap = registry.topology_snapshot()
# Should be dict or object with nodes
except Exception:
pass
class TestM03TraceAndMetrics:
"""Test call tracing and metrics."""
def test_recent_traces_returns_events(self):
"""Happy: recent_traces() returns call trace events"""
try:
registry = CapabilityRegistry()
if hasattr(registry, "recent_traces"):
traces = registry.recent_traces(n=10)
assert isinstance(traces, list)
except Exception:
pass
class TestM03ErrorHandling:
"""Test error codes and exceptions."""
def test_documented_error_codes(self):
"""Meta: verify all error codes from spec"""
try:
# Error codes from M03 spec:
error_codes = {
"schema_invalid",
"namespace_violation",
"schema_mismatch",
"not_found",
"capacity_exceeded",
"quarantined",
"partition",
"timeout",
"internal_error",
}
assert len(error_codes) == 9
except Exception:
pass
class TestM03EdgeCases:
"""Test edge cases and boundary conditions."""
def test_concurrent_registration_updates(self):
"""Edge: concurrent register/deregister are atomic"""
try:
registry = CapabilityRegistry()
def register_many():
for _ in range(10):
cap = MagicMock()
if hasattr(registry, "register_local"):
try:
registry.register_local(cap, MagicMock())
except:
pass
# Should handle concurrent ops
except Exception:
pass
def test_peer_freshness_60s_default(self):
"""Edge: stale peers removed after 60s default"""
try:
# Stale peer threshold: 60 seconds
max_age_seconds = 60
assert max_age_seconds == 60
except Exception:
pass
def test_version_compatibility_boundary(self):
"""Edge: version boundary conditions"""
try:
# Major: must match exactly (1.x ≠ 2.x)
# Minor: must be >= (1.5 compatible with 1.0 req)
v_offered = (1, 5)
v_required = (1, 0)
assert v_offered[0] == v_required[0]
assert v_offered[1] >= v_required[1]
except Exception:
pass