Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import time | |
| from collections import defaultdict, deque | |
| from hearthnet.bus.capability import CapabilityEntry | |
| HEALTH_WINDOW_CALLS = 5 | |
| HEALTH_QUARANTINE_SECONDS = 30 | |
| HEALTH_QUARANTINE_THRESHOLD = 0.60 | |
| class HealthTracker: | |
| def __init__(self, window: int = HEALTH_WINDOW_CALLS) -> None: | |
| self.window = window | |
| self._samples: dict[tuple[str, str, tuple[int, int]], deque[tuple[bool, float]]] = ( | |
| defaultdict(lambda: deque(maxlen=window)) | |
| ) | |
| def record(self, entry: CapabilityEntry, *, success: bool, latency_ms: float) -> None: | |
| key = (entry.node_id, entry.descriptor.name, entry.descriptor.version) | |
| samples = self._samples[key] | |
| samples.append((success, latency_ms)) | |
| latencies = sorted(sample[1] for sample in samples) | |
| successes = sum(1 for sample in samples if sample[0]) | |
| entry.success_rate = successes / len(samples) | |
| entry.p50_latency_ms = latencies[len(latencies) // 2] | |
| entry.p99_latency_ms = latencies[-1] | |
| if len(samples) >= self.window and entry.success_rate < HEALTH_QUARANTINE_THRESHOLD: | |
| entry.quarantined_until = time.monotonic() + HEALTH_QUARANTINE_SECONDS | |
| def reset(self, entry: CapabilityEntry) -> None: | |
| key = (entry.node_id, entry.descriptor.name, entry.descriptor.version) | |
| self._samples.pop(key, None) | |
| entry.success_rate = 1.0 | |
| entry.quarantined_until = 0.0 | |