HearthNet-Nemotron / docs /modules /M06-marketplace.md
GitHub Actions
Add all-to-all internet mesh over relay hub (P1-P3) + user-story screenshot proof
8f53c4c
|
Raw
History Blame Contribute Delete
10.7 kB

A newer version of the Gradio SDK is available: 6.19.0

Upgrade

M06 β€” Marketplace Service

Spec version: v1.0 Depends on: M01 (identity, for signed events), M03 (bus), X02 (events, for storage + replay), X04 (config), X03 (observability), optionally M11 (embedding, for market.search) Depended on by: M08 (UI marketplace tab)


1. Responsibility

Provide market.list, market.post, market.expire, market.search capabilities. Maintain a materialised view of current (non-expired) posts derived from the community event log.

The marketplace is event-sourced: posts are not stored as a table the service writes to. They are derived from market.post.* events in the X02 log. This makes posts automatically signed, durable, and gossipable.


2. File layout

hearthnet/services/marketplace/
β”œβ”€β”€ __init__.py
β”œβ”€β”€ service.py        # MarketplaceService
β”œβ”€β”€ post.py           # Post dataclass + helpers
└── views.py          # MarketplaceView: MaterialisedView from X02

3. Public API

3.1 post.py

# hearthnet/services/marketplace/post.py
from dataclasses import dataclass
from typing import Literal

Category = Literal["offer", "request", "info", "emergency"]

@dataclass(frozen=True)
class Location:
    lat:   float
    lng:   float
    label: str

@dataclass(frozen=True)
class Post:
    """The marketplace's domain object.
       Derived from a market.post.created event + zero or more market.post.updated events,
       and terminated by a market.post.expired event."""
    event_id:   str           # ULID of the original .created event
    lamport:    int
    author:     str           # NodeID full form
    category:   Category
    title:      str
    body:       str
    location:   Location | None
    tags:       list[str]
    created_at: str
    expires_at: str
    expired_via_event_id: str | None      # set when expired
    expiry_reason: str | None              # "fulfilled" | "withdrawn" | ...

    def is_expired(self, now: datetime | None = None) -> bool: ...

3.2 views.py

# hearthnet/services/marketplace/views.py
class MarketplaceView:
    """MaterialisedView (X02 protocol).
       Subscribes to event_types: market.post.created, .updated, .expired."""

    def __init__(self): ...

    # MaterialisedView protocol:
    def reset(self) -> None: ...
    def apply(self, event: Event) -> None: ...
    def snapshot_state(self) -> dict: ...
    def restore_state(self, state: dict) -> None: ...

    # queries used by the service handlers:
    def list(
        self,
        *,
        category: Category | None = None,
        tags: list[str] | None = None,
        since_lamport: int = 0,
        limit: int = 50,
    ) -> list[Post]: ...

    def get(self, event_id: str) -> Post | None: ...
    def max_lamport(self) -> int: ...

    # bulk listings for search:
    def all_active(self) -> list[Post]: ...

3.3 service.py

# hearthnet/services/marketplace/service.py
class MarketplaceService:
    name    = "marketplace"
    version = "1.0"

    def __init__(
        self,
        config: MarketConfig,
        bus: CapabilityBus,
        event_log: EventLog,
        replay_engine: ReplayEngine,
        author_kp: KeyPair,            # this node's key, for signing posts
        community_manifest_provider: Callable[[], CommunityManifest],
    ):
        self.view = MarketplaceView()
        replay_engine.register(
            "marketplace",
            self.view,
            event_types=["market.post.created", "market.post.updated", "market.post.expired"],
        )

    def capabilities(self) -> list[tuple[CapabilityDescriptor, Callable, ParamsPredicate]]:
        """Registers: market.list, market.post, market.expire, market.search."""

    async def start(self) -> None:
        """Replay relevant events into the view; install background sweeper for auto-expiry."""

    async def stop(self) -> None: ...
    def health(self) -> dict: ...

    # --- handlers ---

    async def handle_list(self, req: RouteRequest) -> dict:
        """CONTRACT Β§4.11."""

    async def handle_post(self, req: RouteRequest) -> dict:
        """CONTRACT Β§4.12.
        Validates ttl ≀ config.market.max_ttl_seconds.
        Idempotency by client_id: if an event with matching (author, client_id) already exists, return its event_id.
        Otherwise: append market.post.created event via event_log.append_local."""

    async def handle_expire(self, req: RouteRequest) -> dict:
        """CONTRACT Β§4.13.
        Checks caller is original author OR trusted moderator.
        Appends market.post.expired event."""

    async def handle_search(self, req: RouteRequest) -> dict:
        """CONTRACT Β§4.14.
        1. bus.call('embed.text', (1,0), {texts: [query]})
        2. For each active post, embed (cached) and score via cosine.
        3. Return top-k.
        Cache embedding per (event_id, body+title hash)."""

3.4 Capability descriptors

descriptor_list = CapabilityDescriptor(
    name="market.list", version=(1, 0), stability="stable",
    request_schema={...}, response_schema={...}, stream_schema=None,
    params={}, max_concurrent=8,
    trust_required="member", timeout_seconds=5, idempotent=True,
)

