Spaces:
Running on Zero
Running on Zero
GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8 | """X01 — Backpressure / flow control. | |
| Spec: docs/X01-transport.md §3.4 | |
| FlowControl gates outbound work when a downstream consumer is slow. | |
| Used by HttpServer SSE streams and WebSocket pub-sub to avoid unbounded queues. | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| class FlowControl: | |
| """Leaky-bucket / semaphore flow control for streaming responses. | |
| Usage:: | |
| fc = FlowControl(capacity=32) | |
| async with fc.acquire(): | |
| await stream_chunk(data) | |
| If the number of in-flight chunks reaches *capacity*, ``acquire()`` | |
| blocks until a slot is freed. This creates natural back-pressure so | |
| a slow HTTP client cannot cause the server to buffer unbounded data. | |
| """ | |
| def __init__(self, capacity: int = 64) -> None: | |
| if capacity < 1: | |
| raise ValueError("capacity must be >= 1") | |
| self._sem = asyncio.Semaphore(capacity) | |
| self._capacity = capacity | |
| self._total_acquired: int = 0 | |
| self._total_released: int = 0 | |
| def capacity(self) -> int: | |
| return self._capacity | |
| def in_flight(self) -> int: | |
| return self._total_acquired - self._total_released | |
| def acquire(self) -> _AcquireContext: | |
| return _AcquireContext(self) | |
| async def _acquire(self) -> None: | |
| await self._sem.acquire() | |
| self._total_acquired += 1 | |
| def _release(self) -> None: | |
| self._sem.release() | |
| self._total_released += 1 | |
| def stats(self) -> dict: | |
| return { | |
| "capacity": self._capacity, | |
| "in_flight": self.in_flight, | |
| "total_acquired": self._total_acquired, | |
| "total_released": self._total_released, | |
| } | |
| class _AcquireContext: | |
| def __init__(self, fc: FlowControl) -> None: | |
| self._fc = fc | |
| async def __aenter__(self) -> _AcquireContext: | |
| await self._fc._acquire() | |
| return self | |
| async def __aexit__(self, *_) -> None: | |
| self._fc._release() | |
| # --------------------------------------------------------------------------- | |
| # RateCheck / RateLimiter (X01 §3.5) | |
| # --------------------------------------------------------------------------- | |
| class RateCheck: | |
| """Simple sliding-window rate check (read-only, no blocking). | |
| Use to check whether a call is within limits before proceeding. | |
| Returns True if allowed, False if over limit. | |
| """ | |
| def __init__(self, max_calls: int, window_seconds: float = 1.0) -> None: | |
| self._max = max_calls | |
| self._window = window_seconds | |
| self._calls: list[float] = [] | |
| def check(self, now: float | None = None) -> bool: | |
| import time | |
| t = now if now is not None else time.monotonic() | |
| cutoff = t - self._window | |
| self._calls = [c for c in self._calls if c > cutoff] | |
| if len(self._calls) < self._max: | |
| self._calls.append(t) | |
| return True | |
| return False | |
| def reset(self) -> None: | |
| self._calls.clear() | |
| class RateLimiter: | |
| """Async rate limiter — blocks until a slot is available. | |
| Usage:: | |
| rl = RateLimiter(max_calls=10, window_seconds=1.0) | |
| await rl.acquire() | |
| # ... do work ... | |
| """ | |
| def __init__(self, max_calls: int, window_seconds: float = 1.0) -> None: | |
| self._max = max_calls | |
| self._window = window_seconds | |
| self._calls: list[float] = [] | |
| self._lock = asyncio.Lock() | |
| async def acquire(self) -> None: | |
| import time | |
| while True: | |
| async with self._lock: | |
| t = time.monotonic() | |
| cutoff = t - self._window | |
| self._calls = [c for c in self._calls if c > cutoff] | |
| if len(self._calls) < self._max: | |
| self._calls.append(t) | |
| return | |
| await asyncio.sleep(self._window / self._max) | |