File size: 5,000 Bytes
31c93b1
 
 
 
 
 
 
 
4aaae80
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4aaae80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31c93b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""HearthNet — X03 Observability: Structured JSON logging.

Public API:
    configure(config)   — install handlers/formatters. Idempotent.
    get_logger(name)    — return JSON-emitting stdlib logger
    JsonFormatter       — one-line JSON log records
    RateLimitedLogger   — at most one log per second per (logger, key)
"""

from __future__ import annotations

import json
import logging
import logging.handlers
import threading
import time
from pathlib import Path
from typing import Any

from hearthnet.config import ObservabilityConfig
from hearthnet.constants import LOG_RETENTION_DAYS

_configured = False
_configure_lock = threading.Lock()


class JsonFormatter(logging.Formatter):
    """Renders a LogRecord as a single JSON line."""

    def format(self, record: logging.LogRecord) -> str:
        payload: dict[str, Any] = {
            "ts": self.formatTime(record, "%Y-%m-%dT%H:%M:%S.") + f"{record.msecs:03.0f}Z",
            "level": record.levelname.lower(),
            "logger": record.name,
            "msg": record.getMessage(),
        }

        # Attach structured extras (skip stdlib internals)
        _SKIP = {
            "name",
            "msg",
            "args",
            "created",
            "filename",
            "funcName",
            "levelname",
            "levelno",
            "lineno",
            "module",
            "msecs",
            "pathname",
            "process",
            "processName",
            "relativeCreated",
            "stack_info",
            "thread",
            "threadName",
            "exc_info",
            "exc_text",
            "message",
        }
        for key, val in record.__dict__.items():
            if key not in _SKIP:
                payload[key] = val

        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)

        return json.dumps(payload, default=str, ensure_ascii=False)


def configure(config: ObservabilityConfig) -> None:
    """Install handlers and formatters on the root 'hearthnet' logger.

    Idempotent — safe to call multiple times; only runs once.
    """
    global _configured
    with _configure_lock:
        if _configured:
            return
        _configured = True

    level_name = (config.log_level or "info").upper()
    level = getattr(logging, level_name, logging.INFO)

    root = logging.getLogger("hearthnet")
    root.setLevel(level)
    root.handlers.clear()  # reset on reconfigure

    formatter = JsonFormatter()

    # Console handler
    console = logging.StreamHandler()
    console.setFormatter(formatter)
    root.addHandler(console)

    # File handler (daily rotation, 14-day retention)
    log_dir: Path | None = config.log_dir
    if log_dir is not None:
        log_dir = Path(log_dir)
        log_dir.mkdir(parents=True, exist_ok=True)
        log_path = log_dir / "hearthnet.log"
        file_handler = logging.handlers.TimedRotatingFileHandler(
            filename=str(log_path),
            when="midnight",
            utc=True,
            backupCount=LOG_RETENTION_DAYS,
            encoding="utf-8",
        )
        file_handler.setFormatter(formatter)
        root.addHandler(file_handler)

    root.propagate = False


def get_logger(name: str) -> logging.Logger:
    """Return a stdlib logger that emits JSON lines.

    Convention: ``name = __name__`` of the calling module.
    """
    return logging.getLogger(name)


class RateLimitedLogger:
    """Wraps a Logger and suppresses duplicate messages within a 1-second window.

    Keyed by ``(logger_name, message_key)`` — call with an explicit *key*
    argument to group semantically similar messages:

        rl_log.warning("peer unreachable", key="peer_unreachable")
    """

    def __init__(self, logger: logging.Logger) -> None:
        self._logger = logger
        self._last: dict[tuple[str, str], float] = {}
        self._lock = threading.Lock()
        self._window = 1.0  # seconds

    def _should_emit(self, key: str) -> bool:
        bucket = (self._logger.name, key)
        now = time.monotonic()
        with self._lock:
            last = self._last.get(bucket, 0.0)
            if now - last >= self._window:
                self._last[bucket] = now
                return True
        return False

    def _emit(self, level: int, msg: str, key: str, **kwargs: Any) -> None:
        if self._should_emit(key):
            self._logger.log(level, msg, **kwargs)

    def debug(self, msg: str, *, key: str = "", **kwargs: Any) -> None:
        self._emit(logging.DEBUG, msg, key or msg, **kwargs)

    def info(self, msg: str, *, key: str = "", **kwargs: Any) -> None:
        self._emit(logging.INFO, msg, key or msg, **kwargs)

    def warning(self, msg: str, *, key: str = "", **kwargs: Any) -> None:
        self._emit(logging.WARNING, msg, key or msg, **kwargs)

    def error(self, msg: str, *, key: str = "", **kwargs: Any) -> None:
        self._emit(logging.ERROR, msg, key or msg, **kwargs)