Spaces:
Running on Zero
Running on Zero
GitHub Actions
fix: 0 test failures; FileService; real RagService; emergency probe; chat return
4aaae80 | from __future__ import annotations | |
| import asyncio | |
| import time | |
| from collections.abc import AsyncIterator | |
| from dataclasses import dataclass | |
| from typing import Literal | |
| Mode = Literal["online", "degraded", "offline"] | |
| class EmergencyState: | |
| mode: Mode | |
| changed_at: float # monotonic timestamp | |
| probe_results: dict[str, bool] # target -> success | |
| consecutive_fails: int = 0 | |
| def mode_label(self) -> str: | |
| return { | |
| "online": "ONLINE", | |
| "degraded": "DEGRADED β LIMITED", | |
| "offline": "INTERNET OFFLINE β LOKAL AKTIV", | |
| }[self.mode] | |
| class StateBus: | |
| """In-process pub/sub for emergency state changes.""" | |
| def __init__(self) -> None: | |
| self._state = EmergencyState(mode="online", changed_at=time.monotonic(), probe_results={}) | |
| self._subscribers: list[asyncio.Queue] = [] | |
| self._transition_times: list[float] = [] # for anti-flap | |
| def current(self) -> EmergencyState: | |
| return self._state | |
| async def subscribe(self) -> AsyncIterator[EmergencyState]: | |
| q: asyncio.Queue = asyncio.Queue(maxsize=10) | |
| self._subscribers.append(q) | |
| try: | |
| while True: | |
| state = await q.get() | |
| yield state | |
| finally: | |
| self._subscribers.remove(q) | |
| def emit_probe(self, probe_results: dict[str, bool]) -> EmergencyState: | |
| """Compute new mode from probe results, apply anti-flap, emit if changed.""" | |
| successes = sum(1 for v in probe_results.values() if v) | |
| total = len(probe_results) | |
| fails = total - successes | |
| if total == 0: | |
| new_mode: Mode = "online" | |
| elif fails >= max(2, total // 2): | |
| new_mode = "offline" | |
| elif fails > 0: | |
| new_mode = "degraded" | |
| else: | |
| new_mode = "online" | |
| old_mode = self._state.mode | |
| # Anti-flap: if too many transitions in last 60s, stay pessimistic | |
| from hearthnet.constants import ( | |
| EMERGENCY_ANTI_FLAP_MAX_TRANSITIONS, | |
| EMERGENCY_ANTI_FLAP_WINDOW_SECONDS, | |
| ) | |
| now = time.monotonic() | |
| self._transition_times = [ | |
| t for t in self._transition_times if now - t < EMERGENCY_ANTI_FLAP_WINDOW_SECONDS | |
| ] | |
| if len(self._transition_times) >= EMERGENCY_ANTI_FLAP_MAX_TRANSITIONS: | |
| # Too many flaps β hold pessimistic | |
| if old_mode in ("degraded", "offline") and new_mode == "online": | |
| new_mode = old_mode # don't restore yet | |
| new_state = EmergencyState( | |
| mode=new_mode, | |
| changed_at=now if new_mode != old_mode else self._state.changed_at, | |
| probe_results=probe_results, | |
| consecutive_fails=self._state.consecutive_fails + (1 if fails > 0 else 0), | |
| ) | |
| if new_mode != old_mode: | |
| self._transition_times.append(now) | |
| self._state = new_state | |
| self._emit(new_state) | |
| else: | |
| self._state = new_state | |
| return new_state | |
| def _emit(self, state: EmergencyState) -> None: | |
| for q in list(self._subscribers): | |
| try: | |
| q.put_nowait(state) | |
| except asyncio.QueueFull: | |
| pass | |