# Source code for sqlspec.adapters.sqlite.config

"""SQLite database configuration with thread-local connections."""

import uuid
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast

from typing_extensions import NotRequired

from sqlspec.adapters.sqlite._typing import SqliteConnection
from sqlspec.adapters.sqlite.core import apply_driver_features, build_connection_config, default_statement_config
from sqlspec.adapters.sqlite.driver import SqliteCursor, SqliteDriver, SqliteExceptionHandler, SqliteSessionContext
from sqlspec.adapters.sqlite.pool import SqliteConnectionPool
from sqlspec.adapters.sqlite.type_converter import register_type_handlers
from sqlspec.config import ExtensionConfigs, SyncDatabaseConfig
from sqlspec.utils.logging import get_logger

# Module-level logger; SqliteConfig.__init__ uses it to report when URI mode is auto-enabled.
logger = get_logger("sqlspec.adapters.sqlite")

if TYPE_CHECKING:
    from collections.abc import Callable

    from sqlspec.core import StatementConfig
    from sqlspec.observability import ObservabilityConfig


class SqliteConnectionParams(TypedDict):
    """SQLite connection parameters.

    Every key is optional. The key names match the keyword arguments of
    ``sqlite3.connect``; presumably they are forwarded to it when the pool
    opens a connection (verify against ``build_connection_config``).

    NOTE(review): ``SqliteConfig._create_pool`` also reads the keys
    ``pool_recycle_seconds``, ``health_check_interval`` and
    ``enable_optimizations`` from the connection config. They are not declared
    here, so they can only be supplied through a plain ``dict``.
    """

    # Database location. SqliteConfig replaces a missing value or ":memory:"
    # with a uniquely named "file:memory_<hex>?mode=memory&cache=private" URI.
    database: NotRequired[str]
    timeout: NotRequired[float]
    detect_types: NotRequired[int]
    isolation_level: "NotRequired[str | None]"
    check_same_thread: NotRequired[bool]
    factory: "NotRequired[type[SqliteConnection] | None]"
    cached_statements: NotRequired[int]
    # SqliteConfig forces this to True whenever `database` starts with "file:".
    uri: NotRequired[bool]


class SqliteDriverFeatures(TypedDict):
    """SQLite driver feature configuration.

    Controls optional type handling and serialization features for SQLite connections.
    Every key is optional.

    Attributes:
        enable_custom_adapters: Enable custom type adapters for JSON/UUID/datetime conversion.
            Defaults to True for enhanced Python type support.
            Set to False only if you need pure SQLite behavior without type conversions.
            When truthy at pool creation, SqliteConfig calls ``register_type_handlers``
            with ``json_serializer`` and ``json_deserializer``.
        json_serializer: Custom JSON serializer function.
            Defaults to sqlspec.utils.serializers.to_json.
        json_deserializer: Custom JSON deserializer function.
            Defaults to sqlspec.utils.serializers.from_json.
        on_connection_create: Callback executed when a connection is created.
            Receives the raw sqlite3 connection for low-level driver configuration.
            Runs after internal setup (PRAGMA optimizations).
            SqliteConfig removes this key from the stored driver features and hands
            the callable to the connection pool instead.
        enable_events: Enable database event channel support.
            Defaults to True when extension_config["events"] is configured.
            Provides pub/sub capabilities via table-backed queue (SQLite has no native pub/sub).
            Requires extension_config["events"] for migration setup.
        events_backend: Event channel backend selection.
            Only option: "table_queue" (durable table-backed queue with retries and exactly-once delivery).
            SQLite does not have native pub/sub, so table_queue is the only backend.
            Defaults to "table_queue".
    """

    enable_custom_adapters: NotRequired[bool]
    json_serializer: "NotRequired[Callable[[Any], str]]"
    json_deserializer: "NotRequired[Callable[[str], Any]]"
    on_connection_create: "NotRequired[Callable[[SqliteConnection], None]]"
    enable_events: NotRequired[bool]
    events_backend: NotRequired[str]


# Names exported by a star-import of this module.
__all__ = ("SqliteConfig", "SqliteConnectionParams", "SqliteDriverFeatures")


