# Last change (GitHub Actions, commit 481b78e):
#   fix: Python 3.10 compat — replace datetime.UTC with timezone.utc shim
"""X02 - Event log (SQLite WAL).
Spec: docs/X02-events.md §3.3
Impl-ref: impl_ref.md §3
All community events signed with author Ed25519 key.
Lamport clock enforces causal ordering.
ReplayEngine drives materialised views (marketplace, chat).
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import sqlite3
import threading
from collections.abc import AsyncIterator
from datetime import datetime, timezone as _tz
from pathlib import Path
from typing import Any

# Python 3.10 compatibility shim: ``datetime.UTC`` only exists on 3.11+, while
# ``timezone.utc`` is the same singleton and is available on 3.10 as well.
UTC = _tz.utc
from .lamport import LamportClock
from .types import _ALL_EVENT_TYPES, Event, EventType, new_ulid
# Idempotent DDL for the event log (every CREATE uses IF NOT EXISTS).
# EventLog._init_schema splits this string on ";" and executes each piece
# separately, so no statement may itself contain a ";" (e.g. inside a string
# literal or a trigger body).
#
# - journal_mode=WAL / synchronous=NORMAL: write-ahead logging; per the SQLite
#   docs, NORMAL in WAL mode stays consistent but may lose the most recent
#   commits on power loss.
# - events: one row per event, keyed by event_id. `payload` holds JSON text and
#   `received_at` is the local time the row was written. The column order
#   matches the SELECT lists that feed _row_to_event.
# - idx_events_lamport backs the (community_id, lamport, event_id)-ordered
#   scans; idx_events_type backs the per-event_type queries.
# - clock: at most one row per community_id (PRIMARY KEY); presumably the
#   persisted Lamport value written via LamportClock._save_in_tx — confirm
#   against lamport.py.
_SCHEMA = """
PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
CREATE TABLE IF NOT EXISTS events (
event_id TEXT PRIMARY KEY,
event_type TEXT NOT NULL,
community_id TEXT NOT NULL,
author TEXT NOT NULL,
lamport INTEGER NOT NULL,
payload TEXT NOT NULL,
issued_at TEXT NOT NULL,
signature TEXT NOT NULL,
schema_version INTEGER NOT NULL DEFAULT 1,
received_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_lamport
ON events(community_id, lamport, event_id);
CREATE INDEX IF NOT EXISTS idx_events_type
ON events(community_id, event_type, lamport);
CREATE TABLE IF NOT EXISTS clock (
community_id TEXT PRIMARY KEY,
lamport INTEGER NOT NULL
);
"""
def _now_utc() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS.ffffffZ``."""
    # datetime.__format__ delegates to strftime, so the format spec below is
    # equivalent to an explicit strftime call; the trailing "Z" marks UTC.
    moment = datetime.now(UTC)
    return f"{moment:%Y-%m-%dT%H:%M:%S.%f}Z"
def _row_to_event(row: tuple[Any, ...]) -> Event:
    """Rebuild an ``Event`` from one ``events`` table row.

    *row* must hold exactly the ten columns in the order this module's SELECTs
    request them (event_id ... received_at); any other length raises
    ``ValueError`` from the unpacking. The stored ``payload`` JSON text is
    decoded, and ``received_at`` is not passed on to the Event.
    """
    (eid, kind, community, who, clock,
     payload_json, issued, sig, version, _received) = row
    return Event(
        event_id=eid,
        event_type=kind,
        community_id=community,
        author=who,
        lamport=clock,
        payload=json.loads(payload_json),
        issued_at=issued,
        signature=sig,
        schema_version=version,
    )
def _sign(event: Event, kp: Any) -> str:
    """Sign *event*'s canonical bytes with *kp*.

    Returns ``""`` when *kp* is None (the event stays unsigned). Otherwise the
    result is ``"ed25519:"`` followed by the URL-safe base64 of the signature
    with ``=`` padding stripped. A *kp* exposing ``sign(bytes)`` is used
    directly; any other *kp* is treated as a raw HMAC-SHA256 key (test usage).
    """
    if kp is None:
        return ""

    import base64
    import hashlib
    import hmac

    message = _canonical_bytes(event)
    # NOTE(review): the HMAC fallback output is still tagged "ed25519:" even
    # though it is not an Ed25519 signature; _verify strips the same prefix,
    # so changing the tag would change the wire format — confirm intent.
    signature = (
        kp.sign(message)
        if hasattr(kp, "sign")
        else hmac.new(kp, message, hashlib.sha256).digest()
    )
    token = base64.urlsafe_b64encode(signature).decode().rstrip("=")
    return "ed25519:" + token
def _canonical_bytes(event: Event) -> bytes:
    """Serialise the signed fields of *event* deterministically.

    The signature itself is excluded. Keys are sorted (recursively, including
    inside ``payload``) and separators carry no whitespace, so equal events
    always produce identical bytes for signing and verification.
    """
    signed_fields = (
        "schema_version",
        "event_id",
        "event_type",
        "community_id",
        "author",
        "lamport",
        "payload",
        "issued_at",
    )
    document = {name: getattr(event, name) for name in signed_fields}
    text = json.dumps(document, sort_keys=True, separators=(",", ":"))
    return text.encode()
def _verify(event: Event, kp_store: Any) -> bool:
    """Check *event*'s signature against *kp_store*.

    Returns True without checking when there is no *kp_store*, the event has
    an empty signature, or *kp_store* has no ``verify`` method. Otherwise
    returns ``kp_store.verify(author, canonical_bytes, signature_bytes)``, or
    False if decoding or verification raises.
    """
    # NOTE(review): an empty signature is accepted even when a kp_store is
    # configured, so unsigned events skip verification — confirm intended.
    if kp_store is None or not event.signature:
        return True
    if not hasattr(kp_store, "verify"):
        return True

    import base64

    try:
        token = event.signature.removeprefix("ed25519:")
        # Restore the "=" padding stripped by _sign.
        token += "=" * (-len(token) % 4)
        sig_bytes = base64.urlsafe_b64decode(token)
        message = _canonical_bytes(event)
        return kp_store.verify(event.author, message, sig_bytes)
    except Exception:
        return False
class EventLogError(Exception):
    """Protocol violation in the event log.

    ``code`` is a short machine-readable tag (this module raises
    ``"schema_unknown"`` and ``"invalid_signature"``). The exception message
    falls back to the code when no human-readable *message* is supplied.
    """

    def __init__(self, code: str, message: str = "") -> None:
        text = message if message else code
        super().__init__(text)
        self.code = code
class EventLog:
    """SQLite-backed append-only event log for one community.

    Writes (``append_local`` / ``append_received``) are serialised by a
    ``threading.Lock``. Each write stores the event and advances the Lamport
    clock inside one explicit SQLite transaction. Committed events are then
    pushed to in-process ``subscribe`` iterators. Reads return events ordered
    by ``(lamport, event_id)``.
    """

    def __init__(
        self,
        db_path: Path,
        community_id: str,
        kp_store: Any = None,
    ) -> None:
        """Open (creating tables if needed) the log database at *db_path*.

        Args:
            db_path: SQLite file holding the ``events`` and ``clock`` tables.
            community_id: Community whose events this log reads and writes.
            kp_store: Optional key store passed to ``_verify`` for peer events;
                ``None`` makes ``_verify`` accept every event.
        """
        self._db_path = db_path
        self._community_id = community_id
        self._kp_store = kp_store
        self._lock = threading.Lock()
        self._subscribers: list[tuple[asyncio.Queue[Event], frozenset[str] | None]] = []
        # check_same_thread=False: the connection may be used from several
        # threads; writes are serialised by self._lock.
        self._conn = sqlite3.connect(str(db_path), check_same_thread=False)
        self._init_schema()
        self._clock = LamportClock(community_id, db_path)
        # Share our connection so clock saves join the event-insert
        # transaction in _persist.
        # NOTE(review): if LamportClock opened its own connection in its
        # constructor, that connection is orphaned here and never closed.
        self._clock._conn = self._conn

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    def _init_schema(self) -> None:
        """Apply ``_SCHEMA`` one statement at a time and commit."""
        # Connection.execute() runs a single statement, so split on ";".
        for stmt in _SCHEMA.strip().split(";"):
            stmt = stmt.strip()
            if stmt:
                self._conn.execute(stmt)
        self._conn.commit()

    # ------------------------------------------------------------------
    # Writing
    # ------------------------------------------------------------------

    def _persist(self, event: Event, received_at: str, clock_value: int) -> None:
        """Insert *event* and set the Lamport clock to *clock_value* atomically.

        Must be called with ``self._lock`` held. On any failure the transaction
        is rolled back and the in-memory clock is restored to its previous
        value, so memory and disk never disagree. The original exception is
        re-raised.
        """
        self._conn.execute("BEGIN")
        previous = self._clock._value
        try:
            self._conn.execute(
                "INSERT INTO events "
                "(event_id,event_type,community_id,author,lamport,payload,issued_at,signature,schema_version,received_at) "
                "VALUES (?,?,?,?,?,?,?,?,?,?)",
                (
                    event.event_id,
                    event.event_type,
                    event.community_id,
                    event.author,
                    event.lamport,
                    json.dumps(event.payload, sort_keys=True),
                    event.issued_at,
                    event.signature,
                    event.schema_version,
                    received_at,
                ),
            )
            self._clock._value = clock_value
            self._clock._save_in_tx(self._conn)
            self._conn.execute("COMMIT")
        except Exception:
            # Undo the in-memory clock bump of the write that did not land.
            self._clock._value = previous
            # Some errors (e.g. SQLITE_FULL, SQLITE_IOERR) make SQLite roll
            # back on its own; an unconditional ROLLBACK would then raise and
            # mask the original error.
            if self._conn.in_transaction:
                self._conn.execute("ROLLBACK")
            raise

    def append_local(
        self,
        event_type: EventType,
        author: str,
        payload: dict[str, Any],
        kp: Any = None,
    ) -> Event:
        """Mint, sign, and persist a new local event atomically.

        The event gets Lamport value ``clock + 1``. ``issued_at`` and the stored
        ``received_at`` are both the current UTC time.

        Raises:
            EventLogError: ``schema_unknown`` if *event_type* is not known.
        """
        if event_type not in _ALL_EVENT_TYPES:
            raise EventLogError("schema_unknown", f"Unknown event_type: {event_type!r}")
        import dataclasses

        with self._lock:
            lamport = self._clock._value + 1
            event_id = new_ulid()
            now = _now_utc()
            # Sign the unsigned form: the canonical bytes exclude the signature.
            unsigned = Event(
                schema_version=1,
                event_id=event_id,
                event_type=event_type,
                community_id=self._community_id,
                author=author,
                lamport=lamport,
                payload=payload,
                issued_at=now,
                signature="",
            )
            event = dataclasses.replace(unsigned, signature=_sign(unsigned, kp))
            self._persist(event, now, lamport)
            # Fan out under the lock so subscribers see events in commit order.
            self._fanout(event)
        return event

    def append_received(self, event: Event) -> bool:
        """Persist a peer event. Returns False for duplicates, True if new.

        Applies the Lamport receive rule ``clock = max(clock, event.lamport) + 1``.
        The event itself is stored with the lamport value it arrived with.

        Raises:
            EventLogError: ``schema_unknown`` for an unknown event_type,
                ``invalid_signature`` if ``_verify`` rejects the event.
        """
        if event.event_type not in _ALL_EVENT_TYPES:
            raise EventLogError("schema_unknown", f"Unknown event_type: {event.event_type!r}")
        if not _verify(event, self._kp_store):
            raise EventLogError("invalid_signature", f"Bad signature on {event.event_id}")
        # NOTE(review): event.community_id is not compared with this log's
        # community_id, so a foreign-community event would be stored here and
        # would advance this log's clock — confirm whether the spec requires
        # rejecting it.
        with self._lock:
            duplicate = self._conn.execute(
                "SELECT 1 FROM events WHERE event_id = ?", (event.event_id,)
            ).fetchone()
            if duplicate:
                return False
            new_lamport = max(self._clock._value, event.lamport) + 1
            self._persist(event, _now_utc(), new_lamport)
            self._fanout(event)
        return True

    # ------------------------------------------------------------------
    # Reading
    # ------------------------------------------------------------------

    def get(self, event_id: str) -> Event | None:
        """Return the event with *event_id*, or None.

        Unlike the other readers, this lookup is not filtered by community_id.
        """
        row = self._conn.execute(
            "SELECT event_id,event_type,community_id,author,lamport,payload,issued_at,signature,schema_version,received_at "
            "FROM events WHERE event_id = ?",
            (event_id,),
        ).fetchone()
        return _row_to_event(row) if row else None

    def since(self, lamport: int, limit: int = 1000) -> list[Event]:
        """Return events with lamport >= given value, ordered by (lamport, event_id)."""
        rows = self._conn.execute(
            "SELECT event_id,event_type,community_id,author,lamport,payload,issued_at,signature,schema_version,received_at "
            "FROM events WHERE community_id = ? AND lamport >= ? "
            "ORDER BY lamport ASC, event_id ASC LIMIT ?",
            (self._community_id, lamport, limit),
        ).fetchall()
        return [_row_to_event(r) for r in rows]

    def head(self) -> int:
        """Highest Lamport value stored for this community (0 when empty)."""
        row = self._conn.execute(
            "SELECT MAX(lamport) FROM events WHERE community_id = ?",
            (self._community_id,),
        ).fetchone()
        return row[0] if row and row[0] is not None else 0

    def by_type(self, event_type: EventType, since_lamport: int = 0) -> list[Event]:
        """Return this community's *event_type* events with lamport >= *since_lamport*."""
        rows = self._conn.execute(
            "SELECT event_id,event_type,community_id,author,lamport,payload,issued_at,signature,schema_version,received_at "
            "FROM events WHERE community_id = ? AND event_type = ? AND lamport >= ? "
            "ORDER BY lamport ASC, event_id ASC",
            (self._community_id, event_type, since_lamport),
        ).fetchall()
        return [_row_to_event(r) for r in rows]

    def heads_by_type(self) -> dict[str, int]:
        """Highest lamport per event_type; used by sync."""
        rows = self._conn.execute(
            "SELECT event_type, MAX(lamport) FROM events WHERE community_id = ? GROUP BY event_type",
            (self._community_id,),
        ).fetchall()
        return {row[0]: row[1] for row in rows}

    def replay(
        self,
        *,
        since_lamport: int = 0,
        event_types: list[EventType] | None = None,
        limit: int | None = None,
    ) -> list[Event]:
        """Return events in (lamport, event_id) order, optionally filtered.

        Args:
            since_lamport: Only events with lamport >= this value.
            event_types: When non-empty, only events of these types.
            limit: Maximum number of rows; ``None`` means unlimited.
        """
        sql = (
            "SELECT event_id,event_type,community_id,author,lamport,payload,issued_at,signature,schema_version,received_at "
            "FROM events WHERE community_id = ? AND lamport >= ? "
        )
        params: list[Any] = [self._community_id, since_lamport]
        if event_types:
            # placeholders contains only "?" characters (one per type) — not user input
            placeholders = ",".join("?" * len(event_types))
            sql += f"AND event_type IN ({placeholders}) "  # nosec B608
            params.extend(event_types)
        sql += "ORDER BY lamport ASC, event_id ASC"
        if limit is not None:
            # Bound as a parameter rather than formatted into the SQL text.
            sql += " LIMIT ?"
            params.append(int(limit))
        rows = self._conn.execute(sql, params).fetchall()
        return [_row_to_event(r) for r in rows]

    # ------------------------------------------------------------------
    # Pubsub
    # ------------------------------------------------------------------

    def subscribe(
        self,
        event_types: list[EventType] | None = None,
    ) -> AsyncIterator[Event]:
        """Return an async iterator that yields matching events as they arrive.

        The subscription is registered immediately, before the iterator is
        first awaited, so events fanned out after this call are queued. It is
        unregistered when the iterator finishes or is closed (e.g. aclose()).
        An empty or None *event_types* means "all types".

        NOTE(review): an iterator that is never started never enters its
        ``finally``, so its queue stays registered and grows without bound.
        """
        queue: asyncio.Queue[Event] = asyncio.Queue()
        wanted: frozenset[str] | None = frozenset(event_types) if event_types else None
        entry = (queue, wanted)
        self._subscribers.append(entry)

        async def _iter() -> AsyncIterator[Event]:
            try:
                while True:
                    yield await queue.get()
            finally:
                with contextlib.suppress(ValueError):
                    self._subscribers.remove(entry)

        return _iter()

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def close(self) -> None:
        """Close the underlying SQLite connection, ignoring any error."""
        with contextlib.suppress(Exception):
            self._conn.close()

    def _fanout(self, event: Event) -> None:
        """Push *event* to all matching in-process subscribers (best-effort).

        NOTE(review): asyncio.Queue is not thread-safe; if appends run on a
        thread other than the subscribers' event loop, delivery should go
        through loop.call_soon_threadsafe — confirm the calling context.
        """
        # Iterate a snapshot: subscriber iterators remove their own entries.
        for queue, wanted in list(self._subscribers):
            if wanted is None or event.event_type in wanted:
                with contextlib.suppress(asyncio.QueueFull):
                    queue.put_nowait(event)