GitHub Actions
feat: impl_ref §22 gap-fill — all missing symbols implemented
38cba90
Raw
History Blame
5.21 kB
"""HearthNet β€” X03 Observability: Per-request tracing.
Uses contextvars.ContextVar so traces propagate correctly across asyncio tasks.
Public API:
new_trace(capability) β€” create and attach a fresh Trace
current_trace() β€” return the active Trace or None
attach(trace) β€” set the active Trace on this context
span(name, **extras) β€” context-manager that records a Span
TraceRingBuffer β€” thread-safe ring buffer of last N traces
get_ring_buffer() β€” module-level singleton ring buffer
"""
from __future__ import annotations
import secrets
import threading
import time
from collections import deque
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from hearthnet.constants import TRACE_RING_BUFFER_SIZE
# ── ULID approximation ───────────────────────────────────────────────────────
def _new_ulid() -> str:
"""Simple ULID approximation: 13-digit ms timestamp + 12 hex random chars."""
try:
from python_ulid import ULID # type: ignore[import]
return str(ULID())
except ImportError:
ts = str(int(time.time() * 1000)).zfill(13)
rand = secrets.token_hex(6).upper()
return ts + rand
# ── Dataclasses ──────────────────────────────────────────────────────────────
@dataclass
class Span:
    """A single named, timed step recorded within a trace."""

    # Human-readable label for the step.
    name: str
    # Monotonic-clock reading taken when the Span object is created.
    started_at: float = field(default_factory=time.monotonic)
    # Monotonic-clock reading at completion; None while the span is still open.
    ended_at: float | None = None
    # Free-form key/value annotations attached to the span.
    extras: dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> float | None:
        """Elapsed time in milliseconds, or None if the span has not ended."""
        end = self.ended_at
        return None if end is None else (end - self.started_at) * 1000.0
@dataclass
class Trace:
    """A per-request trace: an identifier plus the spans recorded under it."""

    # Identifier generated by _new_ulid() unless supplied explicitly.
    trace_id: str = field(default_factory=_new_ulid)
    # Name of the capability this trace was started for.
    capability: str = ""
    # Monotonic-clock reading taken when the Trace object is created.
    started_at: float = field(default_factory=time.monotonic)
    # Spans in the order they were added via add_span().
    spans: list[Span] = field(default_factory=list)
    # Guards appends to ``spans``; excluded from repr() and equality checks.
    _lock: threading.Lock = field(
        default_factory=threading.Lock,
        repr=False,
        compare=False,
    )

    def add_span(self, span: Span) -> None:
        """Append *span* to this trace's span list while holding the lock."""
        with self._lock:
            self.spans.append(span)
# ── Context variable ─────────────────────────────────────────────────────────
# Holds the Trace active on the current context; defaults to None when no
# trace has been attached. Read via current_trace(), written via
# new_trace() / attach() / detach().
_current_trace: ContextVar[Trace | None] = ContextVar("_current_trace", default=None)
# ── Public API ───────────────────────────────────────────────────────────────
def new_trace(capability: str) -> Trace:
    """Start a new Trace for *capability* and make it the active one.

    The trace is bound to the current context and pushed onto the
    module-level ring buffer before being returned to the caller.
    """
    fresh = Trace(capability=capability)
    _current_trace.set(fresh)
    ring = get_ring_buffer()
    ring.push(fresh)
    return fresh
def current_trace() -> Trace | None:
    """Look up the Trace bound to the current context.

    Returns None when no trace has been attached (the variable's default).
    """
    active = _current_trace.get()
    return active
def attach(trace: Trace) -> None:
    """Make *trace* the active trace on the current context.

    Useful for handing an existing trace to a child task so that spans
    recorded there land on the same Trace object.
    """
    _current_trace.set(trace)
def detach() -> None:
    """Remove the active trace from the current context.

    After this call current_trace() returns None on this context.
    """
    # None is a valid value for the ContextVar's declared ``Trace | None`` type.
    _current_trace.set(None)
@contextmanager
def span(name: str, **extras: object) -> Iterator[Span]:
    """Context manager that records a Span on the current Trace (if any).

    This is a synchronous context manager built with
    ``contextlib.contextmanager``: use a plain ``with`` statement, including
    inside coroutines. ``async with`` is not supported.

    Usage::

        with span("embed", model="nomic"):
            ...

    *name* labels the span and any keyword arguments are stored as its
    ``extras``. The Span object is yielded so the body can inspect or
    annotate it. On exit — whether the body returned or raised — the end
    time is stamped and the span is added to the trace that was active when
    the block was entered. If no trace was active, the span is still timed
    but is not recorded anywhere.
    """
    s = Span(name=name, extras=dict(extras))
    # Capture the trace at entry so the span is filed under the trace that
    # was active when the block started.
    trace = current_trace()
    try:
        yield s
    finally:
        # Runs on both normal exit and exceptions, so the span is always closed.
        s.ended_at = time.monotonic()
        if trace is not None:
            trace.add_span(s)
# ── Ring buffer ──────────────────────────────────────────────────────────────
class TraceRingBuffer:
    """Bounded, lock-protected buffer holding the most recent *maxlen* traces."""

    def __init__(self, maxlen: int = TRACE_RING_BUFFER_SIZE) -> None:
        """Create an empty buffer that retains at most *maxlen* traces."""
        self._lock = threading.Lock()
        # A deque with maxlen drops its oldest entry once it is full.
        self._buf: deque[Trace] = deque(maxlen=maxlen)

    def push(self, trace: Trace) -> None:
        """Append *trace* as the newest entry."""
        with self._lock:
            self._buf.append(trace)

    def snapshot(self) -> list[Trace]:
        """Return a list copy of the buffered traces, oldest first."""
        with self._lock:
            copied = list(self._buf)
        return copied

    def __len__(self) -> int:
        """Return the number of traces currently buffered."""
        with self._lock:
            count = len(self._buf)
        return count
# Lazily created singleton and the lock that guards its creation.
_ring_buffer: TraceRingBuffer | None = None
_ring_lock = threading.Lock()


def get_ring_buffer() -> TraceRingBuffer:
    """Return the module-level TraceRingBuffer, creating it on first use."""
    global _ring_buffer
    # Fast path: once the buffer exists, return it without taking the lock.
    existing = _ring_buffer
    if existing is not None:
        return existing
    with _ring_lock:
        # Re-check under the lock: another thread may have created the buffer
        # between the check above and acquiring the lock.
        if _ring_buffer is None:
            _ring_buffer = TraceRingBuffer()
        return _ring_buffer