Spaces:
Running on Zero
A newer version of the Gradio SDK is available: 6.19.0
M06 β Marketplace Service
Spec version: v1.0
Depends on: M01 (identity, for signed events), M03 (bus), X02 (events, for storage + replay), X04 (config), X03 (observability), optionally M11 (embedding, for market.search)
Depended on by: M08 (UI marketplace tab)
1. Responsibility
Provide market.list, market.post, market.expire, market.search capabilities. Maintain a materialised view of current (non-expired) posts derived from the community event log.
The marketplace is event-sourced: posts are not stored as a table the service writes to. They are derived from market.post.* events in the X02 log. This makes posts automatically signed, durable, and gossipable.
2. File layout
hearthnet/services/marketplace/
βββ __init__.py
βββ service.py # MarketplaceService
βββ post.py # Post dataclass + helpers
βββ views.py # MarketplaceView: MaterialisedView from X02
3. Public API
3.1 post.py
# hearthnet/services/marketplace/post.py
from dataclasses import dataclass
from typing import Literal
Category = Literal["offer", "request", "info", "emergency"]
@dataclass(frozen=True)
class Location:
lat: float
lng: float
label: str
@dataclass(frozen=True)
class Post:
"""The marketplace's domain object.
Derived from a market.post.created event + zero or more market.post.updated events,
and terminated by a market.post.expired event."""
event_id: str # ULID of the original .created event
lamport: int
author: str # NodeID full form
category: Category
title: str
body: str
location: Location | None
tags: list[str]
created_at: str
expires_at: str
expired_via_event_id: str | None # set when expired
expiry_reason: str | None # "fulfilled" | "withdrawn" | ...
def is_expired(self, now: datetime | None = None) -> bool: ...
3.2 views.py
# hearthnet/services/marketplace/views.py
class MarketplaceView:
"""MaterialisedView (X02 protocol).
Subscribes to event_types: market.post.created, .updated, .expired."""
def __init__(self): ...
# MaterialisedView protocol:
def reset(self) -> None: ...
def apply(self, event: Event) -> None: ...
def snapshot_state(self) -> dict: ...
def restore_state(self, state: dict) -> None: ...
# queries used by the service handlers:
def list(
self,
*,
category: Category | None = None,
tags: list[str] | None = None,
since_lamport: int = 0,
limit: int = 50,
) -> list[Post]: ...
def get(self, event_id: str) -> Post | None: ...
def max_lamport(self) -> int: ...
# bulk listings for search:
def all_active(self) -> list[Post]: ...
3.3 service.py
# hearthnet/services/marketplace/service.py
class MarketplaceService:
name = "marketplace"
version = "1.0"
def __init__(
self,
config: MarketConfig,
bus: CapabilityBus,
event_log: EventLog,
replay_engine: ReplayEngine,
author_kp: KeyPair, # this node's key, for signing posts
community_manifest_provider: Callable[[], CommunityManifest],
):
self.view = MarketplaceView()
replay_engine.register(
"marketplace",
self.view,
event_types=["market.post.created", "market.post.updated", "market.post.expired"],
)
def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable, ParamsPredicate]]:
"""Registers: market.list, market.post, market.expire, market.search."""
async def start(self) -> None:
"""Replay relevant events into the view; install background sweeper for auto-expiry."""
async def stop(self) -> None: ...
def health(self) -> dict: ...
# --- handlers ---
async def handle_list(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.11."""
async def handle_post(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.12.
Validates ttl β€ config.market.max_ttl_seconds.
Idempotency by client_id: if an event with matching (author, client_id) already exists, return its event_id.
Otherwise: append market.post.created event via event_log.append_local."""
async def handle_expire(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.13.
Checks caller is original author OR trusted moderator.
Appends market.post.expired event."""
async def handle_search(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.14.
1. bus.call('embed.text', (1,0), {texts: [query]})
2. For each active post, embed (cached) and score via cosine.
3. Return top-k.
Cache embedding per (event_id, body+title hash)."""
3.4 Capability descriptors
descriptor_list = CapabilityDescriptor(
name="market.list", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=8,
trust_required="member", timeout_seconds=5, idempotent=True,
)
descriptor_post = CapabilityDescriptor(
name="market.post", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=4,
trust_required="member", timeout_seconds=10, idempotent=True,
)
descriptor_expire = CapabilityDescriptor(
name="market.expire", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=4,
trust_required="member", timeout_seconds=5, idempotent=True,
)
descriptor_search = CapabilityDescriptor(
name="market.search", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=4,
trust_required="member", timeout_seconds=10, idempotent=True,
)
All four use the default lambda offered, requested: True params predicate.
4. Behaviour
4.1 Event-sourcing in practice
client β market.post (via bus)
β
service.handle_post
β validate
β idempotency check on (author, client_id)
β event_log.append_local("market.post.created", data, author_kp)
β X02 fans out the new event to ReplayEngine
β MarketplaceView.apply(event) updates in-memory state
β service returns {event_id, lamport}
peer sync brings remote market.post.created event
β
event_log.append_received(event)
β ReplayEngine triggers MarketplaceView.apply(event)
β next market.list call sees the new post
The service NEVER writes posts directly. The event log is the only mutator.
4.2 Auto-expiry sweeper
Background task scanning the view every 60 seconds:
- For each post where
now >= expires_atAND no.expiredevent seen: appendmarket.post.expiredevent with reason"stale"and the local node as author. - This is best-effort; only the post's author or a trusted member is supposed to expire it, but for "stale" reason, any anchor MAY do it.
Conflict: if two nodes auto-expire concurrently, both events land, view de-duplicates by target_event_id. Last-writer-wins per Lamport (no functional difference β both say "expired").
4.3 Search caching
Embedding all active posts on every search is wasteful. Cache strategy:
- Per-post embedding cached in memory keyed by
(event_id, hash(title+body)) - On
.updatedevent, invalidate - Cache size bounded; LRU eviction at 5000 entries
Phase 1.5 optimisation. MVP may re-embed each time.
4.4 Trust check for expire
def can_expire(caller_node_id, post, reason, community_manifest):
if caller_node_id == post.author:
return True
if reason in ("fulfilled", "withdrawn", "user_request"):
return caller_node_id == post.author # author-only for these reasons
if reason == "stale":
return community_manifest.level_of(caller_node_id) in ("trusted", "anchor")
return False
4.5 Categories vs tags
categoryis a fixed enum (4 values)tagsare free-form, sourced from user input; the UI presents popular tags as suggestions
4.6 Geofencing
location is advisory. Display only. No filtering in MVP. Phase 2: market.list filter on distance.
5. Composition with market.search
UI input: "wasser notfall"
β
bus.call("market.search", (1,0), {input: {query: "wasser notfall", k: 10}})
β MarketplaceService.handle_search
β bus.call("embed.text", (1,0), {texts: ["wasser notfall"]})
β score each active post (cached embedding) β top-k
β return posts with scores
The marketplace service depends on embed.text being available somewhere on the mesh. If embedding is unavailable, search falls back to substring matching (logged at warning).
6. Errors
| Condition | Wire code |
|---|---|
| ttl_seconds > max_ttl_seconds | bad_request |
| caller not authorised to expire | unauthorized |
| target post not found (expire/update) | not_found |
| no embedding provider for search | partition (degrades to substring) |
7. Configuration
From X04 Β§3:
config.market.enabled
config.market.default_ttl_seconds # 7 days
config.market.max_ttl_seconds # 30 days
8. Tests
Unit
test_post_event_appears_in_view_after_applytest_post_is_idempotent_on_client_idtest_expire_unauthorized_caller_rejectedtest_auto_expire_appends_event_with_stale_reasontest_replay_then_snapshot_then_restore_equal_statetest_search_falls_back_to_substring_when_embedding_unavailable
Integration
test_two_node_post_visible_after_synctest_three_node_concurrent_posts_all_visibletest_expire_propagates_then_list_excludestest_market_search_returns_relevant
9. Cross-references
| What | Where |
|---|---|
market.* wire |
CONTRACT Β§4.11β4.14 |
| Event types | CONTRACT Β§7.2 |
| Event log | X02 |
| Embed via bus | M11 |
| UI marketplace tab | M08 Β§5.4 |
10. Open questions
- Geographic filter β Phase 2 with a
location_filter: {center, radius_km}. - Moderation tooling β a "report" flow with admin queue; Phase 2.
- Inter-community marketplace federation β Phase 2 / 3.
- Encryption of post bodies β currently cleartext within community. Could encrypt at-rest in event log. Out of scope.