File size: 3,663 Bytes
31c93b1
 
481b78e
 
31c93b1
 
3f78ea8
a190f73
31c93b1
 
 
 
 
 
 
4aaae80
 
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4aaae80
31c93b1
4aaae80
 
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from __future__ import annotations

from datetime import datetime, timezone as _tz
# Module-level UTC tzinfo; MarketplaceView.all_active() passes it to
# datetime.now() to obtain a timezone-aware "now".
UTC = _tz.utc
from typing import Any

# NOTE(review): this self-assignment rebinds UTC to itself and has no effect;
# it appears to be redundant -- confirm and remove.
UTC = UTC

from hearthnet.services.marketplace.post import Location, Post


class MarketplaceView:
    """Materialised view of marketplace posts, built by folding an event stream.

    State kept by the view:

    * ``_posts`` -- every post created so far, keyed by the id of the
      ``market.post.created`` event that produced it.
    * ``_expired`` -- ids of known posts that were retired by a
      ``market.post.expired`` or ``market.post.updated`` event.
    * ``_seen_client_ids`` -- client ids already consumed by a create event;
      used to make creation idempotent.
    """

    def __init__(self) -> None:
        self._posts: dict[str, Post] = {}  # event_id -> Post
        self._expired: set[str] = set()  # event_ids that are expired
        self._seen_client_ids: set[str] = set()

    def apply(self, event: Any) -> None:
        """Fold a single event into the view.

        Args:
            event: Either an object exposing ``event_type``, ``payload``,
                ``event_id``, ``author`` and ``lamport`` attributes, or a
                mapping with the same keys (missing keys fall back to
                empty/zero defaults). A ``None`` payload is treated as empty.

        Event types other than ``market.post.created``,
        ``market.post.expired`` and ``market.post.updated`` are ignored.
        """
        if hasattr(event, "event_type"):
            etype = event.event_type
            payload = event.payload
            event_id = event.event_id
            author = event.author
            lamport = event.lamport
        else:
            etype = event.get("event_type", "")
            payload = event.get("payload", {})
            event_id = event.get("event_id", "")
            author = event.get("author", "")
            lamport = event.get("lamport", 0)

        # An explicit null payload (attribute set to None, or a "payload": None
        # entry) must not crash the fold; treat it like an empty payload.
        if payload is None:
            payload = {}

        if etype == "market.post.created":
            # The client id de-duplicates retried submissions; events without
            # one are de-duplicated on their own event id instead.
            client_id = payload.get("client_id", event_id)
            if client_id in self._seen_client_ids:
                return  # idempotent
            self._seen_client_ids.add(client_id)
            loc_raw = payload.get("location")
            location = Location(**loc_raw) if loc_raw else None
            self._posts[event_id] = Post(
                event_id=event_id,
                author=author,
                category=payload.get("category", "info"),
                title=payload.get("title", ""),
                body=payload.get("body", ""),
                location=location,
                tags=payload.get("tags", []),
                created_at=payload.get("created_at", ""),
                expires_at=payload.get("expires_at", ""),
                lamport=lamport,
                client_id=client_id,
            )

        elif etype in ("market.post.expired", "market.post.updated"):
            # Both event types retire the targeted post; no replacement post
            # is created here for "updated". Targets this view has never seen
            # are ignored.
            target_id = payload.get("target_event_id", event_id)
            if target_id in self._posts:
                self._expired.add(target_id)

    def all_active(self) -> list[Post]:
        """Return the posts that are currently active.

        A post is active when it has not been retired through ``apply`` and
        its own ``is_expired(now)`` check, evaluated against the current UTC
        time, is false.
        """
        now = datetime.now(UTC)
        return [
            post
            for eid, post in self._posts.items()
            if eid not in self._expired and not post.is_expired(now)
        ]

    def snapshot_state(self) -> dict:
        """Return the view's state as a dict of plain containers.

        The result has the keys ``"posts"`` (event id -> ``Post.as_dict()``),
        ``"expired"`` and ``"seen_client_ids"``. The two id lists are sorted
        so that equal states always yield equal snapshots.
        """
        return {
            "posts": {eid: p.as_dict() for eid, p in self._posts.items()},
            # key=str keeps the ordering well-defined even if a payload
            # supplied a non-string id.
            "expired": sorted(self._expired, key=str),
            "seen_client_ids": sorted(self._seen_client_ids, key=str),
        }

    def restore_state(self, state: dict) -> None:
        """Replace the view's state with one produced by ``snapshot_state``.

        Args:
            state: Mapping with optional ``"posts"``, ``"expired"`` and
                ``"seen_client_ids"`` entries; absent entries restore as
                empty.

        Raises:
            KeyError: If a post entry lacks one of the required fields. The
                view is left unchanged in that case.
        """
        # Build everything in locals first so that a malformed snapshot cannot
        # leave the view half-restored.
        restored: dict[str, Post] = {}
        for eid, raw in state.get("posts", {}).items():
            loc = Location(**raw["location"]) if raw.get("location") else None
            restored[eid] = Post(
                event_id=raw["event_id"],
                author=raw["author"],
                category=raw["category"],
                title=raw["title"],
                body=raw["body"],
                location=loc,
                tags=raw["tags"],
                created_at=raw["created_at"],
                expires_at=raw["expires_at"],
                lamport=raw["lamport"],
                client_id=raw["client_id"],
            )
        expired = set(state.get("expired", []))
        seen_client_ids = set(state.get("seen_client_ids", []))

        self._posts = restored
        self._expired = expired
        self._seen_client_ids = seen_client_ids

    def reset(self) -> None:
        """Drop all posts, retirement marks and seen client ids."""
        self._posts.clear()
        self._expired.clear()
        self._seen_client_ids.clear()