Source code for sqlspec.observability._formatters._gcp

"""Google Cloud Platform log formatter."""

import os
from typing import Any, ClassVar

__all__ = ("GCPLogFormatter",)


[docs] class GCPLogFormatter: """Formatter for Google Cloud Logging structured format. Produces JSON-compatible dictionaries that conform to GCP's structured logging format, including: - severity field with GCP severity levels - logging.googleapis.com/trace for trace correlation - logging.googleapis.com/spanId for span tracking - logging.googleapis.com/labels for custom labels - logging.googleapis.com/sourceLocation for code location Example: ```python formatter = GCPLogFormatter(project_id="my-project") entry = formatter.format( "INFO", "Query executed", correlation_id="abc-123", trace_id="4bf92f3577b34da6a3ce929d0e0e4736", duration_ms=15.5, ) ``` Reference: https://cloud.google.com/logging/docs/structured-logging """ __slots__ = ("_project_id",) SEVERITY_MAP: ClassVar[dict[str, str]] = { "DEBUG": "DEBUG", "INFO": "INFO", "WARNING": "WARNING", "ERROR": "ERROR", "CRITICAL": "CRITICAL", }
[docs] def __init__(self, project_id: str | None = None) -> None: """Initialize GCP log formatter. Args: project_id: GCP project ID for trace URL construction. If not provided, attempts to read from GOOGLE_CLOUD_PROJECT environment variable. """ self._project_id = project_id or os.environ.get("GOOGLE_CLOUD_PROJECT")
@property def project_id(self) -> str | None: """Get the configured GCP project ID.""" return self._project_id
[docs] def format( self, level: str, message: str, *, correlation_id: str | None = None, trace_id: str | None = None, span_id: str | None = None, duration_ms: float | None = None, source_file: str | None = None, source_line: int | None = None, source_function: str | None = None, extra: dict[str, Any] | None = None, ) -> dict[str, Any]: """Format log entry for Google Cloud Logging. Args: level: Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL). message: Log message. correlation_id: Request correlation ID. trace_id: Distributed trace ID. span_id: Span ID within the trace. duration_ms: Operation duration in milliseconds. source_file: Source file path. source_line: Source line number. source_function: Source function name. extra: Additional fields to include in the log entry. Returns: Dictionary formatted for GCP structured logging. """ entry: dict[str, Any] = {"severity": self.SEVERITY_MAP.get(level.upper(), "DEFAULT"), "message": message} if trace_id and self._project_id: entry["logging.googleapis.com/trace"] = f"projects/{self._project_id}/traces/{trace_id}" if span_id: entry["logging.googleapis.com/spanId"] = span_id if correlation_id: entry.setdefault("logging.googleapis.com/labels", {}) entry["logging.googleapis.com/labels"]["correlation_id"] = correlation_id if source_file or source_line is not None or source_function: source_location: dict[str, str] = {} if source_file: source_location["file"] = source_file if source_line is not None: source_location["line"] = str(source_line) if source_function: source_location["function"] = source_function entry["logging.googleapis.com/sourceLocation"] = source_location if duration_ms is not None: entry["duration_ms"] = duration_ms if extra: entry.update(extra) return entry
def __hash__(self) -> int: return hash((self.__class__.__name__, self._project_id)) def __repr__(self) -> str: return f"GCPLogFormatter(project_id={self._project_id!r})" def __eq__(self, other: object) -> bool: if not isinstance(other, GCPLogFormatter): return NotImplemented return self._project_id == other._project_id