Spaces:
Running on Zero
Running on Zero
File size: 5,214 Bytes
31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 4aaae80 31c93b1 38cba90 31c93b1 4aaae80 31c93b1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 | """HearthNet β X03 Observability: Per-request tracing.
Uses contextvars.ContextVar so traces propagate correctly across asyncio tasks.
Public API:
new_trace(capability) β create and attach a fresh Trace
current_trace() β return the active Trace or None
attach(trace) β set the active Trace on this context
span(name, **extras) β context-manager that records a Span
TraceRingBuffer β thread-safe ring buffer of last N traces
get_ring_buffer() β module-level singleton ring buffer
"""
from __future__ import annotations
import secrets
import threading
import time
from collections import deque
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from hearthnet.constants import TRACE_RING_BUFFER_SIZE
# ββ ULID approximation βββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _new_ulid() -> str:
"""Simple ULID approximation: 13-digit ms timestamp + 12 hex random chars."""
try:
from python_ulid import ULID # type: ignore[import]
return str(ULID())
except ImportError:
ts = str(int(time.time() * 1000)).zfill(13)
rand = secrets.token_hex(6).upper()
return ts + rand
# ββ Dataclasses ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@dataclass
class Span:
name: str
started_at: float = field(default_factory=time.monotonic)
ended_at: float | None = None
extras: dict = field(default_factory=dict)
@property
def duration_ms(self) -> float | None:
if self.ended_at is None:
return None
return (self.ended_at - self.started_at) * 1000.0
@dataclass
class Trace:
trace_id: str = field(default_factory=_new_ulid)
capability: str = ""
started_at: float = field(default_factory=time.monotonic)
spans: list[Span] = field(default_factory=list)
_lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)
def add_span(self, span: Span) -> None:
with self._lock:
self.spans.append(span)
# ββ Context variable βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
_current_trace: ContextVar[Trace | None] = ContextVar("_current_trace", default=None)
# ββ Public API βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def new_trace(capability: str) -> Trace:
"""Create a fresh Trace, attach it to this context, and return it."""
trace = Trace(capability=capability)
_current_trace.set(trace)
get_ring_buffer().push(trace)
return trace
def current_trace() -> Trace | None:
"""Return the Trace active on this context, or None."""
return _current_trace.get()
def attach(trace: Trace) -> None:
"""Set *trace* as the active trace on this context (e.g. to propagate to a child task)."""
_current_trace.set(trace)
def detach() -> None:
"""Clear the active trace from this context."""
_current_trace.set(None) # type: ignore[arg-type]
@contextmanager
def span(name: str, **extras: object) -> Iterator[Span]:
"""Context-manager that records a Span on the current Trace (if any).
Usage::
async with span("embed", model="nomic"):
...
"""
s = Span(name=name, extras=dict(extras))
trace = current_trace()
try:
yield s
finally:
s.ended_at = time.monotonic()
if trace is not None:
trace.add_span(s)
# ββ Ring buffer ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
class TraceRingBuffer:
"""Thread-safe bounded ring buffer that keeps the last *maxlen* traces."""
def __init__(self, maxlen: int = TRACE_RING_BUFFER_SIZE) -> None:
self._buf: deque[Trace] = deque(maxlen=maxlen)
self._lock = threading.Lock()
def push(self, trace: Trace) -> None:
with self._lock:
self._buf.append(trace)
def snapshot(self) -> list[Trace]:
"""Return a copy of all buffered traces, oldest first."""
with self._lock:
return list(self._buf)
def __len__(self) -> int:
with self._lock:
return len(self._buf)
_ring_buffer: TraceRingBuffer | None = None
_ring_lock = threading.Lock()
def get_ring_buffer() -> TraceRingBuffer:
"""Return the module-level singleton TraceRingBuffer."""
global _ring_buffer
if _ring_buffer is None:
with _ring_lock:
if _ring_buffer is None:
_ring_buffer = TraceRingBuffer()
return _ring_buffer
|