class SqliteConnectionContext:
    """Context manager that borrows a SQLite connection from the config's pool.

    Entering asks the configuration for its pool, opens the pool's own
    connection context and returns the connection that context yields.
    Exiting forwards the exception details to that pool context, so the pool
    decides how the connection is released.
    """

    __slots__ = ("_config", "_ctx")

    def __init__(self, config: "SqliteConfig") -> None:
        """Store the configuration; no connection is acquired yet.

        Args:
            config: Configuration whose ``provide_pool()`` supplies the pool.
        """
        self._config = config
        # Pool-level context manager; set while a connection is checked out.
        self._ctx: Any = None

    def __enter__(self) -> "SqliteConnection":
        """Check a connection out of the pool.

        Returns:
            The connection yielded by the pool's connection context.
        """
        pool = self._config.provide_pool()
        self._ctx = pool.get_connection()
        return cast("SqliteConnection", self._ctx.__enter__())

    def __exit__(
        self, exc_type: "type[BaseException] | None", exc_val: "BaseException | None", exc_tb: Any
    ) -> "bool | None":
        """Return the connection to the pool.

        Args:
            exc_type: Exception type raised inside the ``with`` body, if any.
            exc_val: Exception instance raised inside the ``with`` body, if any.
            exc_tb: Traceback of that exception, if any.

        Returns:
            Whatever the pool context's ``__exit__`` returns, or ``None`` when
            no connection is currently checked out.
        """
        # Detach the pool context first so a repeated __exit__ is a no-op.
        ctx, self._ctx = self._ctx, None
        # Compare against None explicitly: a pool context that happens to be
        # falsy (e.g. defines __len__) must still be exited, or the connection
        # would never be released.
        if ctx is None:
            return None
        return cast("bool | None", ctx.__exit__(exc_type, exc_val, exc_tb))


class _SqliteSessionConnectionHandler:
    """Acquire/release callback pair backed by the config's connection pool.

    Holds the pool's connection context between ``acquire_connection`` and
    ``release_connection`` so the two can be passed around as plain callables.
    """

    __slots__ = ("_config", "_ctx")

    def __init__(self, config: "SqliteConfig") -> None:
        """Remember the configuration; nothing is acquired yet."""
        self._ctx: Any = None
        self._config = config

    def acquire_connection(self) -> "SqliteConnection":
        """Open a pool connection context and return the connection it yields."""
        pool_ctx = self._config.provide_pool().get_connection()
        # Record the context before entering it, then enter.
        self._ctx = pool_ctx
        connection = pool_ctx.__enter__()
        return cast("SqliteConnection", connection)

    def release_connection(self, _conn: "SqliteConnection") -> None:
        """Exit the stored pool context, if any, without exception details.

        Args:
            _conn: Ignored; the stored pool context is what gets released.
        """
        pool_ctx = self._ctx
        if pool_ctx is not None:
            pool_ctx.__exit__(None, None, None)
            self._ctx = None


