Spaces:
Running on Zero
Running on Zero
File size: 36,190 Bytes
8231854 8514223 9505822 8514223 4aaae80 9fb722e 9505822 d6ca3a2 9505822 9fb722e 78cc96f 9505822 9fb722e 4aaae80 c91b229 9fb722e 9505822 9fb722e 27be63b d6ca3a2 27be63b 9fb722e 8231854 8f53c4c 8231854 8f53c4c 8231854 8f53c4c 9fb722e 8231854 8f53c4c 9505822 6c38d43 9505822 8f53c4c 9505822 9fb722e 9505822 d4978df 4aaae80 c91b229 4aaae80 c91b229 d4978df 9fb722e 4aaae80 9fb722e 4aaae80 78cc96f 4aaae80 c91b229 9fb722e c91b229 9505822 6c38d43 c91b229 8231854 9505822 6c38d43 9fb722e d4978df 9505822 d4978df 31d4f9b 4aaae80 31d4f9b d4978df 31d4f9b d4978df 9505822 d4978df 31d4f9b d4978df 31d4f9b d4978df 31d4f9b d4978df 6c38d43 d4978df 6c38d43 d4978df 78cc96f 4aaae80 c91b229 9505822 d4978df 6c38d43 d4978df c91b229 d4978df 27be63b 9505822 8231854 9505822 8231854 9505822 27be63b 9505822 3f78ea8 9505822 6c38d43 9505822 8f53c4c 9505822 d6ca3a2 9505822 d6ca3a2 9505822 6c38d43 9505822 d6ca3a2 9505822 d6ca3a2 9505822 d6ca3a2 9505822 27be63b 9505822 27be63b 8231854 27be63b 9505822 9fb722e 9505822 9fb722e 9505822 38cba90 9505822 38cba90 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 
269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 
769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 | """M12/Node - HearthNode composition root.
Spec: docs/M12-cli.md §5 (node.start 15-step sequence)
Impl-ref: impl_ref.md §17 (node.py, ManifestPublisher)
Wires all services together. The 15-step startup lives in node.start().
"""
from __future__ import annotations
import asyncio
import contextlib
import logging
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from hearthnet.bus import CapabilityBus, InMemoryTransport
from hearthnet.discovery import PeerRecord, PeerRegistry
from hearthnet.emergency.detector import Detector
from hearthnet.emergency.state import StateBus
from hearthnet.facades import ChatFacade, MarketplaceFacade, RagFacade
from hearthnet.services import ChatService, LlmService, MarketplaceService, RagService
from hearthnet.services.files import FileService
from hearthnet.services.moe import MoeService
from hearthnet.services.tools import PlantIdentificationService
from hearthnet.types import CommunityID, Endpoint, NodeID, Profile
# Module-level logger shared by HearthNode, PeriodicTask and ManifestPublisher.
_log = logging.getLogger(__name__)
# Gossip-sync period in seconds (default for HearthNode.start(gossip_interval=...))
_GOSSIP_INTERVAL_SECONDS = 30
class _HttpxSyncClient:
"""Minimal httpx adapter for :class:`SyncClient`.
SyncClient treats a dict response as already-parsed JSON, so we return the
decoded body directly from ``get``/``post`` (avoiding SyncClient's
aiohttp-style ``await resp.json()`` path). Degrades to a no-op when httpx is
not installed.
"""
def __init__(self) -> None:
self._client: Any = None
self.unavailable = False
try:
import httpx
self._client = httpx.AsyncClient(timeout=30.0)
except ImportError:
self.unavailable = True
async def get(self, url: str) -> dict[str, Any]:
resp = await self._client.get(url)
resp.raise_for_status()
return resp.json()
async def post(
self, url: str, *, data: Any = None, headers: dict[str, str] | None = None
) -> dict[str, Any]:
resp = await self._client.post(url, content=data, headers=headers)
resp.raise_for_status()
return resp.json()
async def aclose(self) -> None:
if self._client is not None:
with contextlib.suppress(Exception):
await self._client.aclose()
@dataclass
class NodeManifest:
    """Description of a node and the capabilities it exposes to peers."""

    node_id: NodeID
    display_name: str
    community_id: CommunityID
    profile: Profile
    capabilities: list[dict[str, Any]]

    def as_dict(self) -> dict[str, Any]:
        """Return the manifest as a plain dict.

        The dict carries ``version`` 1 and ``contract_version`` "1.0" ahead of
        the node fields. ``capabilities`` is included by reference, not copied.
        """
        body: dict[str, Any] = {"version": 1, "contract_version": "1.0"}
        body.update(
            node_id=self.node_id,
            display_name=self.display_name,
            community_id=self.community_id,
            profile=self.profile,
            capabilities=self.capabilities,
        )
        return body
