Spaces:
Running on Zero
Running on Zero
GitHub Actions
Add all-to-all internet mesh over relay hub (P1-P3) + user-story screenshot proof
8f53c4c | # M06 β Marketplace Service | |
| **Spec version:** v1.0 | |
| **Depends on:** M01 (identity, for signed events), M03 (bus), X02 (events, for storage + replay), X04 (config), X03 (observability), optionally M11 (embedding, for `market.search`) | |
| **Depended on by:** M08 (UI marketplace tab) | |
| --- | |
| ## 1. Responsibility | |
| Provide `market.list`, `market.post`, `market.expire`, `market.search` capabilities. Maintain a materialised view of current (non-expired) posts derived from the community event log. | |
| The marketplace is **event-sourced**: posts are not stored as a table the service writes to. They are derived from `market.post.*` events in the X02 log. This makes posts automatically signed, durable, and gossipable. | |
| --- | |
| ## 2. File layout | |
| ``` | |
| hearthnet/services/marketplace/ | |
| βββ __init__.py | |
| βββ service.py # MarketplaceService | |
| βββ post.py # Post dataclass + helpers | |
| βββ views.py # MarketplaceView: MaterialisedView from X02 | |
| ``` | |
| --- | |
| ## 3. Public API | |
| ### 3.1 `post.py` | |
| ```python | |
| # hearthnet/services/marketplace/post.py | |
| from dataclasses import dataclass | |
| from typing import Literal | |
| Category = Literal["offer", "request", "info", "emergency"] | |
| @dataclass(frozen=True) | |
| class Location: | |
| lat: float | |
| lng: float | |
| label: str | |
| @dataclass(frozen=True) | |
| class Post: | |
| """The marketplace's domain object. | |
| Derived from a market.post.created event + zero or more market.post.updated events, | |
| and terminated by a market.post.expired event.""" | |
| event_id: str # ULID of the original .created event | |
| lamport: int | |
| author: str # NodeID full form | |
| category: Category | |
| title: str | |
| body: str | |
| location: Location | None | |
| tags: list[str] | |
| created_at: str | |
| expires_at: str | |
| expired_via_event_id: str | None # set when expired | |
| expiry_reason: str | None # "fulfilled" | "withdrawn" | ... | |
| def is_expired(self, now: datetime | None = None) -> bool: ... | |
| ``` | |
| ### 3.2 `views.py` | |
| ```python | |
| # hearthnet/services/marketplace/views.py | |
| class MarketplaceView: | |
| """MaterialisedView (X02 protocol). | |
| Subscribes to event_types: market.post.created, .updated, .expired.""" | |
| def __init__(self): ... | |
| # MaterialisedView protocol: | |
| def reset(self) -> None: ... | |
| def apply(self, event: Event) -> None: ... | |
| def snapshot_state(self) -> dict: ... | |
| def restore_state(self, state: dict) -> None: ... | |
| # queries used by the service handlers: | |
| def list( | |
| self, | |
| *, | |
| category: Category | None = None, | |
| tags: list[str] | None = None, | |
| since_lamport: int = 0, | |
| limit: int = 50, | |
| ) -> list[Post]: ... | |
| def get(self, event_id: str) -> Post | None: ... | |
| def max_lamport(self) -> int: ... | |
| # bulk listings for search: | |
| def all_active(self) -> list[Post]: ... | |
| ``` | |
| ### 3.3 `service.py` | |
| ```python | |
| # hearthnet/services/marketplace/service.py | |
| class MarketplaceService: | |
| name = "marketplace" | |
| version = "1.0" | |
| def __init__( | |
| self, | |
| config: MarketConfig, | |
| bus: CapabilityBus, | |
| event_log: EventLog, | |
| replay_engine: ReplayEngine, | |
| author_kp: KeyPair, # this node's key, for signing posts | |
| community_manifest_provider: Callable[[], CommunityManifest], | |
| ): | |
| self.view = MarketplaceView() | |
| replay_engine.register( | |
| "marketplace", | |
| self.view, | |
| event_types=["market.post.created", "market.post.updated", "market.post.expired"], | |
| ) | |
| def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable, ParamsPredicate]]: | |
| """Registers: market.list, market.post, market.expire, market.search.""" | |
| async def start(self) -> None: | |
| """Replay relevant events into the view; install background sweeper for auto-expiry.""" | |
| async def stop(self) -> None: ... | |
| def health(self) -> dict: ... | |
| # --- handlers --- | |
| async def handle_list(self, req: RouteRequest) -> dict: | |
| """CONTRACT Β§4.11.""" | |
| async def handle_post(self, req: RouteRequest) -> dict: | |
| """CONTRACT Β§4.12. | |
| Validates ttl β€ config.market.max_ttl_seconds. | |
| Idempotency by client_id: if an event with matching (author, client_id) already exists, return its event_id. | |
| Otherwise: append market.post.created event via event_log.append_local.""" | |
| async def handle_expire(self, req: RouteRequest) -> dict: | |
| """CONTRACT Β§4.13. | |
| Checks caller is original author OR trusted moderator. | |
| Appends market.post.expired event.""" | |
| async def handle_search(self, req: RouteRequest) -> dict: | |
| """CONTRACT Β§4.14. | |
| 1. bus.call('embed.text', (1,0), {texts: [query]}) | |
| 2. For each active post, embed (cached) and score via cosine. | |
| 3. Return top-k. | |
| Cache embedding per (event_id, body+title hash).""" | |
| ``` | |
| ### 3.4 Capability descriptors | |
| ```python | |
| descriptor_list = CapabilityDescriptor( | |
| name="market.list", version=(1, 0), stability="stable", | |
| request_schema={...}, response_schema={...}, stream_schema=None, | |
| params={}, max_concurrent=8, | |
| trust_required="member", timeout_seconds=5, idempotent=True, | |
| ) | |
| descriptor_post = CapabilityDescriptor( | |
| name="market.post", version=(1, 0), stability="stable", | |
| request_schema={...}, response_schema={...}, stream_schema=None, | |
| params={}, max_concurrent=4, | |
| trust_required="member", timeout_seconds=10, idempotent=True, | |
| ) | |
| descriptor_expire = CapabilityDescriptor( | |
| name="market.expire", version=(1, 0), stability="stable", | |
| request_schema={...}, response_schema={...}, stream_schema=None, | |
| params={}, max_concurrent=4, | |
| trust_required="member", timeout_seconds=5, idempotent=True, | |
| ) | |
| descriptor_search = CapabilityDescriptor( | |
| name="market.search", version=(1, 0), stability="stable", | |
| request_schema={...}, response_schema={...}, stream_schema=None, | |
| params={}, max_concurrent=4, | |
| trust_required="member", timeout_seconds=10, idempotent=True, | |
| ) | |
| ``` | |
| All four use the default `lambda offered, requested: True` params predicate. | |
| --- | |
| ## 4. Behaviour | |
| ### 4.1 Event-sourcing in practice | |
| ``` | |
| client β market.post (via bus) | |
| β | |
| service.handle_post | |
| β validate | |
| β idempotency check on (author, client_id) | |
| β event_log.append_local("market.post.created", data, author_kp) | |
| β X02 fans out the new event to ReplayEngine | |
| β MarketplaceView.apply(event) updates in-memory state | |
| β service returns {event_id, lamport} | |
| ``` | |
| ``` | |
| peer sync brings remote market.post.created event | |
| β | |
| event_log.append_received(event) | |
| β ReplayEngine triggers MarketplaceView.apply(event) | |
| β next market.list call sees the new post | |
| ``` | |
| The service NEVER writes posts directly. The event log is the only mutator. | |
| ### 4.2 Auto-expiry sweeper | |
| Background task scanning the view every 60 seconds: | |
| - For each post where `now >= expires_at` AND no `.expired` event seen: append `market.post.expired` event with reason `"stale"` and the local node as author. | |
| - This is best-effort; only the post's author or a trusted member is supposed to expire it, but for "stale" reason, any anchor MAY do it. | |
| Conflict: if two nodes auto-expire concurrently, both events land, view de-duplicates by `target_event_id`. Last-writer-wins per Lamport (no functional difference β both say "expired"). | |
| ### 4.3 Search caching | |
| Embedding all active posts on every search is wasteful. Cache strategy: | |
| - Per-post embedding cached in memory keyed by `(event_id, hash(title+body))` | |
| - On `.updated` event, invalidate | |
| - Cache size bounded; LRU eviction at 5000 entries | |
| Phase 1.5 optimisation. MVP may re-embed each time. | |
| ### 4.4 Trust check for expire | |
| ```python | |
| def can_expire(caller_node_id, post, reason, community_manifest): | |
| if caller_node_id == post.author: | |
| return True | |
| if reason in ("fulfilled", "withdrawn", "user_request"): | |
| return caller_node_id == post.author # author-only for these reasons | |
| if reason == "stale": | |
| return community_manifest.level_of(caller_node_id) in ("trusted", "anchor") | |
| return False | |
| ``` | |
| ### 4.5 Categories vs tags | |
| - `category` is a fixed enum (4 values) | |
| - `tags` are free-form, sourced from user input; the UI presents popular tags as suggestions | |
| ### 4.6 Geofencing | |
| `location` is advisory. Display only. No filtering in MVP. Phase 2: `market.list` filter on distance. | |
| --- | |
| ## 5. Composition with `market.search` | |
| ``` | |
| UI input: "wasser notfall" | |
| β | |
| bus.call("market.search", (1,0), {input: {query: "wasser notfall", k: 10}}) | |
| β MarketplaceService.handle_search | |
| β bus.call("embed.text", (1,0), {texts: ["wasser notfall"]}) | |
| β score each active post (cached embedding) β top-k | |
| β return posts with scores | |
| ``` | |
| The marketplace service depends on `embed.text` being available somewhere on the mesh. If embedding is unavailable, search falls back to substring matching (logged at `warning`). | |
| --- | |
| ## 6. Errors | |
| | Condition | Wire code | | |
| |-----------|-----------| | |
| | ttl_seconds > max_ttl_seconds | `bad_request` | | |
| | caller not authorised to expire | `unauthorized` | | |
| | target post not found (expire/update) | `not_found` | | |
| | no embedding provider for search | `partition` (degrades to substring) | | |
| --- | |
| ## 7. Configuration | |
| From [X04 Β§3](../cross-cutting/X04-config.md): | |
| ```python | |
| config.market.enabled | |
| config.market.default_ttl_seconds # 7 days | |
| config.market.max_ttl_seconds # 30 days | |
| ``` | |
| --- | |
| ## 8. Tests | |
| ### Unit | |
| - `test_post_event_appears_in_view_after_apply` | |
| - `test_post_is_idempotent_on_client_id` | |
| - `test_expire_unauthorized_caller_rejected` | |
| - `test_auto_expire_appends_event_with_stale_reason` | |
| - `test_replay_then_snapshot_then_restore_equal_state` | |
| - `test_search_falls_back_to_substring_when_embedding_unavailable` | |
| ### Integration | |
| - `test_two_node_post_visible_after_sync` | |
| - `test_three_node_concurrent_posts_all_visible` | |
| - `test_expire_propagates_then_list_excludes` | |
| - `test_market_search_returns_relevant` | |
| --- | |
| ## 9. Cross-references | |
| | What | Where | | |
| |------|-------| | |
| | `market.*` wire | [CONTRACT Β§4.11β4.14](../CAPABILITY_CONTRACT.md) | | |
| | Event types | [CONTRACT Β§7.2](../CAPABILITY_CONTRACT.md) | | |
| | Event log | [X02](../cross-cutting/X02-events.md) | | |
| | Embed via bus | [M11](M11-embedding.md) | | |
| | UI marketplace tab | [M08 Β§5.4](M08-ui.md) | | |
| --- | |
| ## 10. Open questions | |
| 1. **Geographic filter** β Phase 2 with a `location_filter: {center, radius_km}`. | |
| 2. **Moderation tooling** β a "report" flow with admin queue; Phase 2. | |
| 3. **Inter-community marketplace federation** β Phase 2 / 3. | |
| 4. **Encryption of post bodies** β currently cleartext within community. Could encrypt at-rest in event log. Out of scope. | |