Source code for sqlspec.extensions.events._store

"""Base classes for adapter-specific event queue stores."""

import re
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast

from sqlspec.exceptions import EventChannelError

if TYPE_CHECKING:
    from sqlspec.config import DatabaseConfigProtocol

ConfigT = TypeVar("ConfigT", bound="DatabaseConfigProtocol[Any, Any, Any]")

__all__ = ("BaseEventQueueStore", "normalize_event_channel_name", "normalize_queue_table_name")

_IDENTIFIER_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


[docs] def normalize_queue_table_name(name: str) -> str: """Validate schema-qualified identifiers and return normalized name.""" segments = name.split(".") for segment in segments: if not _IDENTIFIER_PATTERN.match(segment): msg = f"Invalid events table name: {name}" raise EventChannelError(msg) return name
[docs] def normalize_event_channel_name(name: str) -> str: """Validate event channel identifiers and return normalized name.""" if not _IDENTIFIER_PATTERN.match(name): msg = f"Invalid events channel name: {name}" raise EventChannelError(msg) return name
[docs] class BaseEventQueueStore(ABC, Generic[ConfigT]): """Base class for adapter-specific event queue DDL generators. This class provides a hook-based pattern for DDL generation. Adapters only need to override `_column_types()` and optionally any hook methods for dialect-specific variations: - `_string_type(length)`: String type syntax (default: VARCHAR(N)) - `_integer_type()`: Integer type syntax (default: INTEGER) - `_timestamp_default()`: Timestamp default expression (default: CURRENT_TIMESTAMP) - `_primary_key_syntax()`: Inline PRIMARY KEY clause (default: empty, PK on column) - `_table_clause()`: Additional table options (default: empty) For complex dialects (Oracle PL/SQL, BigQuery CLUSTER BY), adapters may override `_build_create_table_sql()` directly. """ __slots__ = ("_config", "_extension_settings", "_table_name")
[docs] def __init__(self, config: ConfigT) -> None: self._config = config extension_config = cast("dict[str, Any]", config.extension_config) self._extension_settings = cast("dict[str, Any]", extension_config.get("events", {})) table_name = self._extension_settings.get("queue_table", "sqlspec_event_queue") self._table_name = normalize_queue_table_name(str(table_name))
@property def table_name(self) -> str: """Return the configured queue table name.""" return self._table_name @property def settings(self) -> "dict[str, Any]": """Return extension settings for adapters to inspect.""" return self._extension_settings
[docs] def create_statements(self) -> "list[str]": """Return statements required to create the queue table and indexes.""" statements = [self._wrap_create_statement(self._build_create_table_sql(), "table")] index_statement = self._build_index_sql() if index_statement: statements.append(self._wrap_create_statement(index_statement, "index")) return statements
[docs] def drop_statements(self) -> "list[str]": """Return statements required to drop queue artifacts.""" return [self._wrap_drop_statement(f"DROP TABLE {self.table_name}")]
def _string_type(self, length: int) -> str: """Return string type syntax for the given length. Override for dialects with different string type syntax. Args: length: Maximum string length. Returns: String type declaration (e.g., VARCHAR(64), STRING(64)). """ return f"VARCHAR({length})" def _integer_type(self) -> str: """Return integer type syntax. Override for dialects with different integer type syntax. Returns: Integer type declaration (e.g., INTEGER, INT64). """ return "INTEGER" def _timestamp_default(self) -> str: """Return timestamp default expression. Override for dialects requiring different default syntax. Returns: Default timestamp expression (e.g., CURRENT_TIMESTAMP, CURRENT_TIMESTAMP(6)). """ return "CURRENT_TIMESTAMP" def _primary_key_syntax(self) -> str: """Return inline PRIMARY KEY clause for table definition. Override for dialects that require PRIMARY KEY at the end of CREATE TABLE instead of on the column definition (e.g., Spanner). Returns: Empty string for column-level PK, or " PRIMARY KEY (event_id)" for table-level. """ return "" def _build_create_table_sql(self) -> str: """Build CREATE TABLE SQL using hook methods. Most adapters should NOT override this method. Instead, override the hook methods (_string_type, _integer_type, _timestamp_default, etc.) for dialect-specific variations. Only override this method for complex dialects that require entirely different DDL structure (e.g., Oracle PL/SQL blocks, BigQuery CLUSTER BY). """ payload_type, metadata_type, timestamp_type = self._column_types() string_64 = self._string_type(64) string_128 = self._string_type(128) string_32 = self._string_type(32) integer_type = self._integer_type() ts_default = self._timestamp_default() pk_inline = self._primary_key_syntax() table_clause = self._table_clause() pk_column = " PRIMARY KEY" if not pk_inline else "" return ( f"CREATE TABLE {self.table_name} (" f"event_id {string_64}{pk_column}," f" channel {string_128} NOT NULL," f" payload_json {payload_type} NOT NULL," f" metadata_json {metadata_type}," f" status {string_32} NOT NULL DEFAULT 'pending'," f" available_at {timestamp_type} NOT NULL DEFAULT {ts_default}," f" lease_expires_at {timestamp_type}," f" attempts {integer_type} NOT NULL DEFAULT 0," f" created_at {timestamp_type} NOT NULL DEFAULT {ts_default}," f" acknowledged_at {timestamp_type}" f"){pk_inline}{table_clause}" ) def _build_index_sql(self) -> str | None: """Build CREATE INDEX SQL for queue operations.""" index_name = self._index_name() return f"CREATE INDEX {index_name} ON {self.table_name}(channel, status, available_at)" def _table_clause(self) -> str: """Return additional table options clause. Override for dialects that need options after the column definitions (e.g., BigQuery CLUSTER BY, Oracle INMEMORY). """ return "" def _index_name(self) -> str: """Return the index name for the queue table.""" return f"idx_{self.table_name.replace('.', '_')}_channel_status" def _wrap_create_statement(self, statement: str, object_type: str) -> str: """Wrap CREATE statement with IF NOT EXISTS. Override for dialects that don't support IF NOT EXISTS (e.g., Spanner). """ if object_type == "table": return statement.replace("CREATE TABLE", "CREATE TABLE IF NOT EXISTS", 1) if object_type == "index": return statement.replace("CREATE INDEX", "CREATE INDEX IF NOT EXISTS", 1) return statement def _wrap_drop_statement(self, statement: str) -> str: """Wrap DROP statement with IF EXISTS. Override for dialects that don't support IF EXISTS (e.g., Spanner). """ return statement.replace("DROP TABLE", "DROP TABLE IF EXISTS", 1) @abstractmethod def _column_types(self) -> "tuple[str, str, str]": """Return payload, metadata, and timestamp column types for the adapter. Args: None Returns: Tuple of (payload_type, metadata_type, timestamp_type). """