class HearthNode:
    """Composition root that owns and wires every subsystem of one node.

    Construction creates the capability bus, peer registry, emergency state
    bus + detector and the chat/RAG/marketplace facades, and registers the
    discovery and mesh services on the bus. Application services are added
    afterwards via :meth:`install_services` (or :meth:`install_demo_services`
    in tests). Network-facing subsystems are brought up by :meth:`start` and
    torn down by :meth:`stop`.
    """

    def __init__(
        self,
        node_id: NodeID,
        display_name: str,
        community_id: CommunityID,
        *,
        transport: InMemoryTransport | None = None,
        profile: Profile = "hearth",
    ) -> None:
        """Create the node and its always-on services.

        Args:
            node_id: Identifier of this node on the bus.
            display_name: Human-readable name advertised to peers.
            community_id: Community this node belongs to.
            transport: Bus transport. ``None`` builds a fresh
                ``CompositeTransport``; the in-process test network passes a
                shared ``InMemoryTransport`` instead.
            profile: Profile string advertised in the manifest.
        """
        self.node_id = node_id
        self.display_name = display_name
        self.community_id = community_id
        self.profile: Profile = profile
        # Default to the HTTP-capable transport so a standalone node can reach
        # remote peers over the network (e.g. the public HF Space). The
        # in-process InMemoryNetwork still passes a shared InMemoryTransport.
        # CompositeTransport is a drop-in superset of HttpBusTransport that also
        # accepts pluggable delivery strategies (relay/WebRTC/tunnel) — relay is
        # attached only on demand via join_relay(), keeping nodes local-first.
        if transport is None:
            from hearthnet.bus.transport import CompositeTransport

            transport = CompositeTransport()
        self.bus = CapabilityBus(node_id, community_id, transport)
        self.peers = PeerRegistry(node_id, community_id)
        self.state_bus = StateBus()
        self.detector = Detector(self.bus, self.state_bus, self.peers)
        self.rag = RagFacade(self.bus)
        self.chat = ChatFacade(self.bus)
        self.marketplace = MarketplaceFacade(self.bus)
        # Manual peer bridging (discovery.peer.add / discovery.peers) — enables
        # cross-network peering where mDNS/UDP multicast cannot reach.
        from hearthnet.discovery.service import DiscoveryService

        self.bus.register_service(DiscoveryService(self.bus, self.peers))
        # mesh.join — redeem an invite/relay code into all-to-all relay membership.
        from hearthnet.transport.mesh_service import MeshService

        self.bus.register_service(MeshService(self))
        # Populated by start()
        self._http_server: Any = None
        self._event_log: Any = None
        self._replay_engine: Any = None
        self._mdns_announcer: Any = None
        self._mdns_browser: Any = None
        self._udp_announcer: Any = None
        self._udp_listener: Any = None
        self._gossip_task: asyncio.Task | None = None
        self._emergency_task: asyncio.Task | None = None
        self._pubsub_task: asyncio.Task | None = None
        self._replicator_task: asyncio.Task | None = None
        self._replicator: Any = None
        self._rag_service: Any = None
        self._started: bool = False
        self._relay_client: Any = None

    # ------------------------------------------------------------------
    # Relay mesh (opt-in, NAT-safe all-to-all over a public hub)
    # ------------------------------------------------------------------

    async def join_relay(
        self, relay_url: str, *, token: str | None = None
    ) -> dict[str, Any]:
        """Join a relay hub so this node meshes all-to-all with NAT-bound peers.

        Opt-in only — a node stays purely local until this is called (e.g. from a
        redeemed invite or the ``mesh up`` launcher). Registers the relay roster's
        capabilities locally and attaches a :class:`RelayStrategy` to the bus
        transport so calls to those peers are delivered through the hub.

        Any previously joined relay is left first. If the new ``join()`` fails,
        the new client is closed and the exception is re-raised; the node is
        then not attached to any relay.

        Returns the hub's join response (current roster + ttl). Raises if the bus
        transport is not relay-capable (i.e. not a CompositeTransport).
        """
        from hearthnet.bus.transport import CompositeTransport
        from hearthnet.transport.relay_client import RelayClient, RelayStrategy

        if not isinstance(self.bus.transport, CompositeTransport):
            raise RuntimeError("relay requires a CompositeTransport bus transport")
        old_client = self._relay_client
        if old_client is not None:
            # Detach before closing so a failing close() cannot leave a stale,
            # half-closed client referenced by the node.
            self._relay_client = None
            self.bus.transport.remove_strategy("relay")
            await old_client.close()
        client = RelayClient(
            relay_url,
            node_id=self.node_id,
            display_name=self.display_name,
            community_id=self.community_id,
            bus=self.bus,
            peers=self.peers,
            token=token,
        )
        try:
            result = await client.join()
        except BaseException:
            # Don't leak the client we just created when join fails/is cancelled.
            with contextlib.suppress(Exception):
                await client.close()
            raise
        self.bus.transport.add_strategy(RelayStrategy(client))
        self._relay_client = client
        return result

    async def leave_relay(self) -> None:
        """Detach from the relay hub, if joined; no-op otherwise.

        The client reference and the transport's "relay" strategy are removed
        before the client is closed, so the node is left consistent even if
        ``close()`` raises (the exception still propagates).
        """
        client = self._relay_client
        if client is None:
            return
        from hearthnet.bus.transport import CompositeTransport

        self._relay_client = None
        if isinstance(self.bus.transport, CompositeTransport):
            self.bus.transport.remove_strategy("relay")
        await client.close()

    # ------------------------------------------------------------------
    # Service installation
    # ------------------------------------------------------------------

    def install_demo_services(self, *, internet_llm: bool = False, corpus: str = "demo") -> None:
        """FOR TESTS ONLY — install echo-LLM + in-memory services (no disk I/O, fast).

        Production code should call install_services() which auto-discovers real backends.

        Args:
            internet_llm: Register the demo LLM as requiring internet
                (model name "demo-remote" instead of "demo-local").
            corpus: Corpus name for the demo RAG and federated RAG services.
        """
        # Use demo- prefixed model name so LlmService creates _EchoBackend (test path)
        from hearthnet.services.demo import (
            LlmService as DemoLlm,
            MarketplaceService as DemoMarket,
            RagService as DemoRag,
        )

        model_name = "demo-remote" if internet_llm else "demo-local"
        services = [
            DemoLlm(model=model_name, requires_internet=internet_llm),
            DemoRag(
                corpus=corpus,
                documents=[
                    {
                        "id": "seed",
                        "title": "Water",
                        "text": "Store clean water and boil rainwater.",
                    }
                ],
            ),
            DemoMarket(),
            ChatService(self.node_id, bus=self.bus),
            FileService(),
            MoeService(bus=self.bus),
            PlantIdentificationService(bus=self.bus),
        ]
        # ModelDistributionService also needed in tests; use a temp BlobStore
        import tempfile

        from hearthnet.blobs.store import BlobStore
        from hearthnet.services.llm.model_distribution import ModelDistributionService
        from hearthnet.services.protocol import ProtocolService
        from hearthnet.services.rag.federated import FederatedRagService

        tmp_store = BlobStore(Path(tempfile.mkdtemp()) / "blobs")
        services.append(ModelDistributionService(store=tmp_store, models_dir=None, bus=self.bus))
        services.append(ProtocolService(node=self))
        services.append(FederatedRagService(self.bus, corpus=corpus))
        for service in services:
            self.bus.register_service(service)

    def install_services(
        self,
        *,
        corpus: str = "community",
        models_dir=None,
        blob_store=None,
    ) -> None:
        """Install real services with auto-discovered LLM backends.

        LLM backends are probed in this order and every one that qualifies is
        registered (local-first; remote backends only when their env var is set):

          1. OllamaBackend — if its ``is_available()`` check passes
          2. OpenAICompatBackend at http://localhost:8080/v1 (llama.cpp
             server) — if available
          3. OpenBmbBackend (MiniCPM) — only if ``MINICPM_URL`` is set and
             the backend is available
          4. NemotronBackend — cloud when ``NVIDIA_API_KEY`` is set (registered
             without an availability check), otherwise local when
             ``NEMOTRON_URL`` is set and available
          5. ModalBackend — only if ``MODAL_ENDPOINT`` is set and available
          6. HfLocalBackend — if available

        When no backend qualifies, LlmService receives ``backends=None`` and
        falls back to its unavailable backend (a clear error, not a silent echo).

        Also installs ModelDistributionService so peers can pull model weights,
        using *blob_store* or a persistent store in ``~/.hearthnet/blobs``, and
        then the auxiliary services via install_extended_services().
        """
        import os

        from hearthnet.services.llm.backends.hf_local import HfLocalBackend
        from hearthnet.services.llm.backends.modal_backend import ModalBackend
        from hearthnet.services.llm.backends.nemotron import NemotronBackend
        from hearthnet.services.llm.backends.ollama import OllamaBackend
        from hearthnet.services.llm.backends.openai_compat import OpenAICompatBackend
        from hearthnet.services.llm.backends.openbmb import OpenBmbBackend
        from hearthnet.services.llm.model_distribution import ModelDistributionService
        from hearthnet.services.protocol import ProtocolService

        backends = []
        # 1. Ollama (best quality, zero-config local)
        ollama = OllamaBackend()
        if ollama.is_available():
            backends.append(ollama)
        # 2. llama.cpp HTTP server on default port
        llama_http = OpenAICompatBackend(
            base_url="http://localhost:8080/v1",
            api_key_env="",
            model="local",
        )
        if llama_http.is_available():
            backends.append(llama_http)
        # 3. MiniCPM local server (OpenBMB prize track)
        minicpm_url = os.getenv("MINICPM_URL")
        if minicpm_url:
            minicpm = OpenBmbBackend(base_url=minicpm_url)
            if minicpm.is_available():
                backends.append(minicpm)
                _log.info("MiniCPM backend registered from MINICPM_URL")
        # 4. NVIDIA Nemotron (cloud NIM or local; NVIDIA prize track)
        nemotron_url = os.getenv("NEMOTRON_URL")
        if os.getenv("NVIDIA_API_KEY"):
            nemotron = NemotronBackend(api_key_env="NVIDIA_API_KEY")
            backends.append(nemotron)  # cloud — no local check needed
            _log.info("Nemotron backend registered (NVIDIA_API_KEY set)")
        elif nemotron_url:
            nemotron_local = NemotronBackend(base_url=nemotron_url, local=True)
            if nemotron_local.is_available():
                backends.append(nemotron_local)
                _log.info("Nemotron local backend registered from NEMOTRON_URL")
        # 5. Modal serverless GPU (Modal prize track)
        if os.getenv("MODAL_ENDPOINT"):
            modal_b = ModalBackend()
            if modal_b.is_available():
                backends.append(modal_b)
                _log.info("Modal backend registered from MODAL_ENDPOINT")
        # 6. HF Transformers local (available if transformers installed)
        hf = HfLocalBackend()
        if hf.is_available():
            backends.append(hf)

        from hearthnet.services.rag.federated import FederatedRagService

        # RagService receives blob_store now; event_log is injected in start()
        # after the EventLog is open (it's a lazy reference via _rag_service).
        rag_service = RagService(corpus=corpus, blob_store=blob_store)
        services = [
            LlmService(backends=backends or None),  # _UnavailableBackend if none found
            rag_service,
            FederatedRagService(self.bus, corpus=corpus),
            MarketplaceService(),
            ChatService(self.node_id, bus=self.bus),
            FileService(),
            MoeService(bus=self.bus),
            PlantIdentificationService(bus=self.bus),
            ProtocolService(node=self),
        ]
        # Keep a reference so start() can inject the event_log later (bound by
        # name rather than list index so reordering `services` can't break it).
        self._rag_service = rag_service

        # Model weight distribution (BitTorrent-style M07/M26)
        # Use provided blob_store or auto-create a persistent one in ~/.hearthnet/blobs
        if blob_store is None:
            from hearthnet.blobs.store import BlobStore

            blob_store = BlobStore(Path.home() / ".hearthnet" / "blobs")
        model_svc = ModelDistributionService(
            store=blob_store,
            models_dir=models_dir,
            bus=self.bus,
        )
        services.append(model_svc)
        for service in services:
            self.bus.register_service(service)
        # Register the real auxiliary services (embed/rerank/ocr/translation/
        # speech/image). Phase-3 research services stay off unless opted in.
        self.install_extended_services(research=False)

    def install_extended_services(
        self,
        *,
        research: bool = False,
        embed_model: str = "BAAI/bge-small-en-v1.5",
    ) -> None:
        """Register the real auxiliary services beyond the core set.

        Always (each degrades gracefully to an "unavailable" response when its
        optional backend/model is missing — never a mock):

            M11 EmbeddingService     embed.text          (real semantic vectors when
                                                          sentence-transformers present)
            M24 RerankService        rerank.text
            M17 OcrService           ocr.image / ocr.pdf
            M18 TranslationService   trans.text
            M19 Stt/TtsService       stt.transcribe / tts.speak
            M20 Image services       image.describe / image.generate

        When ``research=True`` (opt-in; the demo Space enables it), also registers
        the real Phase-3 services:

            M30 EvidenceService      evidence.claim.*
            M31 CivilDefenseService  civdef.*

        Every registration is wrapped so a missing optional dependency can never
        break node startup.
        """

        def _register(svc: Any) -> None:
            # Bus-style services expose `capabilities`; others self-register.
            if hasattr(svc, "capabilities"):
                self.bus.register_service(svc)
            elif hasattr(svc, "register"):
                svc.register(self.bus)

        # -- M11 Embedding (core for real RAG) --------------------------------
        try:
            import importlib.util

            from hearthnet.services.embedding.service import EmbeddingService

            backend = None
            if importlib.util.find_spec("sentence_transformers") is not None:
                from hearthnet.services.embedding.backends import (
                    SentenceTransformerBackend,
                )

                backend = SentenceTransformerBackend(model=embed_model)
            _register(EmbeddingService(backend=backend))
        except Exception as exc:
            _log.warning("EmbeddingService registration skipped: %s", exc)

        # -- Remaining always-on auxiliary services ---------------------------
        _aux: list[tuple[str, Any]] = []
        try:
            from hearthnet.services.rerank.service import RerankService

            _aux.append(("rerank", RerankService()))
        except Exception as exc:
            _log.debug("RerankService unavailable: %s", exc)
        try:
            from hearthnet.services.ocr.service import OcrService

            _aux.append(("ocr", OcrService()))
        except Exception as exc:
            _log.debug("OcrService unavailable: %s", exc)
        try:
            from hearthnet.services.translation.service import TranslationService

            _aux.append(("translation", TranslationService()))
        except Exception as exc:
            _log.debug("TranslationService unavailable: %s", exc)
        try:
            from hearthnet.services.speech.stt_service import SttService
            from hearthnet.services.speech.tts_service import TtsService

            _aux.append(("stt", SttService()))
            _aux.append(("tts", TtsService()))
        except Exception as exc:
            _log.debug("Speech services unavailable: %s", exc)
        try:
            from hearthnet.services.image.describe_service import ImageDescribeService

            _aux.append(("image.describe", ImageDescribeService()))
        except Exception as exc:
            _log.debug("ImageDescribeService unavailable: %s", exc)
        try:
            from hearthnet.services.image.generate_service import ImageGenerateService

            _aux.append(("image.generate", ImageGenerateService()))
        except Exception as exc:
            _log.debug("ImageGenerateService unavailable: %s", exc)
        for label, svc in _aux:
            try:
                _register(svc)
            except Exception as exc:
                _log.warning("%s registration skipped: %s", label, exc)

        if not research:
            return

        # -- Phase-3 research services (opt-in only) --------------------------
        try:
            from hearthnet.evidence.service import EvidenceService

            _register(EvidenceService(community_id=self.community_id))
        except Exception as exc:
            _log.warning("EvidenceService registration skipped: %s", exc)
        try:
            from hearthnet.civdef.service import CivilDefenseService

            _register(CivilDefenseService())
        except Exception as exc:
            _log.warning("CivilDefenseService registration skipped: %s", exc)

    async def start(
        self,
        *,
        host: str = "0.0.0.0",  # nosec B104
        port: int = 7080,
        data_dir: Path | str | None = None,
        gossip_interval: int = _GOSSIP_INTERVAL_SECONDS,
    ) -> None:
        """Start the node — wires all subsystems.

        Calling start() on an already-started node is a no-op. Every optional
        subsystem is started best-effort: a failure is logged as a warning and
        startup continues. On completion the node is marked started, which is
        what allows :meth:`stop` to tear it down.

        Steps:
          1-2. Already done: node_id + bus created in __init__
          3.   Start mDNS + UDP peer discovery
          4.   Load config (external; this method receives data_dir)
          5.   Services already registered by caller via install_services()
          6.   (LLM warmup deferred to first call)
          7.   (RAG warmup deferred to first call)
          8.   Start Detector (emergency probe loop)
          9.   Start EventLog + ReplayEngine
          10.  Start FastAPI HttpServer (X01)
          11.  Publish manifest (mDNS)
          12.  (Community join via invite; deferred to CLI/UI)
          13.  Start observability
          14.  (Federation; deferred)
          15.  Signal ready

        Args:
            host: Interface the HTTP server binds to.
            port: HTTP server port; also advertised via mDNS and UDP.
            data_dir: Directory for events.db, repl_blobs and corpora. Defaults
                to ``~/.hearthnet/nodes/<first 16 chars of node_id>``.
            gossip_interval: Seconds between gossip-sync rounds.
        """
        if self._started:
            return
        _log.info("HearthNode.start() node_id=%s port=%d", self.node_id, port)
        data_dir_path = (
            Path(data_dir) if data_dir else Path.home() / ".hearthnet" / "nodes" / self.node_id[:16]
        )
        data_dir_path.mkdir(parents=True, exist_ok=True)

        # -- Step 9: Event log + replay engine --------------------------------
        try:
            from hearthnet.events import EventLog, ReplayEngine

            self._event_log = EventLog(data_dir_path / "events.db", self.community_id, self.node_id)
            self._replay_engine = ReplayEngine(self._event_log)
            _log.debug("EventLog opened at %s", data_dir_path / "events.db")
        except Exception as exc:
            _log.warning("EventLog init failed (non-fatal): %s", exc)

        # Hand the open EventLog to the RagService created by install_services().
        # Done here rather than inside the replicator setup so the injection
        # does not depend on the replication imports succeeding.
        if self._event_log is not None and self._rag_service is not None:
            self._rag_service._event_log = self._event_log

        # -- Step 3: Peer discovery (mDNS + UDP) ------------------------------
        caps = [e.descriptor.name for e in self.bus.registry.all_local()]
        try:
            from hearthnet.discovery.mdns import MdnsAnnouncer, MdnsBrowser
            from hearthnet.discovery.udp import UdpAnnouncer, UdpListener

            self._mdns_announcer = MdnsAnnouncer(
                self.peers,
                self.node_id,
                self.display_name,
                port=port,
                properties={"profile": self.profile, "caps": caps},
            )
            self._mdns_browser = MdnsBrowser(self.peers, self.community_id)
            self._udp_announcer = UdpAnnouncer(
                self.peers, self.node_id, self.community_id, port=port, caps=caps
            )
            self._udp_listener = UdpListener(self.peers, self.community_id)
            await self._mdns_announcer.start()
            await self._mdns_browser.start()
            await self._udp_announcer.start()
            await self._udp_listener.start()
            _log.debug("mDNS + UDP discovery started on port %d", port)
        except Exception as exc:
            _log.warning("Discovery init failed (non-fatal): %s", exc)

        # -- Step 8: Emergency detector ---------------------------------------
        try:
            await self.detector.start()
        except Exception as exc:
            _log.warning("Emergency detector start failed (non-fatal): %s", exc)

        # -- Step 10: HTTP server (X01) + WebSocket pubsub (X06) --------------
        try:
            from hearthnet.events.sync import SyncServer
            from hearthnet.transport.server import HttpServer

            sync_server = SyncServer(self._event_log) if self._event_log else None
            self._http_server = HttpServer(
                bus=self.bus,
                node_manifest_fn=lambda: self.manifest().as_dict(),
                sync_server=sync_server,
                host=host,
                port=port,
            )
            self._http_server.build_app()
            await self._http_server.start()
            _log.info("HTTP server listening on %s:%d", host, port)
            # Wire StateBus -> WebSocket pubsub (X06)
            if self._http_server._ws_pubsub is not None:
                self._pubsub_task = asyncio.create_task(
                    self._state_bus_to_pubsub(), name="state-pubsub"
                )
        except Exception as exc:
            _log.warning("HTTP server start failed (non-fatal): %s", exc)

        # -- Gossip sync loop (X02) -------------------------------------------
        if self._event_log is not None:
            self._gossip_task = asyncio.create_task(
                self._gossip_loop(gossip_interval), name="gossip-sync"
            )

        # -- Corpus replicator (Phase 2: BitTorrent-style RAG sync) -----------
        if self._event_log is not None:
            try:
                from hearthnet.blobs.store import BlobStore
                from hearthnet.blobs.transfer import TransferManager
                from hearthnet.services.rag.replication import CorpusReplicator
                from hearthnet.services.rag.store import CorpusStore

                repl_blob_store = BlobStore(data_dir_path / "repl_blobs")
                transfer = TransferManager(repl_blob_store, http_client=None)

                def _corpus_store_fn(corpus: str) -> CorpusStore:
                    return CorpusStore(data_dir_path / "corpora", corpus)

                self._replicator = CorpusReplicator(
                    bus=self.bus,
                    event_log=self._event_log,
                    transfer=transfer,
                    peers=self.peers,
                    local_node_id=self.node_id,
                    corpus_store_fn=_corpus_store_fn,
                )
                self._replicator_task = self._replicator.start()
                _log.info("CorpusReplicator started")
            except Exception as exc:
                _log.warning("CorpusReplicator init failed (non-fatal): %s", exc)

        # -- Step 15: Signal ready --------------------------------------------
        # Without this flag stop() would always return early and leak every
        # task, socket and the open event log started above.
        self._started = True
        _log.info("HearthNode ready: %s", self.node_id)

    async def stop(self) -> None:
        """Gracefully stop all background tasks and subsystems.

        No-op unless the node was started. Teardown is best-effort: each
        step's exceptions are suppressed so one failing subsystem cannot block
        the others. The event log is closed last, after the background tasks
        and the HTTP sync endpoint that use it have been stopped.
        """
        if not self._started:
            return
        _log.info("HearthNode.stop() node_id=%s", self.node_id)
        self._started = False

        # Leave relay mesh (stops poll loop, closes client)
        with contextlib.suppress(Exception):
            await self.leave_relay()

        # Stop emergency detector
        with contextlib.suppress(Exception):
            await self.detector.stop()

        # Cancel background tasks (gossip and replicator read/write the event log)
        for task_attr in ("_gossip_task", "_pubsub_task", "_replicator_task"):
            task = getattr(self, task_attr, None)
            if task and not task.done():
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError, Exception):
                    await task
            setattr(self, task_attr, None)

        # Stop discovery; a component without stop() raises AttributeError,
        # which is suppressed like any other shutdown error.
        for attr in ("_udp_announcer", "_udp_listener", "_mdns_announcer", "_mdns_browser"):
            obj = getattr(self, attr, None)
            if obj:
                with contextlib.suppress(Exception):
                    await obj.stop()
            setattr(self, attr, None)

        # Stop HTTP server (its sync endpoint serves the event log)
        if self._http_server:
            with contextlib.suppress(Exception):
                await self._http_server.shutdown()
            self._http_server = None

        # Close event log last, once nothing can touch it anymore
        if self._event_log is not None:
            with contextlib.suppress(Exception):
                self._event_log.close()
            self._event_log = None
            self._replay_engine = None

    # ------------------------------------------------------------------
    # Background tasks
    # ------------------------------------------------------------------

    async def _gossip_loop(self, interval: int) -> None:
        """Periodically sync the event log with all known peers (X02 gossip).

        Every *interval* seconds, syncs with each peer's first endpoint over
        HTTP, skipping peers without endpoints and in-process ("memory") peers.
        Per-peer failures are logged at debug level and never stop the loop.
        Returns immediately if httpx is not installed.
        """
        from hearthnet.events.sync import SyncClient

        http_client = _HttpxSyncClient()
        if http_client.unavailable:
            _log.info("Gossip sync disabled: httpx not installed")
            return
        # SyncClient is built inside the try so the httpx client is always
        # closed, even if construction fails.
        try:
            sync_client = SyncClient(self._event_log, http_client)
            while True:
                await asyncio.sleep(interval)
                for peer in self.peers.all():
                    if not peer.endpoints:
                        continue
                    ep = peer.endpoints[0]
                    if ep.transport == "memory":
                        continue  # in-process; no HTTP needed
                    peer_url = f"http://{ep.host}:{ep.port}"
                    try:
                        result = await sync_client.sync_with(peer_url, self.community_id)
                        if result.received_count or result.sent_count:
                            _log.debug(
                                "Gossip with %s: sent=%d recv=%d ms=%d",
                                peer.display_name,
                                result.sent_count,
                                result.received_count,
                                result.duration_ms,
                            )
                    except Exception as exc:
                        _log.debug("Gossip sync with %s failed: %s", peer.display_name, exc)
        finally:
            await http_client.aclose()

    async def _state_bus_to_pubsub(self) -> None:
        """Forward StateBus state changes to the WebSocket pubsub (X06).

        Each state is published as an "emergency.mode.changed" event. The loop
        ends when the HTTP server reference is cleared, on cancellation, or on
        an unexpected error (logged as a warning).
        """
        try:
            async for state in self.state_bus.subscribe():
                if self._http_server is None:
                    break
                await self._http_server.publish_event(
                    topic="emergency.mode.changed",
                    event="emergency.mode.changed",
                    data={
                        "mode": state.mode,
                        "mode_label": state.mode_label,
                        "changed_at": state.changed_at,
                        "probe_results": state.probe_results,
                        "consecutive_fails": state.consecutive_fails,
                    },
                )
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            _log.warning("state_bus_to_pubsub error: %s", exc)

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def manifest(self) -> NodeManifest:
        """Build the manifest from the locally registered capabilities."""
        capabilities = [
            {
                "name": entry.descriptor.name,
                "version": entry.descriptor.version_str,
                "stability": entry.descriptor.stability,
                "schema_hash": entry.descriptor.schema_hash(),
                "params": dict(entry.descriptor.params),
                "max_concurrent": entry.descriptor.max_concurrent,
            }
            for entry in self.bus.registry.all_local()
        ]
        return NodeManifest(
            self.node_id, self.display_name, self.community_id, self.profile, capabilities
        )

    def discover(self, other: HearthNode) -> None:
        """Register *other* as an in-process peer and import its capabilities.

        The bus registry is only updated when ``peers.upsert`` reports a change.
        """
        record = PeerRecord(
            node_id_full=other.node_id,
            display_name=other.display_name,
            community_id=other.community_id,
            profile=other.profile,
            endpoints=[Endpoint("memory", other.node_id, 0)],
            manifest=other.manifest().as_dict(),
            last_seen=time.monotonic(),
        )
        if self.peers.upsert(record):
            self.bus.registry.update_from_peer_manifest(record, record.manifest or {})

    def snapshot(self) -> dict[str, Any]:
        """Return a dict describing node identity, emergency state, topology and status."""
        topology = self.bus.topology_snapshot([peer.as_view() for peer in self.peers.all()])
        return {
            "node": {
                "node_id": self.node_id,
                "display_name": self.display_name,
                "community_id": self.community_id,
                "profile": self.profile,
            },
            "emergency": self.state_bus.current(),
            "topology": topology,
            "started": self._started,
            "event_log_head": self._event_log.head() if self._event_log else None,
        }
