"""Prometheus-format metrics + deep health probes. Emits a plain-text ``/metrics`` payload in the Prometheus exposition format (no prometheus_client dependency — stdlib only). Aggregates everything the middleware layer already tracks: * request counters bucketed by status class (2xx/4xx/5xx) * latency percentiles (p50/p95/p99) + average * cache hits/misses/hit-rate/size * rate-limiter allowed/denied/buckets * auth key counts (active + revoked) Health endpoints: * /livez — 200 if the process is up and can answer at all * /readyz — 200 only if the pipeline has initialized cleanly (503 + JSON detail otherwise) """ from __future__ import annotations from typing import Dict, List, Tuple def _fmt_metric( name: str, help_text: str, mtype: str, samples: List[Tuple[Dict[str, str], float]], ) -> List[str]: """Format one metric family in Prometheus text format.""" lines = [f"# HELP {name} {help_text}", f"# TYPE {name} {mtype}"] for labels, value in samples: if labels: lbl = ",".join(f'{k}="{_escape(str(v))}"' for k, v in sorted(labels.items())) lines.append(f"{name}{{{lbl}}} {value}") else: lines.append(f"{name} {value}") return lines def _escape(s: str) -> str: return s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n") def render_prometheus( obs_stats: Dict, cache_stats: Dict, limiter_stats: Dict, auth_keys: int = 0, auth_keys_revoked: int = 0, version: str = "0.1.0", ) -> str: """Produce a Prometheus text-format payload from existing stats.""" lines: List[str] = [] # Build info lines += _fmt_metric( "rag_build_info", "Build info (version label always 1).", "gauge", [({"version": version}, 1)], ) # Requests by status class by_status = obs_stats.get("by_status", {}) req_samples = [({"status": code}, count) for code, count in by_status.items()] if not req_samples: req_samples = [({"status": "none"}, 0)] lines += _fmt_metric( "rag_requests_total", "HTTP requests grouped by status class (2xx/4xx/5xx).", "counter", req_samples, ) # Total request count lines += _fmt_metric( "rag_requests_observed", "Total records in the observability ring buffer.", "gauge", [({}, obs_stats.get("total", 0))], ) # Latency quantiles lat_samples = [ ({"quantile": "0.5"}, obs_stats.get("p50_ms", 0.0)), ({"quantile": "0.95"}, obs_stats.get("p95_ms", 0.0)), ({"quantile": "0.99"}, obs_stats.get("p99_ms", 0.0)), ] lines += _fmt_metric( "rag_request_latency_ms", "Request latency in milliseconds.", "summary", lat_samples, ) lines += _fmt_metric( "rag_request_latency_avg_ms", "Average request latency in milliseconds.", "gauge", [({}, obs_stats.get("avg_latency_ms", 0.0))], ) # Cache lines += _fmt_metric( "rag_cache_hits_total", "Query cache hits.", "counter", [({}, cache_stats.get("hits", 0))], ) lines += _fmt_metric( "rag_cache_misses_total", "Query cache misses.", "counter", [({}, cache_stats.get("misses", 0))], ) lines += _fmt_metric( "rag_cache_size", "Current cache entry count.", "gauge", [({}, cache_stats.get("size", 0))], ) lines += _fmt_metric( "rag_cache_hit_rate", "Cache hit rate (0..1).", "gauge", [({}, cache_stats.get("hit_rate", 0.0))], ) # Rate limiter lines += _fmt_metric( "rag_ratelimit_allowed_total", "Requests allowed by the token-bucket limiter.", "counter", [({}, limiter_stats.get("allowed", 0))], ) lines += _fmt_metric( "rag_ratelimit_denied_total", "Requests denied by the token-bucket limiter.", "counter", [({}, limiter_stats.get("denied", 0))], ) lines += _fmt_metric( "rag_ratelimit_buckets", "Live rate-limit buckets (unique clients).", "gauge", [({}, limiter_stats.get("buckets", 0))], ) # Auth lines += _fmt_metric( "rag_auth_keys", "API keys configured (revoked excluded).", "gauge", [ ({"revoked": "false"}, auth_keys), ({"revoked": "true"}, auth_keys_revoked), ], ) return "\n".join(lines) + "\n" # --------------------------------------------------------------- readiness def check_readiness(pipeline, require_warmed: bool = False) -> Tuple[bool, Dict[str, object]]: """Inspect the pipeline state. Return (ready, detail dict). Maps the ``Pipeline`` fields to generic names expected by operators (retriever/generator/verifier/monitor) so readiness looks the same no matter which retriever/generator flavor is configured. When ``require_warmed=True`` the probe additionally checks for ``pipeline._warmed`` (set by ``POST /v1/admin/warmup``). If the flag is absent/False, the pipeline is marked not-ready. """ detail: Dict[str, object] = {} ok = True # Pipeline.retrievers is a MultiRetriever; generator/verifier are direct; # "monitor" maps onto the signals computer (no one-to-one monitor object). checks = [ ("retriever", getattr(pipeline, "retrievers", None)), ("generator", getattr(pipeline, "generator", None)), ("verifier", getattr(pipeline, "verifier", None)), ("monitor", getattr(pipeline, "signals", None)), ] for name, obj in checks: has = obj is not None detail[name] = "ok" if has else "missing" if not has: ok = False # Retriever family name (useful in readiness output) try: r = getattr(pipeline, "retrievers", None) if r is not None: detail["retriever_type"] = type(r).__name__ inner = list(getattr(r, "retrievers", {}).keys()) detail["retriever_members"] = inner except Exception as e: detail["retriever_type"] = f"err:{e}" ok = False # How many documents/chunks have been added? try: detail["cached_queries"] = len(getattr(pipeline, "cache", {})) except Exception: detail["cached_queries"] = "unknown" # Warmup status (v1.56) — always reported; optionally gated warmed = bool(getattr(pipeline, "_warmed", False)) detail["warmed"] = warmed if require_warmed and not warmed: ok = False return ok, detail