# Page-extraction residue: "Spaces: Running on Zero"; file size: 3,622 bytes; ref 31c93b1.
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol
from .types import Event, EventType
if TYPE_CHECKING:
from .log import EventLog
class MaterialisedView(Protocol):
    """Structural interface every view driven by the replay engine implements.

    Any object providing these four methods satisfies the protocol; no
    inheritance is required.
    """

    def reset(self) -> None:
        """Drop all accumulated state so the view can be rebuilt from scratch."""
        ...

    def apply(self, event: Event) -> None:
        """Fold one *event* into the view's current state."""
        ...

    def snapshot_state(self) -> dict:
        """Return the current state as a JSON-serialisable dict."""
        ...

    def restore_state(self, state: dict) -> None:
        """Load *state*, a dict previously returned by ``snapshot_state()``."""
        ...
class ReplayEngine:
    """Route events from an event log to registered materialised views.

    Each view is stored under a name together with an optional event-type
    filter; a filter of ``None`` means the view receives every event.
    """

    def __init__(self, log: EventLog) -> None:
        """Create an engine that replays from *log*, with no views registered."""
        self.log = log
        # view_name -> (view, set of event types it cares about, or None for all)
        self._views: dict[str, tuple[MaterialisedView, frozenset[str] | None]] = {}

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------
    def register(
        self,
        name: str,
        view: MaterialisedView,
        event_types: list[EventType] | None = None,
    ) -> None:
        """Register *view* under *name*, replacing any view already using it.

        Pass ``event_types=None`` to receive all event types.  An empty list
        is treated the same way as ``None`` (no filtering).
        """
        ft: frozenset[str] | None = frozenset(event_types) if event_types else None
        self._views[name] = (view, ft)

    # Alias used in task spec
    def register_view(
        self,
        view: MaterialisedView,
        event_types: list[EventType],
    ) -> None:
        """Register *view* under its class name.

        Because the key is ``type(view).__name__``, registering a second
        instance of the same class replaces the first one.
        """
        name = type(view).__name__
        self.register(name, view, event_types)

    # ------------------------------------------------------------------
    # Replay
    # ------------------------------------------------------------------
    def _replay_into(
        self,
        view: MaterialisedView,
        ft: frozenset[str] | None,
        since_lamport: int,
    ) -> None:
        """Apply to *view* every event the log replays for filter *ft*."""
        event_types = list(ft) if ft is not None else None
        for event in self.log.replay(since_lamport=since_lamport, event_types=event_types):  # type: ignore[arg-type]
            view.apply(event)

    def rebuild(self, view_name: str, from_lamport: int = 0) -> None:
        """Reset the named view and replay its relevant events from *from_lamport*.

        Raises:
            KeyError: if no view is registered under *view_name*.
        """
        try:
            view, ft = self._views[view_name]
        except KeyError:
            raise KeyError(f"no view registered under {view_name!r}") from None
        view.reset()
        self._replay_into(view, ft, from_lamport)

    def rebuild_all(self, from_lamport: int = 0) -> None:
        """Reset and replay every registered view."""
        # Snapshot the names so the registry may change while views rebuild.
        for name in list(self._views):
            self.rebuild(name, from_lamport)

    # Alias used in task spec
    def replay_all(self) -> None:
        """Rebuild every registered view starting from lamport 0."""
        self.rebuild_all(from_lamport=0)

    def replay_since(self, lamport: int) -> None:
        """Replay into every view, without resetting, from *lamport* onwards.

        Each view receives whatever the log yields for
        ``since_lamport=lamport`` and that view's own event-type filter.
        """
        # Snapshot the registry: a view's apply() may register further views.
        for view, ft in tuple(self._views.values()):
            self._replay_into(view, ft, lamport)

    # ------------------------------------------------------------------
    # Live fanout
    # ------------------------------------------------------------------
    def _on_event(self, event: Event) -> None:
        """Route a newly-arrived event to all subscribed views.

        Views registered while this event is being dispatched do not receive
        it; they are included from the next event onwards.
        """
        # Snapshot so registering a view from inside apply() cannot raise
        # "dictionary changed size during iteration".
        for view, ft in tuple(self._views.values()):
            if ft is None or event.event_type in ft:
                view.apply(event)

    # Alias used in spec
    on_event = _on_event
|