File size: 5,214 Bytes
31c93b1
 
 
 
 
 
 
 
 
 
 
 
4aaae80
31c93b1
 
 
 
 
 
4aaae80
31c93b1
 
 
 
 
 
 
 
4aaae80
31c93b1
 
 
 
4aaae80
31c93b1
 
 
 
 
 
 
 
 
4aaae80
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4aaae80
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
38cba90
 
 
 
 
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4aaae80
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""HearthNet β€” X03 Observability: Per-request tracing.

Uses contextvars.ContextVar so traces propagate correctly across asyncio tasks.

Public API:
    new_trace(capability)  β€” create and attach a fresh Trace
    current_trace()        β€” return the active Trace or None
    attach(trace)          β€” set the active Trace on this context
    span(name, **extras)   β€” context-manager that records a Span
    TraceRingBuffer        β€” thread-safe ring buffer of last N traces
    get_ring_buffer()      β€” module-level singleton ring buffer
"""

from __future__ import annotations

import secrets
import threading
import time
from collections import deque
from collections.abc import Iterator
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field

from hearthnet.constants import TRACE_RING_BUFFER_SIZE

# ── ULID approximation ───────────────────────────────────────────────────────


def _new_ulid() -> str:
    """Simple ULID approximation: 13-digit ms timestamp + 12 hex random chars."""
    try:
        from python_ulid import ULID  # type: ignore[import]

        return str(ULID())
    except ImportError:
        ts = str(int(time.time() * 1000)).zfill(13)
        rand = secrets.token_hex(6).upper()
        return ts + rand


# ── Dataclasses ──────────────────────────────────────────────────────────────


@dataclass
class Span:
    name: str
    started_at: float = field(default_factory=time.monotonic)
    ended_at: float | None = None
    extras: dict = field(default_factory=dict)

    @property
    def duration_ms(self) -> float | None:
        if self.ended_at is None:
            return None
        return (self.ended_at - self.started_at) * 1000.0


@dataclass
class Trace:
    trace_id: str = field(default_factory=_new_ulid)
    capability: str = ""
    started_at: float = field(default_factory=time.monotonic)
    spans: list[Span] = field(default_factory=list)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)

    def add_span(self, span: Span) -> None:
        with self._lock:
            self.spans.append(span)


# ── Context variable ─────────────────────────────────────────────────────────

_current_trace: ContextVar[Trace | None] = ContextVar("_current_trace", default=None)


# ── Public API ───────────────────────────────────────────────────────────────


def new_trace(capability: str) -> Trace:
    """Create a fresh Trace, attach it to this context, and return it."""
    trace = Trace(capability=capability)
    _current_trace.set(trace)
    get_ring_buffer().push(trace)
    return trace


def current_trace() -> Trace | None:
    """Return the Trace active on this context, or None."""
    return _current_trace.get()


def attach(trace: Trace) -> None:
    """Set *trace* as the active trace on this context (e.g. to propagate to a child task)."""
    _current_trace.set(trace)


def detach() -> None:
    """Clear the active trace from this context."""
    _current_trace.set(None)  # type: ignore[arg-type]


@contextmanager
def span(name: str, **extras: object) -> Iterator[Span]:
    """Context-manager that records a Span on the current Trace (if any).

    Usage::

        async with span("embed", model="nomic"):
            ...
    """
    s = Span(name=name, extras=dict(extras))
    trace = current_trace()
    try:
        yield s
    finally:
        s.ended_at = time.monotonic()
        if trace is not None:
            trace.add_span(s)


# ── Ring buffer ──────────────────────────────────────────────────────────────


class TraceRingBuffer:
    """Thread-safe bounded ring buffer that keeps the last *maxlen* traces."""

    def __init__(self, maxlen: int = TRACE_RING_BUFFER_SIZE) -> None:
        self._buf: deque[Trace] = deque(maxlen=maxlen)
        self._lock = threading.Lock()

    def push(self, trace: Trace) -> None:
        with self._lock:
            self._buf.append(trace)

    def snapshot(self) -> list[Trace]:
        """Return a copy of all buffered traces, oldest first."""
        with self._lock:
            return list(self._buf)

    def __len__(self) -> int:
        with self._lock:
            return len(self._buf)


_ring_buffer: TraceRingBuffer | None = None
_ring_lock = threading.Lock()


def get_ring_buffer() -> TraceRingBuffer:
    """Return the module-level singleton TraceRingBuffer."""
    global _ring_buffer
    if _ring_buffer is None:
        with _ring_lock:
            if _ring_buffer is None:
                _ring_buffer = TraceRingBuffer()
    return _ring_buffer