Spaces:
Running on Zero
Running on Zero
| from __future__ import annotations | |
| import sqlite3 | |
| import threading | |
| from pathlib import Path | |
| class LamportClock: | |
| """Thread-safe, SQLite-persisted Lamport clock for one community. | |
| The clock row lives in the same ``clock`` table as the event log DB so | |
| that both the event insert and the clock bump happen in the same | |
| transaction. | |
| """ | |
| def __init__(self, community_id: str, db_path: Path) -> None: | |
| self._community_id = community_id | |
| self._db_path = db_path | |
| self._lock = threading.Lock() | |
| self._value: int = 0 | |
| self._conn: sqlite3.Connection | None = None | |
| self._load() | |
| # ------------------------------------------------------------------ | |
| # Public interface | |
| # ------------------------------------------------------------------ | |
| def tick(self) -> int: | |
| """Increment and return the new Lamport value (for local events).""" | |
| with self._lock: | |
| self._value += 1 | |
| self._save() | |
| return self._value | |
| def update(self, received: int) -> int: | |
| """Advance to max(local, received) + 1 (for received events).""" | |
| with self._lock: | |
| self._value = max(self._value, received) + 1 | |
| self._save() | |
| return self._value | |
| def current(self) -> int: | |
| with self._lock: | |
| return self._value | |
| # ------------------------------------------------------------------ | |
| # Internal helpers | |
| # ------------------------------------------------------------------ | |
| def _get_conn(self) -> sqlite3.Connection: | |
| if self._conn is None: | |
| self._conn = sqlite3.connect(str(self._db_path), check_same_thread=False) | |
| return self._conn | |
| def _load(self) -> None: | |
| conn = self._get_conn() | |
| conn.execute( | |
| "CREATE TABLE IF NOT EXISTS clock " | |
| "(community_id TEXT PRIMARY KEY, lamport INTEGER NOT NULL)" | |
| ) | |
| conn.commit() | |
| row = conn.execute( | |
| "SELECT lamport FROM clock WHERE community_id = ?", | |
| (self._community_id,), | |
| ).fetchone() | |
| self._value = row[0] if row else 0 | |
| def _save(self) -> None: | |
| """Persist current value. Called while ``_lock`` is held.""" | |
| conn = self._get_conn() | |
| conn.execute( | |
| "INSERT INTO clock (community_id, lamport) VALUES (?, ?) " | |
| "ON CONFLICT(community_id) DO UPDATE SET lamport = excluded.lamport", | |
| (self._community_id, self._value), | |
| ) | |
| conn.commit() | |
| def _save_in_tx(self, conn: sqlite3.Connection) -> None: | |
| """Persist inside an already-open transaction (no commit here).""" | |
| conn.execute( | |
| "INSERT INTO clock (community_id, lamport) VALUES (?, ?) " | |
| "ON CONFLICT(community_id) DO UPDATE SET lamport = excluded.lamport", | |
| (self._community_id, self._value), | |
| ) | |