GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8
Raw
History Blame
3.87 kB
"""X01 — Backpressure / flow control.
Spec: docs/X01-transport.md §3.4
FlowControl gates outbound work when a downstream consumer is slow.
Used by HttpServer SSE streams and WebSocket pub-sub to avoid unbounded queues.
"""
from __future__ import annotations
import asyncio
class FlowControl:
"""Leaky-bucket / semaphore flow control for streaming responses.
Usage::
fc = FlowControl(capacity=32)
async with fc.acquire():
await stream_chunk(data)
If the number of in-flight chunks reaches *capacity*, ``acquire()``
blocks until a slot is freed. This creates natural back-pressure so
a slow HTTP client cannot cause the server to buffer unbounded data.
"""
def __init__(self, capacity: int = 64) -> None:
if capacity < 1:
raise ValueError("capacity must be >= 1")
self._sem = asyncio.Semaphore(capacity)
self._capacity = capacity
self._total_acquired: int = 0
self._total_released: int = 0
@property
def capacity(self) -> int:
return self._capacity
@property
def in_flight(self) -> int:
return self._total_acquired - self._total_released
def acquire(self) -> _AcquireContext:
return _AcquireContext(self)
async def _acquire(self) -> None:
await self._sem.acquire()
self._total_acquired += 1
def _release(self) -> None:
self._sem.release()
self._total_released += 1
def stats(self) -> dict:
return {
"capacity": self._capacity,
"in_flight": self.in_flight,
"total_acquired": self._total_acquired,
"total_released": self._total_released,
}
class _AcquireContext:
def __init__(self, fc: FlowControl) -> None:
self._fc = fc
async def __aenter__(self) -> _AcquireContext:
await self._fc._acquire()
return self
async def __aexit__(self, *_) -> None:
self._fc._release()
# ---------------------------------------------------------------------------
# RateCheck / RateLimiter (X01 §3.5)
# ---------------------------------------------------------------------------
class RateCheck:
"""Simple sliding-window rate check (read-only, no blocking).
Use to check whether a call is within limits before proceeding.
Returns True if allowed, False if over limit.
"""
def __init__(self, max_calls: int, window_seconds: float = 1.0) -> None:
self._max = max_calls
self._window = window_seconds
self._calls: list[float] = []
def check(self, now: float | None = None) -> bool:
import time
t = now if now is not None else time.monotonic()
cutoff = t - self._window
self._calls = [c for c in self._calls if c > cutoff]
if len(self._calls) < self._max:
self._calls.append(t)
return True
return False
def reset(self) -> None:
self._calls.clear()
class RateLimiter:
"""Async rate limiter — blocks until a slot is available.
Usage::
rl = RateLimiter(max_calls=10, window_seconds=1.0)
await rl.acquire()
# ... do work ...
"""
def __init__(self, max_calls: int, window_seconds: float = 1.0) -> None:
self._max = max_calls
self._window = window_seconds
self._calls: list[float] = []
self._lock = asyncio.Lock()
async def acquire(self) -> None:
import time
while True:
async with self._lock:
t = time.monotonic()
cutoff = t - self._window
self._calls = [c for c in self._calls if c > cutoff]
if len(self._calls) < self._max:
self._calls.append(t)
return
await asyncio.sleep(self._window / self._max)