Source code for sqlspec.observability._diagnostics

"""Diagnostics aggregation utilities for observability exports."""

from collections.abc import Iterable

from sqlspec.storage.pipeline import (
    StorageDiagnostics,
    StorageTelemetry,
    get_recent_storage_events,
    get_storage_bridge_diagnostics,
)

DiagnosticsPayload = dict[str, float | list[StorageTelemetry]]


def _increment_metric(payload: "dict[str, float]", metric: str, amount: float) -> None:
    payload[metric] = payload.get(metric, 0.0) + amount


[docs] class TelemetryDiagnostics: """Aggregates lifecycle counters, custom metrics, and storage telemetry.""" __slots__ = ("_lifecycle_sections", "_metrics")
[docs] def __init__(self) -> None: self._lifecycle_sections: list[tuple[str, dict[str, float]]] = [] self._metrics: StorageDiagnostics = {}
[docs] def add_lifecycle_snapshot(self, config_key: str, counters: dict[str, int]) -> None: """Store lifecycle counters for later snapshot generation.""" if not counters: return float_counters = {metric: float(value) for metric, value in counters.items()} self._lifecycle_sections.append((config_key, float_counters))
[docs] def add_metric_snapshot(self, metrics: StorageDiagnostics) -> None: """Store custom metric snapshots.""" for key, value in metrics.items(): if key in self._metrics: self._metrics[key] += value else: self._metrics[key] = value
[docs] def snapshot(self) -> "DiagnosticsPayload": """Return aggregated diagnostics payload.""" numeric_payload: dict[str, float] = {} for key, value in get_storage_bridge_diagnostics().items(): _increment_metric(numeric_payload, key, float(value)) for _prefix, counters in self._lifecycle_sections: for metric, value in counters.items(): _increment_metric(numeric_payload, metric, value) for metric, value in self._metrics.items(): _increment_metric(numeric_payload, metric, float(value)) payload: DiagnosticsPayload = dict(numeric_payload) recent_jobs = get_recent_storage_events() if recent_jobs: payload["storage_bridge.recent_jobs"] = recent_jobs return payload
def collect_diagnostics(sections: Iterable[tuple[str, dict[str, int]]]) -> DiagnosticsPayload: """Convenience helper for aggregating sections without constructing a class.""" diag = TelemetryDiagnostics() for prefix, counters in sections: diag.add_lifecycle_snapshot(prefix, counters) return diag.snapshot() __all__ = ("DiagnosticsPayload", "TelemetryDiagnostics", "collect_diagnostics")