Spaces:
Running on Zero
Running on Zero
File size: 3,663 Bytes
31c93b1 481b78e 31c93b1 3f78ea8 a190f73 31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | from __future__ import annotations
from datetime import datetime, timezone as _tz
UTC = _tz.utc
from typing import Any
UTC = UTC
from hearthnet.services.marketplace.post import Location, Post
class MarketplaceView:
"""MaterialisedView: maintains set of active (non-expired) posts from event stream."""
def __init__(self) -> None:
self._posts: dict[str, Post] = {} # event_id -> Post
self._expired: set[str] = set() # event_ids that are expired
self._seen_client_ids: set[str] = set()
def apply(self, event: Any) -> None:
"""Process one event. Compatible with X02 Event dataclass or dict."""
if hasattr(event, "event_type"):
etype = event.event_type
payload = event.payload
event_id = event.event_id
author = event.author
lamport = event.lamport
else:
etype = event.get("event_type", "")
payload = event.get("payload", {})
event_id = event.get("event_id", "")
author = event.get("author", "")
lamport = event.get("lamport", 0)
if etype == "market.post.created":
client_id = payload.get("client_id", event_id)
if client_id in self._seen_client_ids:
return # idempotent
self._seen_client_ids.add(client_id)
loc_raw = payload.get("location")
location = Location(**loc_raw) if loc_raw else None
post = Post(
event_id=event_id,
author=author,
category=payload.get("category", "info"),
title=payload.get("title", ""),
body=payload.get("body", ""),
location=location,
tags=payload.get("tags", []),
created_at=payload.get("created_at", ""),
expires_at=payload.get("expires_at", ""),
lamport=lamport,
client_id=client_id,
)
self._posts[event_id] = post
elif etype in ("market.post.expired", "market.post.updated"):
target_id = payload.get("target_event_id", event_id)
if target_id in self._posts:
self._expired.add(target_id)
def all_active(self) -> list[Post]:
now = datetime.now(UTC)
return [
post
for eid, post in self._posts.items()
if eid not in self._expired and not post.is_expired(now)
]
def snapshot_state(self) -> dict:
return {
"posts": {eid: p.as_dict() for eid, p in self._posts.items()},
"expired": list(self._expired),
"seen_client_ids": list(self._seen_client_ids),
}
def restore_state(self, state: dict) -> None:
self._posts = {}
for eid, pd in state.get("posts", {}).items():
loc = Location(**pd["location"]) if pd.get("location") else None
self._posts[eid] = Post(
event_id=pd["event_id"],
author=pd["author"],
category=pd["category"],
title=pd["title"],
body=pd["body"],
location=loc,
tags=pd["tags"],
created_at=pd["created_at"],
expires_at=pd["expires_at"],
lamport=pd["lamport"],
client_id=pd["client_id"],
)
self._expired = set(state.get("expired", []))
self._seen_client_ids = set(state.get("seen_client_ids", []))
def reset(self) -> None:
self._posts.clear()
self._expired.clear()
self._seen_client_ids.clear()
|