Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| from typing import TYPE_CHECKING, Protocol | |
| from .types import Event, EventType | |
| if TYPE_CHECKING: | |
| from .log import EventLog | |
class MaterialisedView(Protocol):
    """Structural interface implemented by every view fed by the replay engine.

    A view is a stateful consumer of events: it can be wiped, fed events one
    at a time, and have its state exported and re-imported.
    """

    def reset(self) -> None:
        """Discard everything the view has accumulated.

        Invoked ahead of a full replay so the view starts from empty state.
        """
        ...

    def apply(self, event: Event) -> None:
        """Fold one *event* into the view's current state."""
        ...

    def snapshot_state(self) -> dict:
        """Export the current state as a JSON-serialisable ``dict``."""
        ...

    def restore_state(self, state: dict) -> None:
        """Load *state*, a value previously returned by ``snapshot_state()``."""
        ...
class ReplayEngine:
    """Routes events to registered materialised views.

    Views are stored by name together with an optional event-type filter.
    A view can be rebuilt from the log (reset, then replay), caught up
    without a reset, or handed live events one at a time.
    """

    def __init__(self, log: EventLog) -> None:
        """Create an engine that replays events out of *log*."""
        self.log = log
        # view_name -> (view, set of event_types it cares about or None for all)
        self._views: dict[str, tuple[MaterialisedView, frozenset[str] | None]] = {}

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------
    def register(
        self,
        name: str,
        view: MaterialisedView,
        event_types: list[EventType] | None = None,
    ) -> None:
        """Register *view* under *name*, replacing any view already using it.

        Pass ``event_types=None`` to receive every event type.  An empty
        list is treated the same way as ``None`` (no filter is stored).
        """
        ft: frozenset[str] | None = frozenset(event_types) if event_types else None
        self._views[name] = (view, ft)

    # Alias used in task spec
    def register_view(
        self,
        view: MaterialisedView,
        event_types: list[EventType],
    ) -> None:
        """Register *view* under its class name.

        Because the name is ``type(view).__name__``, registering a second
        view of the same class replaces the first one.
        """
        name = type(view).__name__
        self.register(name, view, event_types)

    # ------------------------------------------------------------------
    # Replay
    # ------------------------------------------------------------------
    def _replay_into(
        self,
        view: MaterialisedView,
        ft: frozenset[str] | None,
        since_lamport: int,
    ) -> None:
        """Feed *view* the log events selected by *since_lamport* and *ft*."""
        event_types = list(ft) if ft is not None else None
        for event in self.log.replay(since_lamport=since_lamport, event_types=event_types):  # type: ignore[arg-type]
            view.apply(event)

    def rebuild(self, view_name: str, from_lamport: int = 0) -> None:
        """Reset the named view and replay its relevant events from *from_lamport*.

        Raises:
            KeyError: if no view is registered under *view_name*.
        """
        view, ft = self._views[view_name]
        view.reset()
        self._replay_into(view, ft, from_lamport)

    def rebuild_all(self, from_lamport: int = 0) -> None:
        """Reset and replay all registered views."""
        # Snapshot the names so a registration made while replaying cannot
        # invalidate the iteration.
        for name in list(self._views):
            self.rebuild(name, from_lamport)

    # Alias used in task spec
    def replay_all(self) -> None:
        """Reset every view and replay the log from lamport 0."""
        self.rebuild_all(from_lamport=0)

    def replay_since(self, lamport: int) -> None:
        """Replay (without reset) events from *lamport* onwards into every view.

        *lamport* is forwarded as ``since_lamport`` to ``log.replay``, once
        per view with that view's own type filter.  Whether the bound is
        inclusive is decided by the log implementation.
        """
        # Iterate over a snapshot: a view's apply() may register further
        # views, which would otherwise raise "dictionary changed size
        # during iteration".
        for view, ft in list(self._views.values()):
            self._replay_into(view, ft, lamport)

    # ------------------------------------------------------------------
    # Live fanout
    # ------------------------------------------------------------------
    def _on_event(self, event: Event) -> None:
        """Route a newly-arrived event to all subscribed views.

        Views registered while this event is being dispatched do not
        receive it; they start with the next event.
        """
        # Snapshot for the same reason as in replay_since().
        for view, ft in list(self._views.values()):
            if ft is None or event.event_type in ft:
                view.apply(event)

    # Alias used in spec
    on_event = _on_event