Spaces:
Running on Zero
Running on Zero
GitHub Actions
Quality improvements: Unicode chars, Token class, imports, type hints, formatting
3f78ea8 | """HearthNet — X03 Observability: Structured JSON logging. | |
| Public API: | |
| configure(config) — install handlers/formatters. Idempotent. | |
| get_logger(name) — return JSON-emitting stdlib logger | |
| JsonFormatter — one-line JSON log records | |
| RateLimitedLogger — at most one log per second per (logger, key) | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import logging.handlers | |
| import threading | |
| import time | |
| from pathlib import Path | |
| from typing import Any | |
| from hearthnet.config import ObservabilityConfig | |
| from hearthnet.constants import LOG_RETENTION_DAYS | |
# Module-level guard read and set by configure(): once it is True, later
# configure() calls return early without touching the logger.
_configured = False
# Held by configure() while it checks and updates _configured, so concurrent
# callers cannot both run the setup.
_configure_lock = threading.Lock()
class JsonFormatter(logging.Formatter):
    """Render a LogRecord as a single JSON line.

    Every line carries ``ts`` (UTC, millisecond precision, ``Z`` suffix),
    ``level`` (lower-cased level name), ``logger`` and ``msg``. Any extra
    attributes attached to the record (e.g. via ``extra=``) are merged in,
    and ``exc`` / ``stack`` are added when the record carries exception or
    stack information.
    """

    # The timestamp is suffixed with "Z", so it must be rendered in UTC
    # rather than with the default local-time converter.
    converter = time.gmtime

    # LogRecord attributes owned by the stdlib; everything else found on a
    # record is treated as a caller-supplied structured extra.
    _RESERVED_ATTRS = frozenset(
        {
            "name",
            "msg",
            "args",
            "created",
            "filename",
            "funcName",
            "levelname",
            "levelno",
            "lineno",
            "module",
            "msecs",
            "pathname",
            "process",
            "processName",
            "relativeCreated",
            "stack_info",
            "thread",
            "threadName",
            "exc_info",
            "exc_text",
            "message",
            "asctime",  # set by formatters that use %(asctime)s
            "taskName",  # added to LogRecord in Python 3.12
        }
    )

    def format(self, record: logging.LogRecord) -> str:
        """Return *record* serialized as one JSON object on a single line."""
        # Truncate instead of rounding: 999.6 ms must not render as "1000".
        millis = min(int(record.msecs), 999)
        stamp = self.formatTime(record, "%Y-%m-%dT%H:%M:%S")
        payload: dict[str, Any] = {
            "ts": f"{stamp}.{millis:03d}Z",
            "level": record.levelname.lower(),
            "logger": record.name,
            "msg": record.getMessage(),
        }
        # Attach structured extras (skip stdlib internals).
        reserved = self._RESERVED_ATTRS
        payload.update(
            {key: val for key, val in record.__dict__.items() if key not in reserved}
        )
        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)
        if record.stack_info:
            payload["stack"] = self.formatStack(record.stack_info)
        # default=str keeps logging from failing on non-JSON-native extras.
        return json.dumps(payload, default=str, ensure_ascii=False)
def configure(config: ObservabilityConfig) -> None:
    """Install JSON handlers on the ``hearthnet`` logger.

    Reads ``config.log_level`` (case-insensitive level name; empty or
    unrecognised values fall back to INFO) and ``config.log_dir`` (when not
    None, a ``hearthnet.log`` file rotated at UTC midnight is added, keeping
    ``LOG_RETENTION_DAYS`` backups).

    Idempotent: after one successful call, later calls return immediately.
    If setup raises (for example the log directory cannot be created), the
    logger is left untouched and a later call may try again.
    """
    global _configured
    with _configure_lock:
        if _configured:
            return

        level_name = (config.log_level or "info").strip().upper()
        level = getattr(logging, level_name, None)
        # Guard against names that resolve to non-level attributes of the
        # logging module (e.g. "BASIC_FORMAT"), which setLevel would reject.
        if not isinstance(level, int):
            level = logging.INFO

        formatter = JsonFormatter()

        # Build every handler before touching the logger so that a failure
        # cannot leave it half-configured.
        handlers: list[logging.Handler] = []

        # Console handler
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        handlers.append(console)

        # File handler (daily rotation, LOG_RETENTION_DAYS backups kept)
        if config.log_dir is not None:
            log_dir = Path(config.log_dir)
            log_dir.mkdir(parents=True, exist_ok=True)
            file_handler = logging.handlers.TimedRotatingFileHandler(
                filename=str(log_dir / "hearthnet.log"),
                when="midnight",
                utc=True,
                backupCount=LOG_RETENTION_DAYS,
                encoding="utf-8",
            )
            file_handler.setFormatter(formatter)
            handlers.append(file_handler)

        root = logging.getLogger("hearthnet")
        root.setLevel(level)
        root.handlers.clear()  # drop anything installed before configure()
        for handler in handlers:
            root.addHandler(handler)
        root.propagate = False

        # Mark as configured only once everything above has succeeded.
        _configured = True
def get_logger(name: str) -> logging.Logger:
    """Fetch the stdlib logger registered under *name*.

    By convention callers pass their own module's ``__name__``. No handler
    or formatter is attached here; the logger is returned exactly as
    ``logging.getLogger`` provides it.
    """
    logger = logging.getLogger(name)
    return logger
class RateLimitedLogger:
    """Wrap a Logger and suppress repeated messages within a time window.

    Messages are grouped by ``(logger_name, key)``; at most one message per
    group is forwarded per *window* seconds (default 1.0). Pass an explicit
    *key* to group semantically similar messages; when omitted, the message
    format string itself is the key:

        rl_log.warning("peer %s unreachable", peer_id, key="peer_unreachable")

    Positional %-style arguments and keyword arguments (``extra=``,
    ``exc_info=``, ...) are forwarded to the wrapped logger unchanged.
    """

    # Upper bound on remembered keys, so dynamic messages used as keys
    # cannot grow the bookkeeping dict without limit.
    _MAX_TRACKED_KEYS = 1024

    def __init__(self, logger: logging.Logger, *, window: float = 1.0) -> None:
        self._logger = logger
        self._last: dict[tuple[str, str], float] = {}
        self._lock = threading.Lock()
        self._window = window  # seconds

    def _prune(self, now: float) -> None:
        """Drop expired entries; caller must hold ``self._lock``."""
        window = self._window
        self._last = {
            bucket: seen for bucket, seen in self._last.items() if now - seen < window
        }
        if len(self._last) >= self._MAX_TRACKED_KEYS:
            # Everything is still fresh: favour bounded memory over perfect
            # suppression and start over.
            self._last.clear()

    def _should_emit(self, key: str) -> bool:
        """Return True if *key* has not been emitted within the window."""
        bucket = (self._logger.name, key)
        now = time.monotonic()
        with self._lock:
            last = self._last.get(bucket)
            # ``None`` means "never seen". A numeric default would wrongly
            # suppress the first message when the monotonic clock reads
            # less than the window (its origin is unspecified).
            if last is not None and now - last < self._window:
                return False
            if bucket not in self._last and len(self._last) >= self._MAX_TRACKED_KEYS:
                self._prune(now)
            self._last[bucket] = now
            return True

    def _emit(self, level: int, msg: str, key: str, *args: Any, **kwargs: Any) -> None:
        """Forward the message to the wrapped logger unless rate-limited."""
        # A call that would be discarded anyway must not use up the slot.
        if not self._logger.isEnabledFor(level):
            return
        if self._should_emit(key):
            # Skip this wrapper's two frames (_emit and the public method) so
            # the record is attributed to the real call site.
            kwargs["stacklevel"] = kwargs.get("stacklevel", 1) + 2
            self._logger.log(level, msg, *args, **kwargs)

    def debug(self, msg: str, *args: Any, key: str = "", **kwargs: Any) -> None:
        """Log *msg* at DEBUG level, rate-limited by *key* (default: *msg*)."""
        self._emit(logging.DEBUG, msg, key or msg, *args, **kwargs)

    def info(self, msg: str, *args: Any, key: str = "", **kwargs: Any) -> None:
        """Log *msg* at INFO level, rate-limited by *key* (default: *msg*)."""
        self._emit(logging.INFO, msg, key or msg, *args, **kwargs)

    def warning(self, msg: str, *args: Any, key: str = "", **kwargs: Any) -> None:
        """Log *msg* at WARNING level, rate-limited by *key* (default: *msg*)."""
        self._emit(logging.WARNING, msg, key or msg, *args, **kwargs)

    def error(self, msg: str, *args: Any, key: str = "", **kwargs: Any) -> None:
        """Log *msg* at ERROR level, rate-limited by *key* (default: *msg*)."""
        self._emit(logging.ERROR, msg, key or msg, *args, **kwargs)