Events

Pub/sub event channel system with database-backed queue support. Provides both sync and async channels with listener management and native backend integration for databases that support LISTEN/NOTIFY.

Channels

class sqlspec.extensions.events.AsyncEventChannel

Bases: object

Event channel for asynchronous database configurations.

__init__(config)

async publish(channel, payload, metadata=None)

Publish an event to a channel.

Return type:

str

async iter_events(channel, *, poll_interval=None)

Yield events as they become available.

Return type:

AsyncIterator[EventMessage]

listen(channel, handler, *, poll_interval=None, auto_ack=True)

Start an async task that delivers events to handler.

Return type:

AsyncEventListener

async stop_listener(listener_id)

Stop a running listener.

Return type:

None

async ack(event_id)

Acknowledge an event.

Return type:

None

async nack(event_id)

Return an event to the queue for redelivery.

Return type:

None

async shutdown()

Shut down the event channel and release backend resources.

Return type:

None
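The interplay of iter_events(), ack(), and nack() can be sketched with a consumer loop that acknowledges on success and returns the event for redelivery on failure. This is a minimal sketch, not the library's implementation: InMemoryChannel and DemoMessage are hypothetical stand-ins that only mimic the method names documented above, so the example runs without a database.

```python
import asyncio
import uuid
from dataclasses import dataclass


@dataclass
class DemoMessage:
    """Hypothetical stand-in for EventMessage (only three of its fields)."""

    event_id: str
    channel: str
    payload: dict


class InMemoryChannel:
    """Hypothetical stand-in mimicking publish/iter_events/ack/nack."""

    def __init__(self):
        self._queues = {}      # channel name -> asyncio.Queue
        self._in_flight = {}   # event_id -> DemoMessage awaiting ack/nack
        self.acked = []

    def _queue(self, channel):
        # Queues are created lazily, inside the running event loop.
        if channel not in self._queues:
            self._queues[channel] = asyncio.Queue()
        return self._queues[channel]

    async def publish(self, channel, payload, metadata=None):
        event_id = uuid.uuid4().hex
        await self._queue(channel).put(DemoMessage(event_id, channel, payload))
        return event_id

    async def iter_events(self, channel, *, poll_interval=None):
        while True:
            message = await self._queue(channel).get()
            self._in_flight[message.event_id] = message
            yield message

    async def ack(self, event_id):
        self._in_flight.pop(event_id)
        self.acked.append(event_id)

    async def nack(self, event_id):
        # Put the message back so it is delivered again.
        message = self._in_flight.pop(event_id)
        await self._queue(message.channel).put(message)


async def consume(events, channel, handler, limit):
    """Ack after a successful handler call, nack after a failing one."""
    handled = 0
    async for message in events.iter_events(channel):
        try:
            await handler(message)
        except Exception:
            await events.nack(message.event_id)
        else:
            await events.ack(message.event_id)
            handled += 1
            if handled >= limit:
                break
    return handled


async def demo():
    events = InMemoryChannel()
    seen = []

    async def handler(message):
        n = message.payload["n"]
        seen.append(n)
        if n == 2 and seen.count(2) == 1:
            raise RuntimeError("transient failure on first attempt")

    for n in (1, 2):
        await events.publish("jobs", {"n": n})
    handled = await consume(events, "jobs", handler, limit=2)
    return seen, handled, len(events.acked)
```

Running asyncio.run(demo()) delivers the second event twice: the first attempt is nacked and requeued, the retry is acked.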

class sqlspec.extensions.events.SyncEventChannel

Bases: object

Event channel for synchronous database configurations.

__init__(config)

publish(channel, payload, metadata=None)

Publish an event to a channel.

Return type:

str

iter_events(channel, *, poll_interval=None)

Yield events as they become available.

Return type:

Iterator[EventMessage]

listen(channel, handler, *, poll_interval=None, auto_ack=True)

Start a background thread that invokes handler for each event.

Return type:

SyncEventListener

stop_listener(listener_id)

Stop a running listener.

Return type:

None

ack(event_id)

Acknowledge an event.

Return type:

None

nack(event_id)

Return an event to the queue for redelivery.

Return type:

None

shutdown()

Shut down the event channel and release backend resources.

Return type:

None

Listeners

class sqlspec.extensions.events.AsyncEventListener

Bases: object

Represents a running async listener task.

async stop()

Signal the listener to stop and await task completion.

Return type:

None

__init__(id, channel, task, stop_event, poll_interval)

class sqlspec.extensions.events.SyncEventListener

Bases: object

Represents a running sync listener thread.

stop()

Signal the listener to stop and join the thread.

Return type:

None

__init__(id, channel, thread, stop_event, poll_interval)
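The thread, stop_event, and poll_interval constructor arguments suggest a polling loop that checks a stop flag between dequeue attempts. The sketch below illustrates that shape under stated assumptions: start_listener and its dequeue/ack/nack callables are hypothetical, and the ack-on-success, nack-on-exception behaviour for auto_ack is a guess at the semantics, not something confirmed by the reference above.

```python
import queue
import threading


def start_listener(dequeue, handler, ack, nack, *, poll_interval=0.05, auto_ack=True):
    """Run handler for each dequeued message in a daemon thread.

    Returns (thread, stop_event); set the event and join the thread to stop.
    """
    stop_event = threading.Event()

    def run():
        while not stop_event.is_set():
            message = dequeue(poll_interval)  # waits up to poll_interval seconds
            if message is None:
                continue
            try:
                handler(message)
            except Exception:
                nack(message)  # hand the message back for redelivery
            else:
                if auto_ack:
                    ack(message)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread, stop_event


def demo():
    pending = queue.Queue()
    acked = []
    done = threading.Event()

    def dequeue(timeout):
        try:
            return pending.get(timeout=timeout)
        except queue.Empty:
            return None

    def ack(message):
        acked.append(message)
        if len(acked) == 3:
            done.set()

    for n in range(3):
        pending.put({"n": n})

    thread, stop_event = start_listener(
        dequeue, handler=lambda message: None, ack=ack, nack=pending.put
    )
    done.wait(timeout=5)

    # Equivalent of stop(): signal the loop, then join the thread.
    stop_event.set()
    thread.join(timeout=5)
    return acked, thread.is_alive()
```

Because the loop only re-checks the stop flag between polls, a stop request can take up to one poll_interval to take effect in this sketch.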

Event Queue

class sqlspec.extensions.events.AsyncTableEventQueue

Bases: _BaseTableEventQueue

Async table queue implementation.

async shutdown()

Shut down the backend (no-op for table queue).

Return type:

None

class sqlspec.extensions.events.SyncTableEventQueue

Bases: _BaseTableEventQueue

Sync table queue implementation.

shutdown()

Shut down the backend (no-op for table queue).

Return type:

None

sqlspec.extensions.events.build_queue_backend(config, extension_settings=None, *, adapter_name=None, hints=None)

Build a table queue backend using adapter hints and extension overrides.

Return type:

SyncTableEventQueue | AsyncTableEventQueue
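To make the idea of a table-backed queue concrete, here is a minimal lease-based sketch on an in-memory SQLite database. Everything in it is an assumption for illustration: the table name, the column set (loosely modelled on the EventMessage fields), the status values, and the SQL are hypothetical and are not taken from the library.

```python
import json
import sqlite3
import time
import uuid

# Hypothetical schema; the real queue table layout may differ.
DDL = """
CREATE TABLE demo_event_queue (
    event_id TEXT PRIMARY KEY,
    channel TEXT NOT NULL,
    payload_json TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    available_at REAL NOT NULL,
    lease_expires_at REAL
)
"""


def publish(conn, channel, payload):
    event_id = uuid.uuid4().hex
    conn.execute(
        "INSERT INTO demo_event_queue (event_id, channel, payload_json, available_at) "
        "VALUES (?, ?, ?, ?)",
        (event_id, channel, json.dumps(payload), time.time()),
    )
    conn.commit()
    return event_id


def dequeue(conn, channel, lease_seconds=30.0):
    """Lease the oldest available event, or return None if there is none."""
    now = time.time()
    row = conn.execute(
        "SELECT event_id, payload_json, attempts FROM demo_event_queue "
        "WHERE channel = ? AND available_at <= ? "
        "AND (status = 'pending' OR (status = 'leased' AND lease_expires_at < ?)) "
        "ORDER BY available_at LIMIT 1",
        (channel, now, now),
    ).fetchone()
    if row is None:
        return None
    event_id, payload_json, attempts = row
    # Select-then-update is not safe with concurrent consumers; row locking
    # (compare the select_for_update / skip_locked hints) would be needed there.
    conn.execute(
        "UPDATE demo_event_queue SET status = 'leased', attempts = ?, lease_expires_at = ? "
        "WHERE event_id = ?",
        (attempts + 1, now + lease_seconds, event_id),
    )
    conn.commit()
    return {"event_id": event_id, "payload": json.loads(payload_json), "attempts": attempts + 1}


def ack(conn, event_id):
    conn.execute("UPDATE demo_event_queue SET status = 'acked' WHERE event_id = ?", (event_id,))
    conn.commit()


def nack(conn, event_id):
    # Release the lease so the event becomes visible to dequeue() again.
    conn.execute(
        "UPDATE demo_event_queue SET status = 'pending', lease_expires_at = NULL "
        "WHERE event_id = ?",
        (event_id,),
    )
    conn.commit()


def open_demo_queue():
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    return conn
```

In this sketch a leased event stays invisible until it is nacked or its lease expires, which is one common way to get redelivery after a consumer crash.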

Store

class sqlspec.extensions.events.BaseEventQueueStore

Bases: ABC, Generic[ConfigT]

Base class for adapter-specific event queue DDL generators.

This class provides a hook-based pattern for DDL generation. Adapters only need to override _column_types() and optionally any hook methods for dialect-specific variations:

  • _string_type(length): String type syntax (default: VARCHAR(N))

  • _integer_type(): Integer type syntax (default: INTEGER)

  • _timestamp_default(): Timestamp default expression (default: CURRENT_TIMESTAMP)

  • _primary_key_syntax(): Inline PRIMARY KEY clause (default: empty, PK on column)

  • _table_clause(): Additional table options (default: empty)

For complex dialects (Oracle PL/SQL, BigQuery CLUSTER BY), adapters may override _build_create_table_sql() directly.

__init__(config)

property table_name: str

Return the configured queue table name.

property settings: dict[str, Any]

Return extension settings for adapters to inspect.

create_statements()

Return statements required to create the queue table and indexes.

Return type:

list[str]

drop_statements()

Return statements required to drop queue artifacts.

Return type:

list[str]
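The hook-based pattern described for BaseEventQueueStore can be sketched as a base class whose DDL builder calls small overridable methods. The hook names below come from the list above; the classes, the column set, the index, and the way the hooks are combined are hypothetical, and _column_types() is left out because its return shape is not documented here.

```python
class DemoQueueStore:
    """Hypothetical base class; only the hook names mirror the reference."""

    def __init__(self, table_name="demo_event_queue"):
        self.table_name = table_name

    # Dialect hooks with the defaults listed in the reference.
    def _string_type(self, length):
        return f"VARCHAR({length})"

    def _integer_type(self):
        return "INTEGER"

    def _timestamp_default(self):
        return "CURRENT_TIMESTAMP"

    def _primary_key_syntax(self):
        return ""  # empty: the key is declared on the column itself

    def _table_clause(self):
        return ""

    def _build_create_table_sql(self):
        # Put PRIMARY KEY on the column unless a table-level clause is supplied.
        pk_on_column = "" if self._primary_key_syntax() else " PRIMARY KEY"
        columns = [
            f"event_id {self._string_type(64)}{pk_on_column}",
            f"channel {self._string_type(128)} NOT NULL",
            f"attempts {self._integer_type()} DEFAULT 0 NOT NULL",
            f"created_at TIMESTAMP DEFAULT {self._timestamp_default()} NOT NULL",
        ]
        body = ", ".join(columns) + self._primary_key_syntax()
        return f"CREATE TABLE {self.table_name} ({body}){self._table_clause()}"

    def create_statements(self):
        return [
            self._build_create_table_sql(),
            f"CREATE INDEX idx_{self.table_name}_channel ON {self.table_name} (channel)",
        ]

    def drop_statements(self):
        return [f"DROP TABLE {self.table_name}"]


class DemoTableLevelKeyStore(DemoQueueStore):
    """SQLite-flavoured variant that overrides two hooks only."""

    def _primary_key_syntax(self):
        return ", PRIMARY KEY (event_id)"

    def _table_clause(self):
        return " WITHOUT ROWID"
```

The subclass changes the generated statement without touching the builder, which is the point of the hook design: simple dialect differences stay in one-line overrides, and only unusual dialects need to replace _build_create_table_sql() wholesale.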

Models

class sqlspec.extensions.events.EventMessage

Bases: object

Structured payload delivered to event handlers.

__init__(event_id, channel, payload, metadata, attempts, available_at, lease_expires_at, created_at)

class sqlspec.extensions.events.EventRuntimeHints

Bases: object

Adapter-specific defaults for event polling and leases.

__init__(poll_interval=1.0, lease_seconds=30, retention_seconds=86400, select_for_update=False, skip_locked=False, json_passthrough=False)
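As a rough illustration of how adapter defaults and user overrides might be combined, the sketch below mirrors the constructor defaults shown above in a local dataclass. DemoRuntimeHints and merge_hints are hypothetical; whether the real class is a dataclass, and how the library merges extension settings into hints, is not stated in this reference.

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class DemoRuntimeHints:
    """Local stand-in carrying the defaults listed in the signature above."""

    poll_interval: float = 1.0
    lease_seconds: int = 30
    retention_seconds: int = 86400
    select_for_update: bool = False
    skip_locked: bool = False
    json_passthrough: bool = False


def merge_hints(defaults, overrides):
    """Return a copy of defaults with known keys from overrides applied.

    Unknown keys are ignored here; that is a choice made for this sketch.
    """
    known = {field.name for field in fields(defaults)}
    applicable = {key: value for key, value in overrides.items() if key in known}
    return replace(defaults, **applicable)
```

For example, merge_hints(DemoRuntimeHints(), {"poll_interval": 0.25}) keeps the 30-second lease while shortening the polling interval.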

Protocols

class sqlspec.extensions.events.AsyncEventBackendProtocol

Bases: Protocol

Protocol for async event backends.

All async event backends (native or queue-based) must implement these methods.

async publish(channel, payload, metadata=None)

Publish an event to a channel.

Parameters:
  • channel (str) – Target channel name.

  • payload (dict[str, typing.Any]) – Event payload (must be JSON-serializable).

  • metadata (dict[str, typing.Any] | None) – Optional metadata dict.

Return type:

str

Returns:

The event ID.

async dequeue(channel, poll_interval)

Dequeue an event from the channel.

Parameters:
  • channel (str) – Channel name to listen on.

  • poll_interval (float) – Timeout in seconds to wait for a notification.

Return type:

EventMessage | None

Returns:

EventMessage if a notification was received, None otherwise.

async ack(event_id)

Acknowledge an event.

Parameters:

event_id (str) – ID of the event to acknowledge.

Return type:

None

async nack(event_id)

Return an event to the queue for redelivery.

Parameters:

event_id (str) – ID of the event to return.

Return type:

None

async shutdown()

Shut down the backend and release resources.

Return type:

None

__init__(*args, **kwargs)

class sqlspec.extensions.events.SyncEventBackendProtocol

Bases: Protocol

Protocol for sync event backends.

All sync event backends (native or queue-based) must implement these methods.

publish(channel, payload, metadata=None)

Publish an event to a channel.

Parameters:
  • channel (str) – Target channel name.

  • payload (dict[str, typing.Any]) – Event payload (must be JSON-serializable).

  • metadata (dict[str, typing.Any] | None) – Optional metadata dict.

Return type:

str

Returns:

The event ID.

dequeue(channel, poll_interval)

Dequeue an event from the channel.

Parameters:
  • channel (str) – Channel name to listen on.

  • poll_interval (float) – Timeout in seconds to wait for a notification.

Return type:

EventMessage | None

Returns:

EventMessage if a notification was received, None otherwise.

ack(event_id)

Acknowledge an event.

Parameters:

event_id (str) – ID of the event to acknowledge.

Return type:

None

nack(event_id)

Return an event to the queue for redelivery.

Parameters:

event_id (str) – ID of the event to return.

Return type:

None

shutdown()

Shut down the backend and release resources.

Return type:

None

__init__(*args, **kwargs)

class sqlspec.extensions.events.AsyncEventHandler

Bases: Protocol

Protocol describing async event handler callables.

async __call__(message)

Process a queued event message asynchronously.

Return type:

Any

__init__(*args, **kwargs)

class sqlspec.extensions.events.SyncEventHandler

Bases: Protocol

Protocol describing sync event handler callables.

__call__(message)

Process a queued event message synchronously.

Return type:

Any

__init__(*args, **kwargs)
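Because the backends above are described as protocols, a class can satisfy them structurally, without inheriting from anything. The sketch below shows that idea with a locally defined protocol that copies the five sync method names; DemoSyncBackend and ListBackend are hypothetical and deliberately do not import the library's own protocol classes.

```python
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class DemoSyncBackend(Protocol):
    """Local copy of the sync backend method names documented above."""

    def publish(self, channel: str, payload: dict, metadata: Optional[dict] = None) -> str: ...

    def dequeue(self, channel: str, poll_interval: float) -> Optional[Any]: ...

    def ack(self, event_id: str) -> None: ...

    def nack(self, event_id: str) -> None: ...

    def shutdown(self) -> None: ...


class ListBackend:
    """In-memory backend that matches the protocol by shape only."""

    def __init__(self):
        self._pending = []   # (event_id, channel, payload) tuples
        self._leased = {}    # event_id -> tuple handed out by dequeue()
        self._next_id = 0

    def publish(self, channel, payload, metadata=None):
        self._next_id += 1
        event_id = str(self._next_id)
        self._pending.append((event_id, channel, payload))
        return event_id

    def dequeue(self, channel, poll_interval):
        # This sketch returns immediately instead of waiting poll_interval seconds.
        for index, item in enumerate(self._pending):
            if item[1] == channel:
                self._leased[item[0]] = self._pending.pop(index)
                return item
        return None

    def ack(self, event_id):
        self._leased.pop(event_id, None)

    def nack(self, event_id):
        item = self._leased.pop(event_id, None)
        if item is not None:
            self._pending.append(item)

    def shutdown(self):
        self._pending.clear()
        self._leased.clear()
```

Note that isinstance() against a runtime-checkable protocol only verifies that the methods exist; it does not check signatures or return types.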

Payload Helpers

sqlspec.extensions.events.encode_notify_payload(event_id, payload, metadata)

Encode event data as JSON for NOTIFY payload.

Raises:

EventChannelError – If the encoded payload exceeds PostgreSQL’s 8KB limit.

Return type:

str

sqlspec.extensions.events.decode_notify_payload(channel, payload)

Decode JSON payload from NOTIFY into an EventMessage.

Return type:

EventMessage
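The encode/decode pair can be pictured as a JSON round trip with a size guard. The sketch below is an assumption-laden illustration: the envelope keys, the function names, and the use of ValueError in place of EventChannelError are all hypothetical. The 8000-byte threshold reflects PostgreSQL's documented default, under which a NOTIFY payload must be shorter than 8000 bytes.

```python
import json

# PostgreSQL's default configuration requires payloads shorter than this.
MAX_NOTIFY_BYTES = 8000


def encode_demo_payload(event_id, payload, metadata):
    """Serialize an event envelope and reject it if it is too large."""
    encoded = json.dumps(
        {"event_id": event_id, "payload": payload, "metadata": metadata},
        separators=(",", ":"),  # compact form keeps the payload small
    )
    size = len(encoded.encode("utf-8"))  # the limit applies to bytes, not characters
    if size >= MAX_NOTIFY_BYTES:
        raise ValueError(f"NOTIFY payload is {size} bytes; limit is {MAX_NOTIFY_BYTES}")
    return encoded


def decode_demo_payload(channel, raw):
    """Parse the envelope back into a plain dict tagged with its channel."""
    data = json.loads(raw)
    return {
        "channel": channel,
        "event_id": data["event_id"],
        "payload": data.get("payload") or {},
        "metadata": data.get("metadata"),
    }
```

Measuring the UTF-8 encoded length matters because non-ASCII characters occupy more than one byte each.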

sqlspec.extensions.events.parse_event_timestamp(value)

Parse a timestamp value into a timezone-aware datetime.

Handles ISO format strings, datetime objects, and falls back to current UTC time for invalid or missing values.

Return type:

datetime
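The described behaviour (accept ISO strings and datetime objects, fall back to the current UTC time otherwise) could look roughly like the sketch below. parse_demo_timestamp is a hypothetical name, and treating naive datetimes as UTC is an assumption of this sketch rather than documented behaviour.

```python
from datetime import datetime, timezone


def parse_demo_timestamp(value):
    """Return a timezone-aware datetime for value, or the current UTC time."""
    if isinstance(value, datetime):
        parsed = value
    elif isinstance(value, str):
        try:
            parsed = datetime.fromisoformat(value)
        except ValueError:
            return datetime.now(timezone.utc)  # unparseable string
    else:
        return datetime.now(timezone.utc)  # None or unsupported type

    if parsed.tzinfo is None:
        # Assumption: naive values are interpreted as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
```

A silent fallback keeps event delivery from failing on a malformed timestamp, at the cost of hiding bad data, so callers that care may want to validate timestamps before publishing.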

Utility Functions

sqlspec.extensions.events.load_native_backend(config, backend_name, extension_settings)

Load adapter-specific native backend if available.

Return type:

Any | None

sqlspec.extensions.events.resolve_poll_interval(poll_interval, default)

Resolve poll interval with validation.

Return type:

float
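The reference does not say which validation is applied, so the following is only one plausible reading: use the default when no interval is given and reject non-positive values. The function name, the positivity rule, and the choice of ValueError are all assumptions.

```python
def resolve_demo_poll_interval(poll_interval, default):
    """Pick the explicit interval if given, else the default, as a float."""
    value = float(default if poll_interval is None else poll_interval)
    if value <= 0:
        # Assumed rule: a zero or negative interval would busy-loop or never wait.
        raise ValueError("poll_interval must be greater than zero")
    return value
```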

sqlspec.extensions.events.normalize_event_channel_name(name)

Validate event channel identifiers and return normalized name.

Return type:

str

sqlspec.extensions.events.normalize_queue_table_name(name)

Validate schema-qualified identifiers and return normalized name.

Return type:

str
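Channel and table names usually end up interpolated into SQL text, since identifiers generally cannot be passed as bound parameters, which is presumably why both are validated. The sketch below shows one conservative approach; the accepted character set, the function names, and returning the name unchanged are assumptions, and the library's actual rules may be stricter or looser.

```python
import re

# Assumed rule: a letter or underscore, then letters, digits, or underscores.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def normalize_demo_channel_name(name):
    """Accept a single plain identifier and return it unchanged."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"invalid channel name: {name!r}")
    return name


def normalize_demo_table_name(name):
    """Accept dot-separated identifiers such as 'schema.table'."""
    parts = name.split(".")
    if not all(_IDENTIFIER.fullmatch(part) for part in parts):
        raise ValueError(f"invalid table name: {name!r}")
    return ".".join(parts)
```

Using fullmatch() rather than a `^...$` pattern avoids accepting a name with a trailing newline.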