class InMemoryNetwork:
    """In-process network where every node shares one InMemoryTransport."""

    def __init__(self) -> None:
        self.transport = InMemoryTransport()
        self.nodes: list[HearthNode] = []

    def add_node(
        self, node_id: NodeID, display_name: str, community_id: CommunityID = "ed25519:community"
    ) -> HearthNode:
        """Create a node on the shared transport, track it, and return it."""
        created = HearthNode(node_id, display_name, community_id, transport=self.transport)
        self.nodes.append(created)
        return created

    def mesh_discover(self) -> None:
        """Make every tracked node discover every other tracked node (full mesh)."""
        ordered_pairs = [
            (viewer, target)
            for viewer in self.nodes
            for target in self.nodes
            if viewer is not target
        ]
        for viewer, target in ordered_pairs:
            viewer.discover(target)
# ---------------------------------------------------------------------------
# PeriodicTask — generic async interval runner (M12 §5)
# ---------------------------------------------------------------------------
class PeriodicTask:
    """Await an async callable repeatedly on a fixed cadence.

    Each cycle sleeps *interval_seconds* first and then awaits *fn*, so the
    first call happens one interval after :meth:`run` starts. Exceptions
    raised by *fn* are logged at debug level and the loop keeps going;
    cancellation is always propagated.

    Usage::

        task = PeriodicTask(my_async_fn, interval_seconds=60)
        asyncio.create_task(task.run())
    """

    def __init__(self, fn, interval_seconds: int) -> None:
        self._fn = fn
        self._interval = interval_seconds

    async def run(self) -> None:
        """Loop forever: wait one interval, then invoke the callable once."""
        while True:
            await asyncio.sleep(self._interval)
            await self._tick()

    async def _tick(self) -> None:
        """Await the wrapped callable once, logging instead of raising its errors."""
        try:
            await self._fn()
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            _log.debug("PeriodicTask %s error: %s", self._fn, exc)
# ---------------------------------------------------------------------------
# ManifestPublisher — republishes node manifest to mDNS + UDP (M12 §5)
# ---------------------------------------------------------------------------
_MANIFEST_REPUBLISH_INTERVAL_SECONDS = 300  # 5 minutes default


