Observability

Telemetry, logging, and diagnostic infrastructure for monitoring SQL operations. Supports OpenTelemetry, Prometheus, and structured cloud logging formats.

Configuration

class sqlspec.observability.ObservabilityConfig[source]

Bases: object

Aggregates lifecycle hooks, observers, and telemetry toggles.

__init__(*, lifecycle=None, print_sql=None, statement_observers=None, telemetry=None, redaction=None, logging=None, sampling=None, cloud_formatter=None)[source]
copy()[source]

Return a deep copy of the configuration.

Return type:

ObservabilityConfig

classmethod merge(base_config, override_config)[source]

Merge registry-level and adapter-level configuration objects.

Return type:

ObservabilityConfig
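merge combines registry-level defaults with adapter-level overrides. A minimal sketch of the likely override-wins semantics, using a hypothetical two-field MiniConfig rather than the real class:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class MiniConfig:
    """Hypothetical two-field stand-in for ObservabilityConfig."""

    print_sql: Optional[bool] = None
    sampling: Optional[float] = None

    @classmethod
    def merge(cls, base: "MiniConfig", override: "MiniConfig") -> "MiniConfig":
        # For each field, the override wins whenever it was explicitly set.
        return cls(
            print_sql=base.print_sql if override.print_sql is None else override.print_sql,
            sampling=base.sampling if override.sampling is None else override.sampling,
        )


registry_level = MiniConfig(print_sql=True, sampling=0.5)
adapter_level = MiniConfig(sampling=1.0)  # overrides only the sample rate
merged = MiniConfig.merge(registry_level, adapter_level)
print(merged)  # MiniConfig(print_sql=True, sampling=1.0)
```

Fields the override leaves unset fall through to the registry-level value, so adapters only need to spell out what differs.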

class sqlspec.observability.TelemetryConfig[source]

Bases: object

Span emission and tracer provider settings.

__init__(*, enable_spans=False, provider_factory=None, resource_attributes=None)[source]
copy()[source]

Return a shallow copy preserving optional dictionaries.

Return type:

TelemetryConfig

class sqlspec.observability.LoggingConfig[source]

Bases: object

Controls log output format and verbosity.

__init__(*, include_sql_hash=True, sql_truncation_length=2000, parameter_truncation_count=100, include_trace_context=True, include_driver_name=False)[source]
copy()[source]

Return a shallow copy of the logging configuration.

Return type:

LoggingConfig

class sqlspec.observability.RedactionConfig[source]

Bases: object

Controls SQL and parameter redaction before observers run.

__init__(*, mask_parameters=None, mask_literals=None, parameter_allow_list=None)[source]
copy()[source]

Return a copy to avoid sharing mutable state.

Return type:

RedactionConfig

class sqlspec.observability.SamplingConfig[source]

Bases: object

Configuration for log and metric sampling.

Controls when observability data (logs, spans, metrics) is emitted. Supports both random and deterministic sampling modes, with force-sample conditions for errors and slow queries.

sample_rate

Probability of sampling (0.0 to 1.0). 1.0 means always sample.

force_sample_on_error

If True, always sample when an error occurs.

force_sample_slow_queries_ms

If set, always sample queries slower than this threshold.

deterministic

If True, use hash-based sampling for consistency across distributed systems.

Example

```python
# Sample 10% of requests, but always sample errors and slow queries
config = SamplingConfig(
    sample_rate=0.1,
    force_sample_on_error=True,
    force_sample_slow_queries_ms=100.0,
    deterministic=True,
)

# Check whether a request should be sampled
if config.should_sample(
    correlation_id="abc-123",
    is_error=False,
    duration_ms=50.0,
):
    emit_logs()
```

DEFAULT_SAMPLE_RATE: ClassVar[float] = 1.0

Default sample rate (100%: sample everything).

DEFAULT_SLOW_QUERY_THRESHOLD_MS: ClassVar[float] = 100.0

Default threshold in milliseconds for slow query detection.

HASH_MODULUS: ClassVar[int] = 10000

Modulus for deterministic hash-based sampling.
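The class-level constants above suggest how deterministic sampling is computed. A stdlib sketch of that hash-bucket approach (the exact hash function sqlspec uses is an assumption; SHA-256 is used here for illustration):

```python
import hashlib

HASH_MODULUS = 10000  # mirrors SamplingConfig.HASH_MODULUS


def deterministic_should_sample(correlation_id: str, sample_rate: float) -> bool:
    # Map the correlation ID into one of HASH_MODULUS buckets; sample
    # when the bucket falls inside the first sample_rate fraction.
    digest = hashlib.sha256(correlation_id.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % HASH_MODULUS
    return bucket < sample_rate * HASH_MODULUS


# Same ID, same answer, on every host in a distributed system:
assert deterministic_should_sample("abc-123", 0.1) == deterministic_should_sample("abc-123", 0.1)
```

Because the decision depends only on the correlation ID, every service touching the same request agrees on whether to emit telemetry for it.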

__init__(*, sample_rate=1.0, force_sample_on_error=True, force_sample_slow_queries_ms=100.0, deterministic=True)[source]

Initialize sampling configuration.

Parameters:
  • sample_rate (float) – Probability of sampling (0.0 to 1.0). Values outside this range are clamped. Defaults to 1.0 (always sample).

  • force_sample_on_error (bool) – If True, always sample when an error occurs. Defaults to True.

  • force_sample_slow_queries_ms (float | None) – If set, always sample queries that take longer than this threshold in milliseconds. Defaults to 100.0ms. Set to None to disable.

  • deterministic (bool) – If True, use hash-based sampling that produces consistent results for the same correlation ID across distributed systems. If False, use random sampling. Defaults to True.

should_sample(correlation_id=None, *, is_error=False, duration_ms=None, force=False)[source]

Determine if this request should be sampled.

Evaluates force-sample conditions first (errors, slow queries, explicit force), then falls back to rate-based sampling.

Parameters:
  • correlation_id (str | None) – The correlation ID for deterministic sampling. If None and deterministic=True, falls back to random sampling.

  • is_error (bool) – Whether this request resulted in an error.

  • duration_ms (float | None) – Query duration in milliseconds, if known.

  • force (bool) – Explicit force-sample flag from application code.

Return type:

bool

Returns:

True if the request should be sampled, False otherwise.

Example

```python
# Always sampled due to error
config.should_sample(is_error=True)  # True

# Always sampled due to slow query (>100 ms default threshold)
config.should_sample(duration_ms=150.0)  # True

# Rate-based sampling
config.should_sample(correlation_id="abc-123")  # depends on sample_rate
```

copy()[source]

Return a copy of the sampling configuration.

Return type:

SamplingConfig

Returns:

A new SamplingConfig instance with the same values.

Runtime

class sqlspec.observability.ObservabilityRuntime[source]

Bases: object

Aggregates dispatchers, observers, spans, and custom metrics.

__init__(config=None, *, bind_key=None, config_name=None)[source]
property is_idle: bool

Return True when no observability features are active.

A runtime is idle if it has no lifecycle hooks, no statement observers, and telemetry spans are disabled. Drivers can use this to skip expensive context construction.

property has_statement_observers: bool

Return True when any observers are registered.

property diagnostics_key: str

Derive diagnostics key from bind key or configuration name.

base_context()[source]

Return the base payload for lifecycle events.

Return type:

dict[str, Any]

lifecycle_snapshot()[source]

Return lifecycle counters keyed under the diagnostics prefix.

Return type:

dict[str, int]

metrics_snapshot()[source]

Return accumulated custom metrics with diagnostics prefix.

Return type:

dict[str, float]

increment_metric(name, amount=1.0)[source]

Increment a custom metric counter.

Return type:

None

record_metric(name, value)[source]

Set a custom metric to an explicit value.

Return type:

None
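The two metric methods above differ only in whether they add to or overwrite the stored value. A sketch of those counter semantics, with a plain dict standing in for the runtime's metric store:

```python
# Hypothetical stand-in for ObservabilityRuntime's metric store.
metrics: dict = {}


def increment_metric(name: str, amount: float = 1.0) -> None:
    # Accumulate: repeated calls add up.
    metrics[name] = metrics.get(name, 0.0) + amount


def record_metric(name: str, value: float) -> None:
    # Overwrite: the metric is set to an explicit value.
    metrics[name] = value


increment_metric("cache.hits")
increment_metric("cache.hits", 2.0)
record_metric("pool.size", 10.0)
print(metrics)  # {'cache.hits': 3.0, 'pool.size': 10.0}
```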

start_migration_span(event, *, version=None, metadata=None)[source]

Start a migration span when telemetry is enabled.

Return type:

Any

end_migration_span(span, *, duration_ms=None, error=None)[source]

Finish a migration span, attaching optional duration metadata.

Return type:

None

emit_statement_event(*, sql, parameters, driver, operation, execution_mode, is_many, is_script, rows_affected, duration_s, storage_backend, started_at=None)[source]

Emit a statement event to all registered observers.

Return type:

None

start_query_span(sql, operation, driver)[source]

Start a query span with runtime metadata.

Return type:

Any

start_storage_span(operation, *, destination=None, format_label=None)[source]

Start a storage bridge span for read/write operations.

Return type:

Any

start_span(name, *, attributes=None)[source]

Start a custom span enriched with configuration context.

Return type:

Any

end_span(span, *, error=None)[source]

Finish a custom span.

Return type:

None

end_storage_span(span, *, telemetry=None, error=None)[source]

Finish a storage span, attaching telemetry metadata when available.

Return type:

None

annotate_storage_telemetry(telemetry)[source]

Add bind key / config / correlation metadata to telemetry payloads.

Return type:

StorageTelemetry

Statement Observer

class sqlspec.observability.StatementEvent[source]

Bases: object

Structured payload describing a SQL execution.

__init__(*, sql, parameters, driver, adapter, bind_key, db_system, operation, execution_mode, is_many, is_script, rows_affected, duration_s, started_at, correlation_id, storage_backend, sql_hash, sql_truncated, sql_original_length, transaction_state, prepared_statement, trace_id, span_id, sampled=True)[source]
as_dict()[source]

Return event payload as a dictionary.

Return type:

dict[str, Any]

sqlspec.observability.create_statement_observer(logging_config)[source]

Create a statement observer bound to the provided logging config.

Return type:

Callable[[StatementEvent], None]

sqlspec.observability.default_statement_observer(event)[source]

Log statement execution payload when no custom observer is supplied.

Return type:

None

sqlspec.observability.create_event(*, sql, parameters, driver, adapter, bind_key, operation, execution_mode, is_many, is_script, rows_affected, duration_s, correlation_id, storage_backend=None, started_at=None, db_system=None, sql_hash=None, sql_truncated=False, sql_original_length=None, transaction_state=None, prepared_statement=None, trace_id=None, span_id=None, sampled=True)[source]

Factory helper used by runtime to build statement events.

Return type:

StatementEvent

sqlspec.observability.format_statement_event(event)[source]

Create a concise human-readable representation of a statement event.

Return type:

str
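Any callable accepting a StatementEvent and returning None can serve as an observer. A sketch of a custom observer that retains slow statements, using a hypothetical FakeEvent stand-in with a subset of StatementEvent's documented fields (constructing a real event requires the full runtime):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeEvent:
    """Hypothetical stand-in carrying a few StatementEvent fields."""

    sql: str
    operation: str
    duration_s: float
    rows_affected: int


slow_statements: List[FakeEvent] = []


def slow_query_observer(event: FakeEvent) -> None:
    # Keep anything slower than 250 ms for later inspection.
    if event.duration_s > 0.25:
        slow_statements.append(event)


slow_query_observer(FakeEvent("SELECT 1", "SELECT", 0.01, 1))
slow_query_observer(FakeEvent("SELECT * FROM big", "SELECT", 0.6, 50000))
print(len(slow_statements))  # 1
```

In real usage the observer would be registered through ObservabilityConfig's statement_observers parameter.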

Span Management

class sqlspec.observability.SpanManager[source]

Bases: object

Lazy OpenTelemetry span manager with graceful degradation.

__init__(telemetry=None)[source]
property is_enabled: bool

Return True once OpenTelemetry spans are available.

start_query_span(*, driver, adapter, bind_key, sql, operation, connection_info=None, storage_backend=None, correlation_id=None)[source]

Start a query span with SQLSpec semantic attributes.

Return type:

Any

start_span(name, attributes=None)[source]

Start a generic span when instrumentation needs a custom name.

Return type:

Any

end_span(span, error=None)[source]

Close a span and record errors when provided.

Return type:

None
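The "graceful degradation" SpanManager promises typically means importing OpenTelemetry lazily and falling back to no-op behavior when the package is absent. A sketch of that pattern (function names here are illustrative, not sqlspec's internals):

```python
def get_tracer():
    try:
        from opentelemetry import trace  # optional dependency

        return trace.get_tracer("sqlspec")
    except ImportError:
        # OpenTelemetry is not installed; tracing degrades to a no-op.
        return None


tracer = get_tracer()


def start_span(name: str):
    # Return None when tracing is unavailable; callers treat None as no-op.
    if tracer is None:
        return None
    return tracer.start_span(name)


span = start_span("db.query")
if span is not None:
    span.end()
```

This is why the span-returning methods above are typed as Any: callers receive either a live OTel span or None, and both are handled uniformly.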

Diagnostics

class sqlspec.observability.TelemetryDiagnostics[source]

Bases: object

Aggregates lifecycle counters, custom metrics, and storage telemetry.

__init__()[source]
add_lifecycle_snapshot(config_key, counters)[source]

Store lifecycle counters for later snapshot generation.

Return type:

None

add_metric_snapshot(metrics)[source]

Store custom metric snapshots.

Return type:

None

snapshot()[source]

Return aggregated diagnostics payload.

Return type:

dict[str, float | list[StorageTelemetry]]

Lifecycle

class sqlspec.observability.LifecycleDispatcher[source]

Bases: object

Dispatches lifecycle hooks with guard flags and diagnostics counters.

__init__(hooks=None)[source]
property is_enabled: bool

Return True when at least one hook is registered.

emit_pool_create(context)[source]

Fire pool creation hooks.

Return type:

None

emit_pool_destroy(context)[source]

Fire pool destruction hooks.

Return type:

None

emit_connection_create(context)[source]

Fire connection creation hooks.

Return type:

None

emit_connection_destroy(context)[source]

Fire connection teardown hooks.

Return type:

None

emit_session_start(context)[source]

Fire session start hooks.

Return type:

None

emit_session_end(context)[source]

Fire session end hooks.

Return type:

None

emit_query_start(context)[source]

Fire query start hooks.

Return type:

None

emit_query_complete(context)[source]

Fire query completion hooks.

Return type:

None

emit_error(context)[source]

Fire error hooks with failure context.

Return type:

None

snapshot(*, prefix=None)[source]

Return counter snapshot keyed for diagnostics export.

Return type:

dict[str, int]
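The emit_* methods above share a pattern: bump a per-event counter, then fire any registered hooks. A sketch of that dispatch-plus-counters design with a hypothetical MiniDispatcher:

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional

Hook = Callable[[Dict[str, Any]], None]


class MiniDispatcher:
    """Hypothetical dispatcher: counts every event, then fires hooks."""

    def __init__(self, hooks: Optional[Dict[str, List[Hook]]] = None) -> None:
        self._hooks = hooks or {}
        self._counters: "defaultdict[str, int]" = defaultdict(int)

    def emit(self, event: str, context: Dict[str, Any]) -> None:
        # Counters advance even when no hook is registered, so
        # snapshot() reflects all activity, not just observed activity.
        self._counters[event] += 1
        for hook in self._hooks.get(event, []):
            hook(context)

    def snapshot(self, prefix: Optional[str] = None) -> Dict[str, int]:
        key = f"{prefix}." if prefix else ""
        return {key + name: count for name, count in self._counters.items()}


seen: List[str] = []
dispatcher = MiniDispatcher({"query_start": [lambda ctx: seen.append(ctx["sql"])]})
dispatcher.emit("query_start", {"sql": "SELECT 1"})
dispatcher.emit("query_complete", {"sql": "SELECT 1"})
print(dispatcher.snapshot(prefix="main"))
# {'main.query_start': 1, 'main.query_complete': 1}
```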

Log Formatters

class sqlspec.observability.OTelConsoleFormatter[source]

Bases: Formatter

Console formatter that orders OTel-aligned fields consistently.

__init__(datefmt=None)[source]

Initialize the formatter. The optional datefmt argument allows specialized date formatting; if datefmt is omitted, an ISO 8601-like (RFC 3339-like) format is used.

format(record)[source]

Format the specified record as text.

The record's attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out: the message attribute of the record is computed using LogRecord.getMessage(); if the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time; if there is exception information, it is formatted using formatException() and appended to the message.

Return type:

str

class sqlspec.observability.OTelJSONFormatter[source]

Bases: StructuredFormatter

Structured JSON formatter for OTel-aligned log fields.

Cloud Log Formatters

class sqlspec.observability.AWSLogFormatter[source]

Bases: object

Formatter for AWS CloudWatch Logs structured format.

Produces JSON-compatible dictionaries that conform to AWS CloudWatch structured logging conventions, including:
  • a level field following AWS log level conventions
  • requestId for the correlation ID
  • xray_trace_id for X-Ray integration
  • an ISO 8601 timestamp

Example

```python
formatter = AWSLogFormatter()
entry = formatter.format(
    "INFO",
    "Query executed",
    correlation_id="abc-123",
    trace_id="1-5f84c7a1-sample",
    duration_ms=15.5,
)
```
Reference:

https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/AnalyzingLogData.html

format(level, message, *, correlation_id=None, trace_id=None, span_id=None, duration_ms=None, extra=None)[source]

Format log entry for AWS CloudWatch.

Parameters:
  • level (str) – Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

  • message (str) – Log message.

  • correlation_id (str | None) – Request correlation ID (maps to requestId).

  • trace_id (str | None) – X-Ray trace ID.

  • span_id (str | None) – X-Ray segment ID.

  • duration_ms (float | None) – Operation duration in milliseconds.

  • extra (dict[str, Any] | None) – Additional fields to include in the log entry.

Return type:

dict[str, Any]

Returns:

Dictionary formatted for AWS CloudWatch structured logging.

class sqlspec.observability.GCPLogFormatter[source]

Bases: object

Formatter for Google Cloud Logging structured format.

Produces JSON-compatible dictionaries that conform to GCP's structured logging format, including:
  • a severity field with GCP severity levels
  • logging.googleapis.com/trace for trace correlation
  • logging.googleapis.com/spanId for span tracking
  • logging.googleapis.com/labels for custom labels
  • logging.googleapis.com/sourceLocation for code location

Example

```python
formatter = GCPLogFormatter(project_id="my-project")
entry = formatter.format(
    "INFO",
    "Query executed",
    correlation_id="abc-123",
    trace_id="4bf92f3577b34da6a3ce929d0e0e4736",
    duration_ms=15.5,
)
```
Reference:

https://cloud.google.com/logging/docs/structured-logging

__init__(project_id=None)[source]

Initialize GCP log formatter.

Parameters:

project_id (str | None) – GCP project ID for trace URL construction. If not provided, attempts to read from GOOGLE_CLOUD_PROJECT environment variable.

property project_id: str | None

Get the configured GCP project ID.

format(level, message, *, correlation_id=None, trace_id=None, span_id=None, duration_ms=None, source_file=None, source_line=None, source_function=None, extra=None)[source]

Format log entry for Google Cloud Logging.

Parameters:
  • level (str) – Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

  • message (str) – Log message.

  • correlation_id (str | None) – Request correlation ID.

  • trace_id (str | None) – Distributed trace ID.

  • span_id (str | None) – Span ID within the trace.

  • duration_ms (float | None) – Operation duration in milliseconds.

  • source_file (str | None) – Source file path.

  • source_line (int | None) – Source line number.

  • source_function (str | None) – Source function name.

  • extra (dict[str, Any] | None) – Additional fields to include in the log entry.

Return type:

dict[str, Any]

Returns:

Dictionary formatted for GCP structured logging.
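The field names documented above follow GCP's structured logging conventions. A sketch of building such an entry by hand (the real formatter may emit more fields; note that trace correlation requires the fully qualified projects/<project_id>/traces/<trace_id> form):

```python
import json


def gcp_entry(level: str, message: str, *, project_id: str, trace_id: str, span_id: str) -> dict:
    """Hypothetical helper illustrating GCP structured-log field names."""
    return {
        "severity": level,
        "message": message,
        # The trace value must be the fully qualified resource name.
        "logging.googleapis.com/trace": f"projects/{project_id}/traces/{trace_id}",
        "logging.googleapis.com/spanId": span_id,
    }


entry = gcp_entry(
    "INFO",
    "Query executed",
    project_id="my-project",
    trace_id="4bf92f3577b34da6a3ce929d0e0e4736",
    span_id="000000000000004a",
)
print(json.dumps(entry, indent=2))
```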

class sqlspec.observability.AzureLogFormatter[source]

Bases: object

Formatter for Azure Monitor / Application Insights structured format.

Produces JSON-compatible dictionaries that conform to Azure Monitor structured logging conventions, including:
  • severityLevel with a numeric severity (0-4)
  • operation_Id for trace correlation
  • operation_ParentId for span tracking
  • a properties dict for custom fields

Example

```python
formatter = AzureLogFormatter()
entry = formatter.format(
    "INFO",
    "Query executed",
    correlation_id="abc-123",
    trace_id="4bf92f3577b34da6a3ce929d0e0e4736",
    duration_ms=15.5,
)
```
Reference:

https://docs.microsoft.com/en-us/azure/azure-monitor/app/data-model

format(level, message, *, correlation_id=None, trace_id=None, span_id=None, duration_ms=None, extra=None)[source]

Format log entry for Azure Monitor.

Parameters:
  • level (str) – Log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

  • message (str) – Log message.

  • correlation_id (str | None) – Request correlation ID.

  • trace_id (str | None) – Operation ID for distributed tracing.

  • span_id (str | None) – Parent operation ID.

  • duration_ms (float | None) – Operation duration in milliseconds.

  • extra (dict[str, Any] | None) – Additional fields to include in properties.

Return type:

dict[str, Any]

Returns:

Dictionary formatted for Azure Monitor structured logging.

Prometheus

class sqlspec.extensions.prometheus.PrometheusStatementObserver[source]

Bases: object

Statement observer that records Prometheus metrics.

__init__(*, namespace='sqlspec', subsystem='driver', registry=None, label_names=('db_system', 'operation'), duration_buckets=None)[source]

Helper Functions

sqlspec.observability.compute_sql_hash(sql)[source]

Return the 16-character SHA256 hash for SQL text.

Parameters:

sql (str) – SQL statement text.

Return type:

str

Returns:

SHA256 hash prefix.
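The documented behavior, a 16-character SHA-256 prefix, is reproducible with the standard library (whether sqlspec normalizes the SQL text before hashing is not specified here, so this sketch hashes the raw string):

```python
import hashlib


def sql_hash(sql: str) -> str:
    # First 16 hex characters of the SHA-256 digest of the raw SQL text.
    return hashlib.sha256(sql.encode("utf-8")).hexdigest()[:16]


h = sql_hash("SELECT * FROM users WHERE id = ?")
print(len(h))  # 16
```

A short stable hash like this lets logs group repeated executions of the same statement without emitting the full SQL each time.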

sqlspec.observability.get_trace_context()[source]

Extract trace_id and span_id from the current OTel context.

Return type:

tuple[str | None, str | None]

Returns:

Tuple of trace_id and span_id if available, otherwise (None, None).

sqlspec.observability.resolve_db_system(adapter_or_driver)[source]

Resolve adapter/driver name to OTel db.system value.

Parameters:

adapter_or_driver (str) – Adapter or driver name.

Return type:

str

Returns:

OTel db.system value.