File size: 3,622 Bytes
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
from __future__ import annotations

from typing import TYPE_CHECKING, Protocol

from .types import Event, EventType

if TYPE_CHECKING:
    from .log import EventLog


class MaterialisedView(Protocol):
    """Structural interface every consuming module's view has to provide.

    Any object exposing these four methods can be registered with
    ``ReplayEngine``; no inheritance from this class is required.
    """

    def reset(self) -> None:
        """Drop every piece of accumulated state.

        ``ReplayEngine.rebuild`` calls this before replaying events into the view.
        """

    def apply(self, event: Event) -> None:
        """Fold one *event* into the state held by this view."""

    def snapshot_state(self) -> dict:
        """Return the current state as a JSON-serialisable ``dict``."""

    def restore_state(self, state: dict) -> None:
        """Load *state*, a ``dict`` previously returned by ``snapshot_state()``."""


class ReplayEngine:
    """Routes events to registered materialised views.

    Views are stored by name together with an optional event-type filter.
    Events reach a view either by replaying them from ``self.log``
    (``rebuild*`` / ``replay_*``) or one at a time via ``on_event``.
    """

    def __init__(self, log: EventLog) -> None:
        """Create an engine that replays from *log*; no views are registered yet."""
        self.log = log
        # view_name -> (view, set of event_types it cares about or None for all)
        self._views: dict[str, tuple[MaterialisedView, frozenset[str] | None]] = {}

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register(
        self,
        name: str,
        view: MaterialisedView,
        event_types: list[EventType] | None = None,
    ) -> None:
        """Register *view* under *name*. Pass ``event_types=None`` for all types.

        An empty ``event_types`` list is treated exactly like ``None`` (the view
        receives every event type). Registering under an existing *name*
        replaces the previous entry.
        """
        # Truthiness check on purpose: both None and [] mean "no filter".
        ft: frozenset[str] | None = frozenset(event_types) if event_types else None
        self._views[name] = (view, ft)

    # Alias used in task spec
    def register_view(
        self,
        view: MaterialisedView,
        event_types: list[EventType],
    ) -> None:
        """Register *view* under its class name, filtered to *event_types*.

        Because the key is ``type(view).__name__``, registering a second
        instance of the same class replaces the first one.
        """
        name = type(view).__name__
        self.register(name, view, event_types)

    # ------------------------------------------------------------------
    # Replay
    # ------------------------------------------------------------------

    def _replay_into(
        self,
        view: MaterialisedView,
        ft: frozenset[str] | None,
        since_lamport: int,
    ) -> None:
        """Feed *view* every event ``self.log.replay`` yields for its filter."""
        event_types = list(ft) if ft is not None else None
        for event in self.log.replay(since_lamport=since_lamport, event_types=event_types):  # type: ignore[arg-type]
            view.apply(event)

    def rebuild(self, view_name: str, from_lamport: int = 0) -> None:
        """Reset the named view and replay all relevant events from *from_lamport*.

        Raises:
            KeyError: if no view is registered under *view_name*.
        """
        view, ft = self._views[view_name]
        view.reset()
        self._replay_into(view, ft, from_lamport)

    def rebuild_all(self, from_lamport: int = 0) -> None:
        """Reset and replay all registered views."""
        # Snapshot the names so a view that registers another view while being
        # rebuilt does not invalidate the iteration.
        for name in list(self._views):
            self.rebuild(name, from_lamport)

    # Alias used in task spec
    def replay_all(self) -> None:
        """Rebuild every registered view from lamport 0."""
        self.rebuild_all(from_lamport=0)

    def replay_since(self, lamport: int) -> None:
        """Replay (without reset) into every view, starting at *lamport*.

        Each view gets the events ``self.log.replay(since_lamport=lamport, ...)``
        yields for its own filter.
        NOTE(review): whether the bound is inclusive is decided by
        ``EventLog.replay`` -- confirm against its implementation.
        """
        # Iterate over a snapshot (as rebuild_all does) so that a view calling
        # register() from inside apply() cannot trigger
        # "dictionary changed size during iteration".
        for view, ft in list(self._views.values()):
            self._replay_into(view, ft, lamport)

    # ------------------------------------------------------------------
    # Live fanout
    # ------------------------------------------------------------------

    def _on_event(self, event: Event) -> None:
        """Route a newly-arrived event to all subscribed views.

        A view receives *event* when it has no filter or when
        ``event.event_type`` is in its filter. Views registered while this
        call is in progress do not receive *event*.
        """
        # Snapshot for the same reason as in replay_since.
        for view, ft in list(self._views.values()):
            if ft is None or event.event_type in ft:
                view.apply(event)

    # Alias used in spec
    on_event = _on_event