class ManifestPublisher:
    """Periodically re-publish the node manifest to the mDNS + UDP announcers.

    :meth:`run` publishes immediately and then every *interval_seconds*.
    Announcers are optional and duck-typed: each one is used only if it
    exposes an async ``republish`` method. The mDNS announcer receives the
    capability names taken from the manifest; the UDP announcer is called
    with no arguments.

    NOTE(review): *bus* and *peer_registry* are stored but not used by this
    class, and nothing here subscribes to registry changes; an out-of-cycle
    refresh must be triggered by the caller.
    """

    def __init__(
        self,
        bus,
        peer_registry,
        mdns_announcer=None,
        udp_announcer=None,
        node_manifest_fn=None,
        interval_seconds: int = _MANIFEST_REPUBLISH_INTERVAL_SECONDS,
    ) -> None:
        """Store collaborators.

        Args:
            bus: Capability bus (stored, unused here).
            peer_registry: Peer registry (stored, unused here).
            mdns_announcer: Optional announcer with async ``republish(caps)``.
            udp_announcer: Optional announcer with async ``republish()``.
            node_manifest_fn: Zero-arg callable returning the manifest dict;
                when ``None`` an empty manifest is used.
            interval_seconds: Delay between publishes in :meth:`run`.
        """
        self._bus = bus
        self._peer_registry = peer_registry
        self._mdns_announcer = mdns_announcer
        self._udp_announcer = udp_announcer
        self._node_manifest_fn = node_manifest_fn
        self._interval = interval_seconds
        self._task: asyncio.Task | None = None  # kept for compatibility; unused here

    async def run(self) -> None:
        """Publish immediately then republish every *interval_seconds*."""
        while True:
            await self._publish()
            await asyncio.sleep(self._interval)

    async def _publish(self) -> None:
        """Push the current manifest to each configured announcer (best-effort).

        Errors are logged at debug level and never raised. If the manifest
        cannot be built, nothing is published. Each announcer is attempted
        independently, so a failing mDNS republish no longer skips the UDP one.
        """
        try:
            manifest = self._node_manifest_fn() if self._node_manifest_fn else {}
            # Drop entries without a name instead of announcing `None`.
            caps = [
                c.get("name") for c in manifest.get("capabilities", []) if c.get("name")
            ]
        except Exception as exc:
            _log.debug("ManifestPublisher._publish error: %s", exc)
            return
        if self._mdns_announcer and hasattr(self._mdns_announcer, "republish"):
            try:
                await self._mdns_announcer.republish(caps)
            except Exception as exc:
                _log.debug("ManifestPublisher._publish error: %s", exc)
        if self._udp_announcer and hasattr(self._udp_announcer, "republish"):
            try:
                await self._udp_announcer.republish()
            except Exception as exc:
                _log.debug("ManifestPublisher._publish error: %s", exc)
|