descriptor_post = CapabilityDescriptor(
    name="market.post", version=(1, 0), stability="stable",
    request_schema={...}, response_schema={...}, stream_schema=None,
    params={}, max_concurrent=4,
    trust_required="member", timeout_seconds=10, idempotent=True,
)

descriptor_expire = CapabilityDescriptor(
    name="market.expire", version=(1, 0), stability="stable",
    request_schema={...}, response_schema={...}, stream_schema=None,
    params={}, max_concurrent=4,
    trust_required="member", timeout_seconds=5, idempotent=True,
)

descriptor_search = CapabilityDescriptor(
    name="market.search", version=(1, 0), stability="stable",
    request_schema={...}, response_schema={...}, stream_schema=None,
    params={}, max_concurrent=4,
    trust_required="member", timeout_seconds=10, idempotent=True,
)

All four use the default lambda offered, requested: True params predicate.


4. Behaviour

4.1 Event-sourcing in practice

client β†’ market.post (via bus)
  ↓
service.handle_post
  β†’ validate
  β†’ idempotency check on (author, client_id)
  β†’ event_log.append_local("market.post.created", data, author_kp)
  β†’ X02 fans out the new event to ReplayEngine
  β†’ MarketplaceView.apply(event) updates in-memory state
  β†’ service returns {event_id, lamport}
peer sync brings remote market.post.created event
  ↓
event_log.append_received(event)
  β†’ ReplayEngine triggers MarketplaceView.apply(event)
  β†’ next market.list call sees the new post

The service NEVER writes posts directly. The event log is the only mutator.

4.2 Auto-expiry sweeper

Background task scanning the view every 60 seconds:

  • For each post where now >= expires_at AND no .expired event seen: append market.post.expired event with reason "stale" and the local node as author.
  • This is best-effort; only the post's author or a trusted member is supposed to expire it, but for "stale" reason, any anchor MAY do it.

Conflict: if two nodes auto-expire concurrently, both events land, view de-duplicates by target_event_id. Last-writer-wins per Lamport (no functional difference β€” both say "expired").

4.3 Search caching

Embedding all active posts on every search is wasteful. Cache strategy:

  • Per-post embedding cached in memory keyed by (event_id, hash(title+body))
  • On .updated event, invalidate
  • Cache size bounded; LRU eviction at 5000 entries

Phase 1.5 optimisation. MVP may re-embed each time.

4.4 Trust check for expire

def can_expire(caller_node_id, post, reason, community_manifest):
    if caller_node_id == post.author:
        return True
    if reason in ("fulfilled", "withdrawn", "user_request"):
        return caller_node_id == post.author   # author-only for these reasons
    if reason == "stale":
        return community_manifest.level_of(caller_node_id) in ("trusted", "anchor")
    return False

4.5 Categories vs tags

  • category is a fixed enum (4 values)
  • tags are free-form, sourced from user input; the UI presents popular tags as suggestions

4.6 Geofencing

location is advisory. Display only. No filtering in MVP. Phase 2: market.list filter on distance.


5. Composition with market.search

UI input: "wasser notfall"
  ↓
bus.call("market.search", (1,0), {input: {query: "wasser notfall", k: 10}})
  β†’ MarketplaceService.handle_search
       β†’ bus.call("embed.text", (1,0), {texts: ["wasser notfall"]})
       β†’ score each active post (cached embedding) β†’ top-k
       β†’ return posts with scores

The marketplace service depends on embed.text being available somewhere on the mesh. If embedding is unavailable, search falls back to substring matching (logged at warning).


6. Errors

Condition Wire code
ttl_seconds > max_ttl_seconds bad_request
caller not authorised to expire unauthorized
target post not found (expire/update) not_found
no embedding provider for search partition (degrades to substring)

7. Configuration

From X04 Β§3:

config.market.enabled
config.market.default_ttl_seconds   # 7 days
config.market.max_ttl_seconds       # 30 days

8. Tests

Unit

  • test_post_event_appears_in_view_after_apply
  • test_post_is_idempotent_on_client_id
  • test_expire_unauthorized_caller_rejected
  • test_auto_expire_appends_event_with_stale_reason
  • test_replay_then_snapshot_then_restore_equal_state
  • test_search_falls_back_to_substring_when_embedding_unavailable

Integration

  • test_two_node_post_visible_after_sync
  • test_three_node_concurrent_posts_all_visible
  • test_expire_propagates_then_list_excludes
  • test_market_search_returns_relevant

9. Cross-references

What Where
market.* wire CONTRACT Β§4.11–4.14
Event types CONTRACT Β§7.2
Event log X02
Embed via bus M11
UI marketplace tab M08 Β§5.4

10. Open questions

  1. Geographic filter β€” Phase 2 with a location_filter: {center, radius_km}.
  2. Moderation tooling β€” a "report" flow with admin queue; Phase 2.
  3. Inter-community marketplace federation β€” Phase 2 / 3.
  4. Encryption of post bodies β€” currently cleartext within community. Could encrypt at-rest in event log. Out of scope.