"""M12/Node - HearthNode composition root. Spec: docs/M12-cli.md §5 (node.start 15-step sequence) Impl-ref: impl_ref.md §17 (node.py, ManifestPublisher) Wires all services together. The 15-step startup lives in node.start(). """ from __future__ import annotations import asyncio import contextlib import logging import time from dataclasses import dataclass from pathlib import Path from typing import Any from hearthnet.bus import CapabilityBus, InMemoryTransport from hearthnet.discovery import PeerRecord, PeerRegistry from hearthnet.emergency.detector import Detector from hearthnet.emergency.state import StateBus from hearthnet.facades import ChatFacade, MarketplaceFacade, RagFacade from hearthnet.services import ChatService, LlmService, MarketplaceService, RagService from hearthnet.services.files import FileService from hearthnet.services.moe import MoeService from hearthnet.services.tools import PlantIdentificationService from hearthnet.types import CommunityID, Endpoint, NodeID, Profile _log = logging.getLogger(__name__) # Gossip-sync period in seconds _GOSSIP_INTERVAL_SECONDS = 30 class _HttpxSyncClient: """Minimal httpx adapter for :class:`SyncClient`. SyncClient treats a dict response as already-parsed JSON, so we return the decoded body directly from ``get``/``post`` (avoiding SyncClient's aiohttp-style ``await resp.json()`` path). Degrades to a no-op when httpx is not installed. """ def __init__(self) -> None: self._client: Any = None self.unavailable = False try: import httpx self._client = httpx.AsyncClient(timeout=30.0) except ImportError: self.unavailable = True async def get(self, url: str) -> dict[str, Any]: resp = await self._client.get(url) resp.raise_for_status() return resp.json() async def post( self, url: str, *, data: Any = None, headers: dict[str, str] | None = None ) -> dict[str, Any]: resp = await self._client.post(url, content=data, headers=headers) resp.raise_for_status() return resp.json() async def aclose(self) -> None: if self._client is not None: with contextlib.suppress(Exception): await self._client.aclose() @dataclass class NodeManifest: node_id: NodeID display_name: str community_id: CommunityID profile: Profile capabilities: list[dict[str, Any]] def as_dict(self) -> dict[str, Any]: return { "version": 1, "contract_version": "1.0", "node_id": self.node_id, "display_name": self.display_name, "community_id": self.community_id, "profile": self.profile, "capabilities": self.capabilities, } class HearthNode: def __init__( self, node_id: NodeID, display_name: str, community_id: CommunityID, *, transport: InMemoryTransport | None = None, profile: Profile = "hearth", ) -> None: self.node_id = node_id self.display_name = display_name self.community_id = community_id self.profile: Profile = profile # Default to the HTTP-capable transport so a standalone node can reach # remote peers over the network (e.g. the public HF Space). The # in-process InMemoryNetwork still passes a shared InMemoryTransport. # CompositeTransport is a drop-in superset of HttpBusTransport that also # accepts pluggable delivery strategies (relay/WebRTC/tunnel) — relay is # attached only on demand via join_relay(), keeping nodes local-first. if transport is None: from hearthnet.bus.transport import CompositeTransport transport = CompositeTransport() self.bus = CapabilityBus(node_id, community_id, transport) self.peers = PeerRegistry(node_id, community_id) self.state_bus = StateBus() self.detector = Detector(self.bus, self.state_bus, self.peers) self.rag = RagFacade(self.bus) self.chat = ChatFacade(self.bus) self.marketplace = MarketplaceFacade(self.bus) # Manual peer bridging (discovery.peer.add / discovery.peers) — enables # cross-network peering where mDNS/UDP multicast cannot reach. from hearthnet.discovery.service import DiscoveryService self.bus.register_service(DiscoveryService(self.bus, self.peers)) # mesh.join — redeem an invite/relay code into all-to-all relay membership. from hearthnet.transport.mesh_service import MeshService self.bus.register_service(MeshService(self)) # Populated by start() self._http_server: Any = None self._event_log: Any = None self._replay_engine: Any = None self._mdns_announcer: Any = None self._mdns_browser: Any = None self._udp_announcer: Any = None self._udp_listener: Any = None self._gossip_task: asyncio.Task | None = None self._emergency_task: asyncio.Task | None = None self._pubsub_task: asyncio.Task | None = None self._replicator_task: asyncio.Task | None = None self._replicator: Any = None self._rag_service: Any = None self._started: bool = False self._relay_client: Any = None # ------------------------------------------------------------------ # Relay mesh (opt-in, NAT-safe all-to-all over a public hub) # ------------------------------------------------------------------ async def join_relay( self, relay_url: str, *, token: str | None = None ) -> dict[str, Any]: """Join a relay hub so this node meshes all-to-all with NAT-bound peers. Opt-in only — a node stays purely local until this is called (e.g. from a redeemed invite or the ``mesh up`` launcher). Registers the relay roster's capabilities locally and attaches a :class:`RelayStrategy` to the bus transport so calls to those peers are delivered through the hub. Returns the hub's join response (current roster + ttl). Raises if the bus transport is not relay-capable (i.e. not a CompositeTransport). """ from hearthnet.bus.transport import CompositeTransport from hearthnet.transport.relay_client import RelayClient, RelayStrategy if not isinstance(self.bus.transport, CompositeTransport): raise RuntimeError("relay requires a CompositeTransport bus transport") if self._relay_client is not None: await self._relay_client.close() self.bus.transport.remove_strategy("relay") client = RelayClient( relay_url, node_id=self.node_id, display_name=self.display_name, community_id=self.community_id, bus=self.bus, peers=self.peers, token=token, ) result = await client.join() self.bus.transport.add_strategy(RelayStrategy(client)) self._relay_client = client return result async def leave_relay(self) -> None: if self._relay_client is not None: from hearthnet.bus.transport import CompositeTransport await self._relay_client.close() if isinstance(self.bus.transport, CompositeTransport): self.bus.transport.remove_strategy("relay") self._relay_client = None # ------------------------------------------------------------------ # Service installation # ------------------------------------------------------------------ def install_demo_services(self, *, internet_llm: bool = False, corpus: str = "demo") -> None: """FOR TESTS ONLY — install echo-LLM + in-memory services (no disk I/O, fast). Production code should call install_services() which auto-discovers real backends. """ # Use demo- prefixed model name so LlmService creates _EchoBackend (test path) from hearthnet.services.demo import ( LlmService as DemoLlm, ) from hearthnet.services.demo import ( MarketplaceService as DemoMarket, ) from hearthnet.services.demo import ( RagService as DemoRag, ) model_name = "demo-remote" if internet_llm else "demo-local" services = [ DemoLlm(model=model_name, requires_internet=internet_llm), DemoRag( corpus=corpus, documents=[ { "id": "seed", "title": "Water", "text": "Store clean water and boil rainwater.", } ], ), DemoMarket(), ChatService(self.node_id, bus=self.bus), FileService(), MoeService(bus=self.bus), PlantIdentificationService(bus=self.bus), ] # ModelDistributionService also needed in tests; use a temp BlobStore import tempfile from hearthnet.blobs.store import BlobStore from hearthnet.services.llm.model_distribution import ModelDistributionService from hearthnet.services.protocol import ProtocolService from hearthnet.services.rag.federated import FederatedRagService tmp_store = BlobStore(Path(tempfile.mkdtemp()) / "blobs") services.append(ModelDistributionService(store=tmp_store, models_dir=None, bus=self.bus)) services.append(ProtocolService(node=self)) services.append(FederatedRagService(self.bus, corpus=corpus)) for service in services: self.bus.register_service(service) def install_services( self, *, corpus: str = "community", models_dir=None, blob_store=None, ) -> None: """Install real services with auto-discovered LLM backends. Backend discovery order (local-first, no internet unless explicitly enabled): 1. OllamaBackend — if ollama is running on localhost 2. LlamaCppBackend — if llama.cpp HTTP server is running on localhost 3. HfLocalBackend — if transformers is installed (loads on first call) 4. _UnavailableBackend — fallback: returns a clear error, not a silent echo Also installs ModelDistributionService so peers can pull model weights. """ import os from hearthnet.services.llm.backends.hf_local import HfLocalBackend from hearthnet.services.llm.backends.modal_backend import ModalBackend from hearthnet.services.llm.backends.nemotron import NemotronBackend from hearthnet.services.llm.backends.ollama import OllamaBackend from hearthnet.services.llm.backends.openai_compat import OpenAICompatBackend from hearthnet.services.llm.backends.openbmb import OpenBmbBackend from hearthnet.services.llm.model_distribution import ModelDistributionService from hearthnet.services.protocol import ProtocolService backends = [] # 1. Ollama (best quality, zero-config local) ollama = OllamaBackend() if ollama.is_available(): backends.append(ollama) # 2. llama.cpp HTTP server on default port llama_http = OpenAICompatBackend( base_url="http://localhost:8080/v1", api_key_env="", model="local", ) if llama_http.is_available(): backends.append(llama_http) # 3. MiniCPM local server (OpenBMB prize track) if os.getenv("MINICPM_URL"): minicpm = OpenBmbBackend(base_url=os.getenv("MINICPM_URL", "http://localhost:8000")) if minicpm.is_available(): backends.append(minicpm) _log.info("MiniCPM backend registered from MINICPM_URL") # 4. NVIDIA Nemotron (cloud NIM or local; NVIDIA prize track) if os.getenv("NVIDIA_API_KEY"): nemotron = NemotronBackend(api_key_env="NVIDIA_API_KEY") backends.append(nemotron) # cloud — no local check needed _log.info("Nemotron backend registered (NVIDIA_API_KEY set)") elif os.getenv("NEMOTRON_URL"): nemotron_local = NemotronBackend( base_url=os.getenv("NEMOTRON_URL", "http://localhost:8001"), local=True, ) if nemotron_local.is_available(): backends.append(nemotron_local) _log.info("Nemotron local backend registered from NEMOTRON_URL") # 5. Modal serverless GPU (Modal prize track) if os.getenv("MODAL_ENDPOINT"): modal_b = ModalBackend() if modal_b.is_available(): backends.append(modal_b) _log.info("Modal backend registered from MODAL_ENDPOINT") # 6. HF Transformers local (always available if transformers installed) hf = HfLocalBackend() if hf.is_available(): backends.append(hf) from hearthnet.services.rag.federated import FederatedRagService services = [ LlmService(backends=backends or None), # _UnavailableBackend if none found # RagService receives blob_store now; event_log is injected in start() # after the EventLog is open (it's a lazy reference via _rag_service). RagService(corpus=corpus, blob_store=blob_store), FederatedRagService(self.bus, corpus=corpus), MarketplaceService(), ChatService(self.node_id, bus=self.bus), FileService(), MoeService(bus=self.bus), PlantIdentificationService(bus=self.bus), ProtocolService(node=self), ] # Keep a reference so start() can inject the event_log later. self._rag_service = services[1] # Model weight distribution (BitTorrent-style M07/M26) # Use provided blob_store or auto-create a persistent one in ~/.hearthnet/blobs if blob_store is None: from hearthnet.blobs.store import BlobStore blob_store = BlobStore(Path.home() / ".hearthnet" / "blobs") model_svc = ModelDistributionService( store=blob_store, models_dir=models_dir, bus=self.bus, ) services.append(model_svc) for service in services: self.bus.register_service(service) # Register the real auxiliary services (embed/rerank/ocr/translation/ # speech/image). Phase-3 research services stay off unless opted in. self.install_extended_services(research=False) def install_extended_services( self, *, research: bool = False, embed_model: str = "BAAI/bge-small-en-v1.5", ) -> None: """Register the real auxiliary services beyond the core set. Always (each degrades gracefully to an "unavailable" response when its optional backend/model is missing — never a mock): M11 EmbeddingService embed.text (real semantic vectors when sentence-transformers present) M24 RerankService rerank.text M17 OcrService ocr.image / ocr.pdf M18 TranslationService trans.text M19 Stt/TtsService stt.transcribe / tts.speak M20 Image services image.describe / image.generate When ``research=True`` (opt-in; the demo Space enables it), also registers the real Phase-3 services: M30 EvidenceService evidence.claim.* M31 CivilDefenseService civdef.* Every registration is wrapped so a missing optional dependency can never break node startup. """ def _register(svc: Any) -> None: if hasattr(svc, "capabilities"): self.bus.register_service(svc) elif hasattr(svc, "register"): svc.register(self.bus) # ── M11 Embedding (core for real RAG) ────────────────────────────── try: import importlib.util from hearthnet.services.embedding.service import EmbeddingService backend = None if importlib.util.find_spec("sentence_transformers") is not None: from hearthnet.services.embedding.backends import ( SentenceTransformerBackend, ) backend = SentenceTransformerBackend(model=embed_model) _register(EmbeddingService(backend=backend)) except Exception as exc: _log.warning("EmbeddingService registration skipped: %s", exc) # ── Remaining always-on auxiliary services ───────────────────────── _aux: list[tuple[str, Any]] = [] try: from hearthnet.services.rerank.service import RerankService _aux.append(("rerank", RerankService())) except Exception as exc: _log.debug("RerankService unavailable: %s", exc) try: from hearthnet.services.ocr.service import OcrService _aux.append(("ocr", OcrService())) except Exception as exc: _log.debug("OcrService unavailable: %s", exc) try: from hearthnet.services.translation.service import TranslationService _aux.append(("translation", TranslationService())) except Exception as exc: _log.debug("TranslationService unavailable: %s", exc) try: from hearthnet.services.speech.stt_service import SttService from hearthnet.services.speech.tts_service import TtsService _aux.append(("stt", SttService())) _aux.append(("tts", TtsService())) except Exception as exc: _log.debug("Speech services unavailable: %s", exc) try: from hearthnet.services.image.describe_service import ImageDescribeService _aux.append(("image.describe", ImageDescribeService())) except Exception as exc: _log.debug("ImageDescribeService unavailable: %s", exc) try: from hearthnet.services.image.generate_service import ImageGenerateService _aux.append(("image.generate", ImageGenerateService())) except Exception as exc: _log.debug("ImageGenerateService unavailable: %s", exc) for label, svc in _aux: try: _register(svc) except Exception as exc: _log.warning("%s registration skipped: %s", label, exc) if not research: return # ── Phase-3 research services (opt-in only) ──────────────────────── try: from hearthnet.evidence.service import EvidenceService _register(EvidenceService(community_id=self.community_id)) except Exception as exc: _log.warning("EvidenceService registration skipped: %s", exc) try: from hearthnet.civdef.service import CivilDefenseService _register(CivilDefenseService()) except Exception as exc: _log.warning("CivilDefenseService registration skipped: %s", exc) async def start( self, *, host: str = "0.0.0.0", # nosec B104 port: int = 7080, data_dir: Path | str | None = None, gossip_interval: int = _GOSSIP_INTERVAL_SECONDS, ) -> None: """Start the node — wires all subsystems. Steps: 1-2. Already done: node_id + bus created in __init__ 3. Start mDNS + UDP peer discovery 4. Load config (external; this method receives data_dir) 5. Services already registered by caller via install_services() 6. (LLM warmup deferred to first call) 7. (RAG warmup deferred to first call) 8. Start Detector (emergency probe loop) 9. Start EventLog + ReplayEngine 10. Start FastAPI HttpServer (X01) 11. Publish manifest (mDNS) 12. (Community join via invite; deferred to CLI/UI) 13. Start observability 14. (Federation; deferred) 15. Signal ready """ if self._started: return _log.info("HearthNode.start() node_id=%s port=%d", self.node_id, port) data_dir_path = ( Path(data_dir) if data_dir else Path.home() / ".hearthnet" / "nodes" / self.node_id[:16] ) data_dir_path.mkdir(parents=True, exist_ok=True) # ── Step 9: Event log + replay engine ───────────────────────── try: from hearthnet.events import EventLog, ReplayEngine self._event_log = EventLog(data_dir_path / "events.db", self.community_id, self.node_id) self._replay_engine = ReplayEngine(self._event_log) _log.debug("EventLog opened at %s", data_dir_path / "events.db") except Exception as exc: _log.warning("EventLog init failed (non-fatal): %s", exc) # ── Step 3: Peer discovery (mDNS + UDP) ─────────────────────── caps = [e.descriptor.name for e in self.bus.registry.all_local()] try: from hearthnet.discovery.mdns import MdnsAnnouncer, MdnsBrowser from hearthnet.discovery.udp import UdpAnnouncer, UdpListener self._mdns_announcer = MdnsAnnouncer( self.peers, self.node_id, self.display_name, port=port, properties={"profile": self.profile, "caps": caps}, ) self._mdns_browser = MdnsBrowser(self.peers, self.community_id) self._udp_announcer = UdpAnnouncer( self.peers, self.node_id, self.community_id, port=port, caps=caps ) self._udp_listener = UdpListener(self.peers, self.community_id) await self._mdns_announcer.start() await self._mdns_browser.start() await self._udp_announcer.start() await self._udp_listener.start() _log.debug("mDNS + UDP discovery started on port %d", port) except Exception as exc: _log.warning("Discovery init failed (non-fatal): %s", exc) # ── Step 8: Emergency detector ──────────────────────────────── try: await self.detector.start() except Exception as exc: _log.warning("Emergency detector start failed (non-fatal): %s", exc) # ── Step 10: HTTP server (X01) + WebSocket pubsub (X06) ─────── try: from hearthnet.events.sync import SyncServer from hearthnet.transport.server import HttpServer sync_server = SyncServer(self._event_log) if self._event_log else None self._http_server = HttpServer( bus=self.bus, node_manifest_fn=lambda: self.manifest().as_dict(), sync_server=sync_server, host=host, port=port, ) self._http_server.build_app() await self._http_server.start() _log.info("HTTP server listening on %s:%d", host, port) # Wire StateBus -> WebSocket pubsub (X06) if self._http_server._ws_pubsub is not None: self._pubsub_task = asyncio.create_task( self._state_bus_to_pubsub(), name="state-pubsub" ) except Exception as exc: _log.warning("HTTP server start failed (non-fatal): %s", exc) # ── Gossip sync loop (X02) ──────────────────────────────────── if self._event_log is not None: self._gossip_task = asyncio.create_task( self._gossip_loop(gossip_interval), name="gossip-sync" ) # -- Corpus replicator (Phase 2: BitTorrent-style RAG sync) ---------- if self._event_log is not None: try: from hearthnet.blobs.store import BlobStore from hearthnet.blobs.transfer import TransferManager from hearthnet.services.rag.replication import CorpusReplicator from hearthnet.services.rag.store import CorpusStore # Inject event_log into the RagService now that EventLog is open. if self._rag_service is not None: self._rag_service._event_log = self._event_log repl_blob_store = BlobStore(data_dir_path / "repl_blobs") transfer = TransferManager(repl_blob_store, http_client=None) def _corpus_store_fn(corpus: str) -> CorpusStore: return CorpusStore(data_dir_path / "corpora", corpus) self._replicator = CorpusReplicator( bus=self.bus, event_log=self._event_log, transfer=transfer, peers=self.peers, local_node_id=self.node_id, corpus_store_fn=_corpus_store_fn, ) self._replicator_task = self._replicator.start() _log.info("CorpusReplicator started") except Exception as exc: _log.warning("CorpusReplicator init failed (non-fatal): %s", exc) _log.info("HearthNode ready: %s", self.node_id) async def stop(self) -> None: """Gracefully stop all background tasks and subsystems.""" if not self._started: return _log.info("HearthNode.stop() node_id=%s", self.node_id) self._started = False # Leave relay mesh (stops poll loop, closes client) with contextlib.suppress(Exception): await self.leave_relay() # Close event log if self._event_log is not None: with contextlib.suppress(Exception): self._event_log.close() self._event_log = None # Stop emergency detector with contextlib.suppress(Exception): await self.detector.stop() # Cancel background tasks for task_attr in ("_gossip_task", "_pubsub_task", "_replicator_task"): task = getattr(self, task_attr, None) if task and not task.done(): task.cancel() with contextlib.suppress(asyncio.CancelledError, Exception): await task setattr(self, task_attr, None) # Stop discovery for attr in ("_udp_announcer", "_udp_listener", "_mdns_announcer"): obj = getattr(self, attr, None) if obj: with contextlib.suppress(Exception): await obj.stop() if self._mdns_browser: try: if hasattr(self._mdns_browser, "stop"): await self._mdns_browser.stop() except Exception: pass # Stop HTTP server if self._http_server: with contextlib.suppress(Exception): await self._http_server.shutdown() self._http_server = None # ------------------------------------------------------------------ # Background tasks # ------------------------------------------------------------------ async def _gossip_loop(self, interval: int) -> None: """Periodically sync event log with all known peers (X02 gossip).""" from hearthnet.events.sync import SyncClient http_client = _HttpxSyncClient() if http_client.unavailable: _log.info("Gossip sync disabled: httpx not installed") return sync_client = SyncClient(self._event_log, http_client) try: while True: await asyncio.sleep(interval) for peer in self.peers.all(): if not peer.endpoints: continue ep = peer.endpoints[0] if ep.transport == "memory": continue # in-process; no HTTP needed peer_url = f"http://{ep.host}:{ep.port}" try: result = await sync_client.sync_with(peer_url, self.community_id) if result.received_count or result.sent_count: _log.debug( "Gossip with %s: sent=%d recv=%d ms=%d", peer.display_name, result.sent_count, result.received_count, result.duration_ms, ) except Exception as exc: _log.debug("Gossip sync with %s failed: %s", peer.display_name, exc) finally: await http_client.aclose() async def _state_bus_to_pubsub(self) -> None: """Forward StateBus state changes to the WebSocket pubsub (X06).""" try: async for state in self.state_bus.subscribe(): if self._http_server is None: break await self._http_server.publish_event( topic="emergency.mode.changed", event="emergency.mode.changed", data={ "mode": state.mode, "mode_label": state.mode_label, "changed_at": state.changed_at, "probe_results": state.probe_results, "consecutive_fails": state.consecutive_fails, }, ) except asyncio.CancelledError: pass except Exception as exc: _log.warning("state_bus_to_pubsub error: %s", exc) # ------------------------------------------------------------------ # Introspection # ------------------------------------------------------------------ def manifest(self) -> NodeManifest: capabilities = [ { "name": entry.descriptor.name, "version": entry.descriptor.version_str, "stability": entry.descriptor.stability, "schema_hash": entry.descriptor.schema_hash(), "params": dict(entry.descriptor.params), "max_concurrent": entry.descriptor.max_concurrent, } for entry in self.bus.registry.all_local() ] return NodeManifest( self.node_id, self.display_name, self.community_id, self.profile, capabilities ) def discover(self, other: HearthNode) -> None: record = PeerRecord( node_id_full=other.node_id, display_name=other.display_name, community_id=other.community_id, profile=other.profile, endpoints=[Endpoint("memory", other.node_id, 0)], manifest=other.manifest().as_dict(), last_seen=time.monotonic(), ) if self.peers.upsert(record): self.bus.registry.update_from_peer_manifest(record, record.manifest or {}) def snapshot(self) -> dict[str, Any]: topology = self.bus.topology_snapshot([peer.as_view() for peer in self.peers.all()]) return { "node": { "node_id": self.node_id, "display_name": self.display_name, "community_id": self.community_id, "profile": self.profile, }, "emergency": self.state_bus.current(), "topology": topology, "started": self._started, "event_log_head": self._event_log.head() if self._event_log else None, } class InMemoryNetwork: def __init__(self) -> None: self.transport = InMemoryTransport() self.nodes: list[HearthNode] = [] def add_node( self, node_id: NodeID, display_name: str, community_id: CommunityID = "ed25519:community" ) -> HearthNode: node = HearthNode(node_id, display_name, community_id, transport=self.transport) self.nodes.append(node) return node def mesh_discover(self) -> None: for node in self.nodes: for other in self.nodes: if node is not other: node.discover(other) # --------------------------------------------------------------------------- # PeriodicTask — generic async interval runner (M12 §5) # --------------------------------------------------------------------------- class PeriodicTask: """Run *fn* every *interval_seconds* until cancelled. Usage:: task = PeriodicTask(my_async_fn, interval_seconds=60) asyncio.create_task(task.run()) """ def __init__(self, fn, interval_seconds: int) -> None: self._fn = fn self._interval = interval_seconds async def run(self) -> None: while True: await asyncio.sleep(self._interval) try: await self._fn() except asyncio.CancelledError: raise except Exception as exc: _log.debug("PeriodicTask %s error: %s", self._fn, exc) # --------------------------------------------------------------------------- # ManifestPublisher — republishes node manifest to mDNS + UDP (M12 §5) # --------------------------------------------------------------------------- _MANIFEST_REPUBLISH_INTERVAL_SECONDS = 300 # 5 minutes default class ManifestPublisher: """Periodically re-publishes the node manifest to mDNS + UDP announcer. Also triggered when the bus registry changes (capability added/removed). """ def __init__( self, bus, peer_registry, mdns_announcer=None, udp_announcer=None, node_manifest_fn=None, interval_seconds: int = _MANIFEST_REPUBLISH_INTERVAL_SECONDS, ) -> None: self._bus = bus self._peer_registry = peer_registry self._mdns_announcer = mdns_announcer self._udp_announcer = udp_announcer self._node_manifest_fn = node_manifest_fn self._interval = interval_seconds self._task: asyncio.Task | None = None async def run(self) -> None: """Publish immediately then republish every *interval_seconds*.""" while True: await self._publish() await asyncio.sleep(self._interval) async def _publish(self) -> None: try: manifest = self._node_manifest_fn() if self._node_manifest_fn else {} caps = [c.get("name") for c in manifest.get("capabilities", [])] if self._mdns_announcer and hasattr(self._mdns_announcer, "republish"): await self._mdns_announcer.republish(caps) if self._udp_announcer and hasattr(self._udp_announcer, "republish"): await self._udp_announcer.republish() except Exception as exc: _log.debug("ManifestPublisher._publish error: %s", exc)