HearthNet-Nemotron / docs /modules /M06-marketplace.md
GitHub Actions
Add all-to-all internet mesh over relay hub (P1-P3) + user-story screenshot proof
8f53c4c
|
Raw
History Blame Contribute Delete
10.7 kB
# M06 β€” Marketplace Service
**Spec version:** v1.0
**Depends on:** M01 (identity, for signed events), M03 (bus), X02 (events, for storage + replay), X04 (config), X03 (observability), optionally M11 (embedding, for `market.search`)
**Depended on by:** M08 (UI marketplace tab)
---
## 1. Responsibility
Provide `market.list`, `market.post`, `market.expire`, `market.search` capabilities. Maintain a materialised view of current (non-expired) posts derived from the community event log.
The marketplace is **event-sourced**: posts are not stored as a table the service writes to. They are derived from `market.post.*` events in the X02 log. This makes posts automatically signed, durable, and gossipable.
---
## 2. File layout
```
hearthnet/services/marketplace/
β”œβ”€β”€ __init__.py
β”œβ”€β”€ service.py # MarketplaceService
β”œβ”€β”€ post.py # Post dataclass + helpers
└── views.py # MarketplaceView: MaterialisedView from X02
```
---
## 3. Public API
### 3.1 `post.py`
```python
# hearthnet/services/marketplace/post.py
from dataclasses import dataclass
from typing import Literal
Category = Literal["offer", "request", "info", "emergency"]
@dataclass(frozen=True)
class Location:
lat: float
lng: float
label: str
@dataclass(frozen=True)
class Post:
"""The marketplace's domain object.
Derived from a market.post.created event + zero or more market.post.updated events,
and terminated by a market.post.expired event."""
event_id: str # ULID of the original .created event
lamport: int
author: str # NodeID full form
category: Category
title: str
body: str
location: Location | None
tags: list[str]
created_at: str
expires_at: str
expired_via_event_id: str | None # set when expired
expiry_reason: str | None # "fulfilled" | "withdrawn" | ...
def is_expired(self, now: datetime | None = None) -> bool: ...
```
### 3.2 `views.py`
```python
# hearthnet/services/marketplace/views.py
class MarketplaceView:
"""MaterialisedView (X02 protocol).
Subscribes to event_types: market.post.created, .updated, .expired."""
def __init__(self): ...
# MaterialisedView protocol:
def reset(self) -> None: ...
def apply(self, event: Event) -> None: ...
def snapshot_state(self) -> dict: ...
def restore_state(self, state: dict) -> None: ...
# queries used by the service handlers:
def list(
self,
*,
category: Category | None = None,
tags: list[str] | None = None,
since_lamport: int = 0,
limit: int = 50,
) -> list[Post]: ...
def get(self, event_id: str) -> Post | None: ...
def max_lamport(self) -> int: ...
# bulk listings for search:
def all_active(self) -> list[Post]: ...
```
### 3.3 `service.py`
```python
# hearthnet/services/marketplace/service.py
class MarketplaceService:
name = "marketplace"
version = "1.0"
def __init__(
self,
config: MarketConfig,
bus: CapabilityBus,
event_log: EventLog,
replay_engine: ReplayEngine,
author_kp: KeyPair, # this node's key, for signing posts
community_manifest_provider: Callable[[], CommunityManifest],
):
self.view = MarketplaceView()
replay_engine.register(
"marketplace",
self.view,
event_types=["market.post.created", "market.post.updated", "market.post.expired"],
)
def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable, ParamsPredicate]]:
"""Registers: market.list, market.post, market.expire, market.search."""
async def start(self) -> None:
"""Replay relevant events into the view; install background sweeper for auto-expiry."""
async def stop(self) -> None: ...
def health(self) -> dict: ...
# --- handlers ---
async def handle_list(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.11."""
async def handle_post(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.12.
Validates ttl ≀ config.market.max_ttl_seconds.
Idempotency by client_id: if an event with matching (author, client_id) already exists, return its event_id.
Otherwise: append market.post.created event via event_log.append_local."""
async def handle_expire(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.13.
Checks caller is original author OR trusted moderator.
Appends market.post.expired event."""
async def handle_search(self, req: RouteRequest) -> dict:
"""CONTRACT Β§4.14.
1. bus.call('embed.text', (1,0), {texts: [query]})
2. For each active post, embed (cached) and score via cosine.
3. Return top-k.
Cache embedding per (event_id, body+title hash)."""
```
### 3.4 Capability descriptors
```python
descriptor_list = CapabilityDescriptor(
name="market.list", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=8,
trust_required="member", timeout_seconds=5, idempotent=True,
)
descriptor_post = CapabilityDescriptor(
name="market.post", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=4,
trust_required="member", timeout_seconds=10, idempotent=True,
)
descriptor_expire = CapabilityDescriptor(
name="market.expire", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=4,
trust_required="member", timeout_seconds=5, idempotent=True,
)
descriptor_search = CapabilityDescriptor(
name="market.search", version=(1, 0), stability="stable",
request_schema={...}, response_schema={...}, stream_schema=None,
params={}, max_concurrent=4,
trust_required="member", timeout_seconds=10, idempotent=True,
)
```
All four use the default `lambda offered, requested: True` params predicate.
---
## 4. Behaviour
### 4.1 Event-sourcing in practice
```
client β†’ market.post (via bus)
↓
service.handle_post
β†’ validate
β†’ idempotency check on (author, client_id)
β†’ event_log.append_local("market.post.created", data, author_kp)
β†’ X02 fans out the new event to ReplayEngine
β†’ MarketplaceView.apply(event) updates in-memory state
β†’ service returns {event_id, lamport}
```
```
peer sync brings remote market.post.created event
↓
event_log.append_received(event)
β†’ ReplayEngine triggers MarketplaceView.apply(event)
β†’ next market.list call sees the new post
```
The service NEVER writes posts directly. The event log is the only mutator.
### 4.2 Auto-expiry sweeper
Background task scanning the view every 60 seconds:
- For each post where `now >= expires_at` AND no `.expired` event seen: append `market.post.expired` event with reason `"stale"` and the local node as author.
- This is best-effort; only the post's author or a trusted member is supposed to expire it, but for "stale" reason, any anchor MAY do it.
Conflict: if two nodes auto-expire concurrently, both events land, view de-duplicates by `target_event_id`. Last-writer-wins per Lamport (no functional difference β€” both say "expired").
### 4.3 Search caching
Embedding all active posts on every search is wasteful. Cache strategy:
- Per-post embedding cached in memory keyed by `(event_id, hash(title+body))`
- On `.updated` event, invalidate
- Cache size bounded; LRU eviction at 5000 entries
Phase 1.5 optimisation. MVP may re-embed each time.
### 4.4 Trust check for expire
```python
def can_expire(caller_node_id, post, reason, community_manifest):
if caller_node_id == post.author:
return True
if reason in ("fulfilled", "withdrawn", "user_request"):
return caller_node_id == post.author # author-only for these reasons
if reason == "stale":
return community_manifest.level_of(caller_node_id) in ("trusted", "anchor")
return False
```
### 4.5 Categories vs tags
- `category` is a fixed enum (4 values)
- `tags` are free-form, sourced from user input; the UI presents popular tags as suggestions
### 4.6 Geofencing
`location` is advisory. Display only. No filtering in MVP. Phase 2: `market.list` filter on distance.
---
## 5. Composition with `market.search`
```
UI input: "wasser notfall"
↓
bus.call("market.search", (1,0), {input: {query: "wasser notfall", k: 10}})
β†’ MarketplaceService.handle_search
β†’ bus.call("embed.text", (1,0), {texts: ["wasser notfall"]})
β†’ score each active post (cached embedding) β†’ top-k
β†’ return posts with scores
```
The marketplace service depends on `embed.text` being available somewhere on the mesh. If embedding is unavailable, search falls back to substring matching (logged at `warning`).
---
## 6. Errors
| Condition | Wire code |
|-----------|-----------|
| ttl_seconds > max_ttl_seconds | `bad_request` |
| caller not authorised to expire | `unauthorized` |
| target post not found (expire/update) | `not_found` |
| no embedding provider for search | `partition` (degrades to substring) |
---
## 7. Configuration
From [X04 Β§3](../cross-cutting/X04-config.md):
```python
config.market.enabled
config.market.default_ttl_seconds # 7 days
config.market.max_ttl_seconds # 30 days
```
---
## 8. Tests
### Unit
- `test_post_event_appears_in_view_after_apply`
- `test_post_is_idempotent_on_client_id`
- `test_expire_unauthorized_caller_rejected`
- `test_auto_expire_appends_event_with_stale_reason`
- `test_replay_then_snapshot_then_restore_equal_state`
- `test_search_falls_back_to_substring_when_embedding_unavailable`
### Integration
- `test_two_node_post_visible_after_sync`
- `test_three_node_concurrent_posts_all_visible`
- `test_expire_propagates_then_list_excludes`
- `test_market_search_returns_relevant`
---
## 9. Cross-references
| What | Where |
|------|-------|
| `market.*` wire | [CONTRACT Β§4.11–4.14](../CAPABILITY_CONTRACT.md) |
| Event types | [CONTRACT Β§7.2](../CAPABILITY_CONTRACT.md) |
| Event log | [X02](../cross-cutting/X02-events.md) |
| Embed via bus | [M11](M11-embedding.md) |
| UI marketplace tab | [M08 Β§5.4](M08-ui.md) |
---
## 10. Open questions
1. **Geographic filter** β€” Phase 2 with a `location_filter: {center, radius_km}`.
2. **Moderation tooling** β€” a "report" flow with admin queue; Phase 2.
3. **Inter-community marketplace federation** β€” Phase 2 / 3.
4. **Encryption of post bodies** β€” currently cleartext within community. Could encrypt at-rest in event log. Out of scope.