Spaces:
Running on Zero
Running on Zero
GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8 | """Federated metrics aggregation (X07).""" | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import time | |
| from collections import defaultdict, deque | |
| from dataclasses import dataclass | |
| from typing import Any | |
| logger = logging.getLogger(__name__) | |
| # ββ Dataclasses βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class NodeMetricsTick: | |
| """Single per-node metrics sample.""" | |
| node_id: str | |
| community_id: str | |
| tick_at: float | |
| active_capabilities: int = 0 | |
| events_per_min: float = 0.0 | |
| peers_online: int = 0 | |
| llm_requests_total: int = 0 | |
| rag_requests_total: int = 0 | |
| gpu_memory_mb: float | None = None | |
| cpu_percent: float = 0.0 | |
| memory_mb: float = 0.0 | |
| online_seconds: int = 0 | |
| class CommunityMetrics: | |
| """Aggregated metrics across all members of a community (full detail).""" | |
| community_id: str | |
| member_count: int | |
| online_count: int | |
| events_per_min_total: float | |
| capabilities_total: int | |
| ticks: list[NodeMetricsTick] # per-node detail | |
| sampled_at: float | |
| class AggregatedSnapshot: | |
| """ | |
| Anonymised/banded aggregate for federated peers (less information | |
| at greater distance per X07 design rule). | |
| """ | |
| community_id: str | |
| member_count_band: str # e.g. "10-20" | |
| online_count_band: str | |
| events_per_min_band: str | |
| capabilities_count: int | |
| federation_links_count: int | |
| sampled_at: float | |
| # ββ Helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _band(value: float, steps: list[int]) -> str: | |
| """Return a band string like '10-20' for *value* given boundary *steps*.""" | |
| for i, upper in enumerate(steps): | |
| lower = steps[i - 1] if i > 0 else 0 | |
| if value < upper: | |
| return f"{lower}-{upper}" | |
| last = steps[-1] if steps else 0 | |
| return f"{last}+" | |
| _MEMBER_BANDS = [5, 10, 20, 50, 100, 250, 500, 1000] | |
| _ONLINE_BANDS = [2, 5, 10, 25, 50, 100, 250, 500] | |
| _EPM_BANDS = [10, 50, 100, 500, 1000, 5000, 10000] | |
| def _collect_system_metrics() -> dict[str, float]: | |
| """Snapshot CPU / memory using psutil if available; otherwise zeros.""" | |
| try: | |
| import psutil # type: ignore[import] | |
| cpu = psutil.cpu_percent(interval=None) | |
| mem = psutil.virtual_memory().used / (1024 * 1024) | |
| return {"cpu_percent": cpu, "memory_mb": mem} | |
| except ImportError: | |
| return {"cpu_percent": 0.0, "memory_mb": 0.0} | |
| except Exception: | |
| return {"cpu_percent": 0.0, "memory_mb": 0.0} | |
| def _collect_gpu_memory() -> float | None: | |
| """Return GPU memory usage in MB if pynvml is available.""" | |
| try: | |
| import pynvml # type: ignore[import] | |
| pynvml.nvmlInit() | |
| handle = pynvml.nvmlDeviceGetHandleByIndex(0) | |
| info = pynvml.nvmlDeviceGetMemoryInfo(handle) | |
| return info.used / (1024 * 1024) | |
| except Exception: | |
| return None | |
| # ββ FederatedMetricsExporter ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class FederatedMetricsExporter: | |
| """ | |
| Snapshots local metrics and publishes them to the community bus topic | |
| and optionally to an OTLP collector. | |
| """ | |
| def __init__( | |
| self, | |
| node_id: str = "", | |
| community_id: str = "", | |
| bus: Any = None, | |
| ) -> None: | |
| self._node_id = node_id | |
| self._community_id = community_id | |
| self._bus = bus | |
| self._start_time: float = time.time() | |
| def collect_tick(self, bus: Any = None) -> NodeMetricsTick: | |
| """Snapshot current metrics into a :class:`NodeMetricsTick`.""" | |
| _bus = bus or self._bus | |
| sys_metrics = _collect_system_metrics() | |
| # Collect capability count from bus | |
| active_caps = 0 | |
| if _bus is not None: | |
| try: | |
| caps = _bus.list_capabilities() | |
| active_caps = len(caps) if caps else 0 | |
| except Exception: | |
| pass | |
| # Collect request counters from prometheus registry if available | |
| llm_total = 0 | |
| rag_total = 0 | |
| try: | |
| from hearthnet.observability.metrics import _STD | |
| llm_counter = _STD.get("hearthnet_llm_requests_total") | |
| rag_counter = _STD.get("hearthnet_rag_requests_total") | |
| if llm_counter is not None and hasattr(llm_counter, "_value"): | |
| llm_total = int(llm_counter._value.get() or 0) | |
| if rag_counter is not None and hasattr(rag_counter, "_value"): | |
| rag_total = int(rag_counter._value.get() or 0) | |
| except Exception: | |
| pass | |
| online_secs = int(time.time() - self._start_time) | |
| return NodeMetricsTick( | |
| node_id=self._node_id, | |
| community_id=self._community_id, | |
| tick_at=time.time(), | |
| active_capabilities=active_caps, | |
| events_per_min=0.0, # filled by aggregator from event log | |
| peers_online=0, # filled by aggregator from peer registry | |
| llm_requests_total=llm_total, | |
| rag_requests_total=rag_total, | |
| gpu_memory_mb=_collect_gpu_memory(), | |
| cpu_percent=sys_metrics["cpu_percent"], | |
| memory_mb=sys_metrics["memory_mb"], | |
| online_seconds=online_secs, | |
| ) | |
| async def push_to_community(self, tick: NodeMetricsTick, bus: Any = None) -> None: | |
| """Publish *tick* to the bus topic ``observability.metrics.tick.<node_id>``.""" | |
| _bus = bus or self._bus | |
| if _bus is None: | |
| logger.debug("FederatedMetricsExporter.push_to_community: no bus configured") | |
| return | |
| topic = f"observability.metrics.tick.{tick.node_id}" | |
| payload: dict[str, Any] = { | |
| "node_id": tick.node_id, | |
| "community_id": tick.community_id, | |
| "tick_at": tick.tick_at, | |
| "active_capabilities": tick.active_capabilities, | |
| "events_per_min": tick.events_per_min, | |
| "peers_online": tick.peers_online, | |
| "llm_requests_total": tick.llm_requests_total, | |
| "rag_requests_total": tick.rag_requests_total, | |
| "gpu_memory_mb": tick.gpu_memory_mb, | |
| "cpu_percent": tick.cpu_percent, | |
| "memory_mb": tick.memory_mb, | |
| "online_seconds": tick.online_seconds, | |
| } | |
| try: | |
| result = _bus.call( | |
| "bus.publish", | |
| (1, 0), | |
| {"topic": topic, "event": "metrics_tick", "data": payload}, | |
| ) | |
| if asyncio.iscoroutine(result): | |
| await result | |
| except Exception as exc: | |
| logger.warning("FederatedMetricsExporter.push_to_community failed: %s", exc) | |
| async def push_otlp(self, endpoint: str, tick: NodeMetricsTick) -> None: | |
| """ | |
| Export *tick* via OTLP HTTP. Requires opentelemetry-exporter-otlp-proto-http. | |
| Delegates to :class:`OtlpExporter`. | |
| """ | |
| try: | |
| from hearthnet.observability.otlp_export import OtlpExporter | |
| exporter = OtlpExporter(endpoint) | |
| await exporter.export_metrics(tick) | |
| except ImportError: | |
| logger.debug("push_otlp: opentelemetry not installed β skipping") | |
| except Exception as exc: | |
| logger.warning("FederatedMetricsExporter.push_otlp failed: %s", exc) | |
| # ββ MetricsAggregator βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class MetricsAggregator: | |
| """ | |
| Receives NodeMetricsTick events from all community members and builds | |
| community-level and federated snapshots. | |
| """ | |
| def __init__( | |
| self, | |
| community_id: str, | |
| max_ticks_per_node: int = 60, | |
| ) -> None: | |
| self._community_id = community_id | |
| self._max_ticks = max_ticks_per_node | |
| # node_id β deque of ticks (newest last) | |
| self._ticks: dict[str, deque[NodeMetricsTick]] = defaultdict( | |
| lambda: deque(maxlen=self._max_ticks) | |
| ) | |
| self._federation_links: dict[str, int] = {} # peer_community_id β count | |
| def apply_tick(self, tick: NodeMetricsTick) -> None: | |
| """Incorporate a new tick from a community member.""" | |
| self._ticks[tick.node_id].append(tick) | |
| def community_snapshot(self) -> CommunityMetrics: | |
| """Return the latest community-wide aggregate.""" | |
| now = time.time() | |
| online_cutoff = now - 120 # consider online if tick within 2 min | |
| latest_ticks: list[NodeMetricsTick] = [d[-1] for d in self._ticks.values() if d] | |
| online = [t for t in latest_ticks if t.tick_at >= online_cutoff] | |
| total_epm = sum(t.events_per_min for t in online) | |
| total_caps = sum(t.active_capabilities for t in online) | |
| return CommunityMetrics( | |
| community_id=self._community_id, | |
| member_count=len(self._ticks), | |
| online_count=len(online), | |
| events_per_min_total=total_epm, | |
| capabilities_total=total_caps, | |
| ticks=list(latest_ticks), | |
| sampled_at=now, | |
| ) | |
| def federated_snapshot(self, peer_community_id: str) -> AggregatedSnapshot: | |
| """ | |
| Return a banded/anonymised snapshot suitable for sharing with a | |
| federated peer community. | |
| """ | |
| snap = self.community_snapshot() | |
| fed_links = len(self._federation_links) | |
| return AggregatedSnapshot( | |
| community_id=self._community_id, | |
| member_count_band=_band(snap.member_count, _MEMBER_BANDS), | |
| online_count_band=_band(snap.online_count, _ONLINE_BANDS), | |
| events_per_min_band=_band(snap.events_per_min_total, _EPM_BANDS), | |
| capabilities_count=snap.capabilities_total, | |
| federation_links_count=fed_links, | |
| sampled_at=snap.sampled_at, | |
| ) | |
| def record_federation_link(self, peer_community_id: str) -> None: | |
| """Track that we have an active federation link to *peer_community_id*.""" | |
| self._federation_links[peer_community_id] = ( | |
| self._federation_links.get(peer_community_id, 0) + 1 | |
| ) | |