GitHub Actions
feat: implement all M01-M13, X01-X04 modules + quality gate fixes
31c93b1
Raw
History Blame
2.91 kB
from __future__ import annotations
import sqlite3
import threading
from pathlib import Path
class LamportClock:
"""Thread-safe, SQLite-persisted Lamport clock for one community.
The clock row lives in the same ``clock`` table as the event log DB so
that both the event insert and the clock bump happen in the same
transaction.
"""
def __init__(self, community_id: str, db_path: Path) -> None:
self._community_id = community_id
self._db_path = db_path
self._lock = threading.Lock()
self._value: int = 0
self._conn: sqlite3.Connection | None = None
self._load()
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
def tick(self) -> int:
"""Increment and return the new Lamport value (for local events)."""
with self._lock:
self._value += 1
self._save()
return self._value
def update(self, received: int) -> int:
"""Advance to max(local, received) + 1 (for received events)."""
with self._lock:
self._value = max(self._value, received) + 1
self._save()
return self._value
def current(self) -> int:
with self._lock:
return self._value
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _get_conn(self) -> sqlite3.Connection:
if self._conn is None:
self._conn = sqlite3.connect(str(self._db_path), check_same_thread=False)
return self._conn
def _load(self) -> None:
conn = self._get_conn()
conn.execute(
"CREATE TABLE IF NOT EXISTS clock "
"(community_id TEXT PRIMARY KEY, lamport INTEGER NOT NULL)"
)
conn.commit()
row = conn.execute(
"SELECT lamport FROM clock WHERE community_id = ?",
(self._community_id,),
).fetchone()
self._value = row[0] if row else 0
def _save(self) -> None:
"""Persist current value. Called while ``_lock`` is held."""
conn = self._get_conn()
conn.execute(
"INSERT INTO clock (community_id, lamport) VALUES (?, ?) "
"ON CONFLICT(community_id) DO UPDATE SET lamport = excluded.lamport",
(self._community_id, self._value),
)
conn.commit()
def _save_in_tx(self, conn: sqlite3.Connection) -> None:
"""Persist inside an already-open transaction (no commit here)."""
conn.execute(
"INSERT INTO clock (community_id, lamport) VALUES (?, ?) "
"ON CONFLICT(community_id) DO UPDATE SET lamport = excluded.lamport",
(self._community_id, self._value),
)