GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80
Raw
History Blame
3.29 kB
from __future__ import annotations
import asyncio
import time
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Literal
# Closed set of connectivity modes a state snapshot can be in.
Mode = Literal[
    "online",
    "degraded",
    "offline",
]
@dataclass(frozen=True)
class EmergencyState:
    """Immutable snapshot of the connectivity mode and the probe data behind it."""

    mode: Mode
    changed_at: float  # monotonic timestamp
    probe_results: dict[str, bool]  # target -> success
    consecutive_fails: int = 0  # failure counter supplied by whoever builds the state

    @property
    def mode_label(self) -> str:
        """Return the display label for ``mode``.

        Raises:
            KeyError: if ``mode`` is not one of the three known modes.
        """
        # The dash is written as an escape (U+2014 EM DASH) so the label
        # survives the file being re-read with the wrong encoding; the
        # literals previously carried the mis-decoded byte sequence instead.
        return {
            "online": "ONLINE",
            "degraded": "DEGRADED \u2014 LIMITED",
            "offline": "INTERNET OFFLINE \u2014 LOKAL AKTIV",
        }[self.mode]
class StateBus:
    """In-process pub/sub for emergency state changes.

    Keeps the most recent ``EmergencyState`` and pushes every mode transition
    to the bounded queue of each active subscriber.
    """

    def __init__(self) -> None:
        self._state = EmergencyState(mode="online", changed_at=time.monotonic(), probe_results={})
        self._subscribers: list[asyncio.Queue[EmergencyState]] = []
        self._transition_times: list[float] = []  # monotonic times of recent mode changes (anti-flap)

    def current(self) -> EmergencyState:
        """Return the most recently computed state."""
        return self._state

    async def subscribe(self) -> AsyncIterator[EmergencyState]:
        """Yield each state emitted on a mode transition, indefinitely.

        The subscriber queue is registered when iteration starts and removed
        again when the generator is closed or cancelled. Probe rounds that do
        not change the mode are not delivered.
        """
        q: asyncio.Queue[EmergencyState] = asyncio.Queue(maxsize=10)
        self._subscribers.append(q)
        try:
            while True:
                yield await q.get()
        finally:
            self._subscribers.remove(q)

    def emit_probe(self, probe_results: dict[str, bool]) -> EmergencyState:
        """Compute the new mode from probe results, apply anti-flap, emit if changed.

        Args:
            probe_results: mapping of probe target to whether it succeeded.

        Returns:
            The state that is now current. Subscribers are notified only when
            its mode differs from the previous state's mode.
        """
        total = len(probe_results)
        fails = sum(1 for ok in probe_results.values() if not ok)

        # NOTE(review): "offline" needs at least two failing probes, so a
        # single-target probe set can only ever reach "degraded" - confirm
        # that this is intended.
        new_mode: Mode
        if total == 0:
            new_mode = "online"
        elif fails >= max(2, total // 2):
            new_mode = "offline"
        elif fails > 0:
            new_mode = "degraded"
        else:
            new_mode = "online"

        old_mode = self._state.mode

        # Anti-flap: if too many transitions happened inside the window, stay pessimistic.
        # The import is kept function-local, as in the original.
        from hearthnet.constants import (
            EMERGENCY_ANTI_FLAP_MAX_TRANSITIONS,
            EMERGENCY_ANTI_FLAP_WINDOW_SECONDS,
        )

        now = time.monotonic()
        self._transition_times = [
            t for t in self._transition_times if now - t < EMERGENCY_ANTI_FLAP_WINDOW_SECONDS
        ]
        flapping = len(self._transition_times) >= EMERGENCY_ANTI_FLAP_MAX_TRANSITIONS
        if flapping and new_mode == "online" and old_mode in ("degraded", "offline"):
            # Too many flaps - hold the pessimistic mode instead of restoring yet.
            new_mode = old_mode

        changed = new_mode != old_mode
        new_state = EmergencyState(
            mode=new_mode,
            changed_at=now if changed else self._state.changed_at,
            # Snapshot the mapping so later mutation by the caller cannot
            # alter a state that is supposed to be immutable.
            probe_results=dict(probe_results),
            # A round without failures ends the streak; previously the counter
            # only ever grew, making it a lifetime total rather than a
            # consecutive count.
            consecutive_fails=self._state.consecutive_fails + 1 if fails > 0 else 0,
        )

        self._state = new_state
        if changed:
            self._transition_times.append(now)
            self._emit(new_state)
        return new_state

    def _emit(self, state: EmergencyState) -> None:
        """Deliver ``state`` to every subscriber queue without blocking.

        When a subscriber's queue is full, its oldest pending entry is
        discarded to make room, so a slow subscriber still ends up seeing the
        latest state rather than being left with a stale one.
        """
        for q in list(self._subscribers):
            try:
                q.put_nowait(state)
            except asyncio.QueueFull:
                try:
                    q.get_nowait()  # evict the oldest pending state
                    q.put_nowait(state)
                except (asyncio.QueueEmpty, asyncio.QueueFull):
                    # Still best-effort: never let one subscriber break emission.
                    pass