class SqliteConfig(SyncDatabaseConfig[SqliteConnection, SqliteConnectionPool, SqliteDriver]):
    """SQLite configuration with thread-local connections."""

    driver_type: "ClassVar[type[SqliteDriver]]" = SqliteDriver
    connection_type: "ClassVar[type[SqliteConnection]]" = SqliteConnection
    supports_transactional_ddl: "ClassVar[bool]" = True
    supports_native_arrow_export: "ClassVar[bool]" = True
    supports_native_arrow_import: "ClassVar[bool]" = True
    supports_native_parquet_export: "ClassVar[bool]" = True
    supports_native_parquet_import: "ClassVar[bool]" = True

    def __init__(
        self,
        *,
        connection_config: "SqliteConnectionParams | dict[str, Any] | None" = None,
        connection_instance: "SqliteConnectionPool | None" = None,
        migration_config: "dict[str, Any] | None" = None,
        statement_config: "StatementConfig | None" = None,
        driver_features: "SqliteDriverFeatures | dict[str, Any] | None" = None,
        bind_key: "str | None" = None,
        extension_config: "ExtensionConfigs | None" = None,
        observability_config: "ObservabilityConfig | None" = None,
        **kwargs: Any,
    ) -> None:
        """Initialize SQLite configuration.

        Args:
            connection_config: Configuration parameters including connection settings
            connection_instance: Pre-created pool instance
            migration_config: Migration configuration
            statement_config: Default SQL statement configuration
            driver_features: Optional driver feature configuration
            bind_key: Optional bind key for the configuration
            extension_config: Extension-specific configuration (e.g., Litestar plugin settings)
            observability_config: Adapter-level observability overrides for lifecycle hooks and observers
            **kwargs: Additional keyword arguments passed to the base configuration.
        """
        # Work on a copy so the caller's mapping is never mutated.
        config_dict: dict[str, Any] = dict(connection_config) if connection_config else {}

        if "database" not in config_dict or config_dict["database"] == ":memory:":
            # No database (or plain ":memory:") requested: substitute a uniquely
            # named in-memory URI, generated once per configuration instance.
            config_dict["database"] = f"file:memory_{uuid.uuid4().hex}?mode=memory&cache=private"
            config_dict["uri"] = True
        else:
            database_path = str(config_dict["database"])
            if database_path.startswith("file:") and not config_dict.get("uri"):
                logger.debug(
                    "Database URI detected (%s) but uri=True not set. "
                    "Auto-enabling URI mode to prevent physical file creation.",
                    database_path,
                )
                config_dict["uri"] = True

        statement_config = statement_config or default_statement_config
        statement_config, driver_features = apply_driver_features(statement_config, driver_features)

        # Extract user connection hook before storing driver_features; the hook
        # is handed to the pool in _create_pool rather than kept as a feature.
        features_dict = dict(driver_features) if driver_features else {}
        self._user_connection_hook: "Callable[[SqliteConnection], None] | None" = features_dict.pop(
            "on_connection_create", None
        )

        super().__init__(
            bind_key=bind_key,
            connection_instance=connection_instance,
            connection_config=config_dict,
            migration_config=migration_config,
            statement_config=statement_config,
            driver_features=features_dict,
            extension_config=extension_config,
            observability_config=observability_config,
            **kwargs,
        )

    def _create_pool(self) -> SqliteConnectionPool:
        """Create connection pool from configuration.

        Returns:
            A new pool built from the connection config, with the user's
            ``on_connection_create`` hook attached.
        """
        config_dict = build_connection_config(self.connection_config)

        # Optional pool tuning knobs: (key in connection_config, pool keyword).
        # A knob is forwarded only when explicitly set, so the pool's own
        # defaults apply otherwise.
        pool_options = (
            ("pool_recycle_seconds", "recycle_seconds"),
            ("health_check_interval", "health_check_interval"),
            ("enable_optimizations", "enable_optimizations"),
        )
        pool_kwargs: dict[str, Any] = {}
        for config_key, pool_key in pool_options:
            value = self.connection_config.get(config_key)
            if value is not None:
                pool_kwargs[pool_key] = value

        pool = SqliteConnectionPool(
            connection_parameters=config_dict, on_connection_create=self._user_connection_hook, **pool_kwargs
        )

        if self.driver_features.get("enable_custom_adapters", False):
            self._register_type_adapters()

        return pool

    def _register_type_adapters(self) -> None:
        """Register custom type adapters and converters for SQLite.

        Called once during pool creation if enable_custom_adapters is True.
        Registers JSON serialization handlers if configured.
        """
        if self.driver_features.get("enable_custom_adapters", False):
            register_type_handlers(
                json_serializer=self.driver_features.get("json_serializer"),
                json_deserializer=self.driver_features.get("json_deserializer"),
            )

    def _close_pool(self) -> None:
        """Close the connection pool."""
        # Explicit None test: an existing pool must be closed even if the pool
        # object itself evaluates as falsy.
        if self.connection_instance is not None:
            self.connection_instance.close()

    def create_connection(self) -> SqliteConnection:
        """Get a SQLite connection from the pool.

        Returns:
            SqliteConnection: A connection from the pool
        """
        pool = self.provide_pool()
        return pool.acquire()

    def provide_connection(self, *args: "Any", **kwargs: "Any") -> "SqliteConnectionContext":
        """Provide a SQLite connection context manager.

        Args:
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            A Sqlite connection context manager.
        """
        return SqliteConnectionContext(self)

    def provide_session(
        self, *_args: "Any", statement_config: "StatementConfig | None" = None, **_kwargs: "Any"
    ) -> "SqliteSessionContext":
        """Provide a SQLite driver session.

        Args:
            *_args: Additional arguments.
            statement_config: Optional statement configuration override.
            **_kwargs: Additional keyword arguments.

        Returns:
            A Sqlite driver session context manager.
        """
        handler = _SqliteSessionConnectionHandler(self)

        return SqliteSessionContext(
            acquire_connection=handler.acquire_connection,
            release_connection=handler.release_connection,
            statement_config=statement_config or self.statement_config or default_statement_config,
            driver_features=self.driver_features,
            prepare_driver=self._prepare_driver,
        )

    def get_signature_namespace(self) -> "dict[str, Any]":
        """Get the signature namespace for SQLite types.

        Returns:
            Dictionary mapping type names to types.
        """
        namespace = super().get_signature_namespace()
        namespace.update({
            "SqliteConnectionContext": SqliteConnectionContext,
            "SqliteConnection": SqliteConnection,
            "SqliteConnectionParams": SqliteConnectionParams,
            "SqliteConnectionPool": SqliteConnectionPool,
            "SqliteCursor": SqliteCursor,
            "SqliteDriver": SqliteDriver,
            "SqliteDriverFeatures": SqliteDriverFeatures,
            "SqliteExceptionHandler": SqliteExceptionHandler,
            "SqliteSessionContext": SqliteSessionContext,
        })
        return namespace