Source code for sqlspec.adapters.duckdb.config

"""DuckDB database configuration with connection pooling."""

from collections.abc import Callable, Sequence
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast

from typing_extensions import NotRequired

from sqlspec.adapters.duckdb._types import DuckDBConnection
from sqlspec.adapters.duckdb.driver import (
    DuckDBCursor,
    DuckDBDriver,
    DuckDBExceptionHandler,
    build_duckdb_statement_config,
)
from sqlspec.adapters.duckdb.pool import DuckDBConnectionPool
from sqlspec.config import ExtensionConfigs, SyncDatabaseConfig
from sqlspec.observability import ObservabilityConfig
from sqlspec.utils.serializers import to_json

if TYPE_CHECKING:
    from collections.abc import Callable, Generator

    from sqlspec.core import StatementConfig
# Public names exported by this module.
__all__ = (
    "DuckDBConfig",
    "DuckDBConnectionParams",
    "DuckDBDriverFeatures",
    "DuckDBExtensionConfig",
    "DuckDBPoolParams",
    "DuckDBSecretConfig",
)
# Pool-config keys that DuckDBConfig.__init__ moves out of the pool configuration and
# into driver_features["extension_flags"] rather than leaving them among the
# connection parameters.
EXTENSION_FLAG_KEYS: "tuple[str, ...]" = (
    "allow_community_extensions",
    "allow_unsigned_extensions",
    "enable_external_access",
)


class DuckDBConnectionParams(TypedDict):
    """DuckDB connection parameters.

    Mirrors the keyword arguments accepted by duckdb.connect so callers can drive every DuckDB
    configuration switch directly through SQLSpec. All keys are optional and, unless noted below,
    are passed along to DuckDB either as top-level parameters or via the nested ``config``
    dictionary when DuckDB expects them there.

    Handling that is visible in this module:

    - Keys listed in ``EXTENSION_FLAG_KEYS`` are moved by ``DuckDBConfig.__init__`` into
      ``driver_features["extension_flags"]`` instead of staying in the pool configuration.
    - ``DuckDBConfig._get_connection_config_dict`` leaves ``extra`` and any key whose value is
      ``None`` out of the connection configuration it builds.
    - A missing ``database``, or one equal to ``":memory:"`` or ``""``, is replaced with
      ``":memory:shared_db"`` by ``DuckDBConfig.__init__``.
    """

    database: NotRequired[str]
    read_only: NotRequired[bool]
    config: NotRequired[dict[str, Any]]
    memory_limit: NotRequired[str]
    threads: NotRequired[int]
    temp_directory: NotRequired[str]
    max_temp_directory_size: NotRequired[str]
    autoload_known_extensions: NotRequired[bool]
    autoinstall_known_extensions: NotRequired[bool]
    allow_community_extensions: NotRequired[bool]  # listed in EXTENSION_FLAG_KEYS
    allow_unsigned_extensions: NotRequired[bool]  # listed in EXTENSION_FLAG_KEYS
    extension_directory: NotRequired[str]
    custom_extension_repository: NotRequired[str]
    autoinstall_extension_repository: NotRequired[str]
    allow_persistent_secrets: NotRequired[bool]
    enable_external_access: NotRequired[bool]  # listed in EXTENSION_FLAG_KEYS
    secret_directory: NotRequired[str]
    enable_object_cache: NotRequired[bool]
    parquet_metadata_cache: NotRequired[str]
    enable_external_file_cache: NotRequired[bool]
    checkpoint_threshold: NotRequired[str]
    enable_progress_bar: NotRequired[bool]
    progress_bar_time: NotRequired[float]
    enable_logging: NotRequired[bool]
    log_query_path: NotRequired[str]
    logging_level: NotRequired[str]
    preserve_insertion_order: NotRequired[bool]
    default_null_order: NotRequired[str]
    default_order: NotRequired[str]
    ieee_floating_point_ops: NotRequired[bool]
    binary_as_string: NotRequired[bool]
    arrow_large_buffer_size: NotRequired[bool]
    errors_as_json: NotRequired[bool]
    # Excluded from the connection configuration by _get_connection_config_dict;
    # how it is consumed is not visible in this module.
    extra: NotRequired[dict[str, Any]]


class DuckDBPoolParams(DuckDBConnectionParams):
    """Complete pool configuration for DuckDB adapter.

    Extends DuckDBConnectionParams with pool sizing and lifecycle settings so SQLSpec can manage
    per-thread DuckDB connections safely while honoring DuckDB's thread-safety constraints.

    The four ``pool_*`` keys declared here are stripped from the connection configuration by
    ``DuckDBConfig._get_connection_config_dict``; they reach ``DuckDBConnectionPool`` only through
    the full pool configuration that ``DuckDBConfig._create_pool`` expands as keyword arguments.
    """

    pool_min_size: NotRequired[int]
    pool_max_size: NotRequired[int]
    pool_timeout: NotRequired[float]
    pool_recycle_seconds: NotRequired[int]


[docs] class DuckDBExtensionConfig(TypedDict): """DuckDB extension configuration for auto-management.""" name: str """Name of the extension to install/load.""" version: NotRequired[str] """Specific version of the extension.""" repository: NotRequired[str] """Repository for the extension (core, community, or custom URL).""" force_install: NotRequired[bool] """Force reinstallation of the extension."""
[docs] class DuckDBSecretConfig(TypedDict): """DuckDB secret configuration for AI/API integrations.""" secret_type: str """Type of secret (e.g., 'openai', 'aws', 'azure', 'gcp').""" name: str """Name of the secret.""" value: dict[str, Any] """Secret configuration values.""" scope: NotRequired[str] """Scope of the secret (LOCAL or PERSISTENT)."""
class DuckDBDriverFeatures(TypedDict):
    """TypedDict for DuckDB driver features configuration.

    Every key is optional.

    Attributes:
        extensions: List of extensions to install/load on connection creation.
        secrets: List of secrets to create for AI/API integrations.
        on_connection_create: Callback executed when connection is created.
            DuckDBConfig.__init__ removes it from the stored driver features and
            registers it as an ``on_connection_create`` lifecycle hook; the
            callback's return value is ignored by that wrapper.
        json_serializer: Custom JSON serializer for dict/list parameter conversion.
            Defaults to sqlspec.utils.serializers.to_json if not provided.
        enable_uuid_conversion: Enable automatic UUID string conversion.
            When True (default), UUID strings are automatically converted to UUID objects.
            When False, UUID strings are treated as regular strings.
        extension_flags: Connection-level flags (e.g., allow_community_extensions)
            applied via SET statements immediately after connection creation.
    """

    extensions: NotRequired[Sequence[DuckDBExtensionConfig]]
    secrets: NotRequired[Sequence[DuckDBSecretConfig]]
    on_connection_create: NotRequired["Callable[[DuckDBConnection], DuckDBConnection | None]"]
    json_serializer: NotRequired["Callable[[Any], str]"]
    enable_uuid_conversion: NotRequired[bool]
    extension_flags: NotRequired[dict[str, Any]]
class DuckDBConfig(SyncDatabaseConfig[DuckDBConnection, DuckDBConnectionPool, DuckDBDriver]):
    """DuckDB configuration with connection pooling.

    This configuration supports DuckDB's features including:

    - Connection pooling
    - Extension management and installation
    - Secret management for API integrations
    - Auto configuration settings
    - Arrow integration
    - Direct file querying capabilities
    - Configurable type handlers for JSON serialization and UUID conversion

    DuckDB Connection Pool Configuration:
        - Default pool size is 1-4 connections (DuckDB uses single connection by default)
        - Connection recycling is set to 24 hours by default (set to 0 to disable)
        - Shared memory databases use `:memory:shared_db` for proper concurrency

    Type Handler Configuration via driver_features:
        - `json_serializer`: Custom JSON serializer for dict/list parameters.
          Defaults to `sqlspec.utils.serializers.to_json` if not provided.
          Example: `json_serializer=msgspec.json.encode(...).decode('utf-8')`
        - `enable_uuid_conversion`: Enable automatic UUID string conversion (default: True).
          When True, UUID strings in query results are automatically converted to UUID objects.
          When False, UUID strings are treated as regular strings.

    Example:
        >>> import msgspec
        >>> from sqlspec.adapters.duckdb import DuckDBConfig
        >>>
        >>> # Custom JSON serializer
        >>> def custom_json(obj):
        ...     return msgspec.json.encode(obj).decode("utf-8")
        >>>
        >>> config = DuckDBConfig(
        ...     pool_config={"database": ":memory:"},
        ...     driver_features={
        ...         "json_serializer": custom_json,
        ...         "enable_uuid_conversion": False,
        ...     },
        ... )
    """

    driver_type: "ClassVar[type[DuckDBDriver]]" = DuckDBDriver
    connection_type: "ClassVar[type[DuckDBConnection]]" = DuckDBConnection
    supports_transactional_ddl: "ClassVar[bool]" = True
    supports_native_arrow_export: "ClassVar[bool]" = True
    supports_native_arrow_import: "ClassVar[bool]" = True
    supports_native_parquet_export: "ClassVar[bool]" = True
    supports_native_parquet_import: "ClassVar[bool]" = True
    storage_partition_strategies: "ClassVar[tuple[str, ...]]" = ("fixed", "rows_per_chunk", "manifest")

    def __init__(
        self,
        *,
        pool_config: "DuckDBPoolParams | dict[str, Any] | None" = None,
        pool_instance: "DuckDBConnectionPool | None" = None,
        migration_config: dict[str, Any] | None = None,
        statement_config: "StatementConfig | None" = None,
        driver_features: "DuckDBDriverFeatures | dict[str, Any] | None" = None,
        bind_key: "str | None" = None,
        extension_config: "ExtensionConfigs | None" = None,
        observability_config: "ObservabilityConfig | None" = None,
    ) -> None:
        """Initialize DuckDB configuration.

        The ``pool_config`` and ``driver_features`` mappings passed in are never modified;
        all normalisation happens on private copies.

        Args:
            pool_config: Pool configuration parameters
            pool_instance: Pre-created pool instance
            migration_config: Migration configuration
            statement_config: Statement configuration override
            driver_features: DuckDB-specific driver features including json_serializer
                and enable_uuid_conversion options
            bind_key: Optional unique identifier for this configuration
            extension_config: Extension-specific configuration (e.g., Litestar plugin settings)
            observability_config: Adapter-level observability overrides for lifecycle hooks and observers
        """
        # Copy first: the steps below rewrite "database" and pop the extension-flag keys,
        # and those changes must not leak back into the dict the caller still holds.
        resolved_pool_config: dict[str, Any] = dict(pool_config) if pool_config is not None else {}

        # A missing, plain ":memory:" or empty database name is mapped to the named
        # shared in-memory database.
        resolved_pool_config.setdefault("database", ":memory:shared_db")
        if resolved_pool_config.get("database") in {":memory:", ""}:
            resolved_pool_config["database"] = ":memory:shared_db"

        # Extension flags are carried in driver_features rather than in the pool
        # configuration. Iterate over a snapshot of the keys because entries are
        # popped during the loop.
        extension_flags: dict[str, Any] = {}
        for key in tuple(resolved_pool_config):
            if key in EXTENSION_FLAG_KEYS:
                extension_flags[key] = resolved_pool_config.pop(key)

        processed_features: dict[str, Any] = dict(driver_features) if driver_features else {}
        # The connection hook is delivered through the observability lifecycle below,
        # so it is removed from the stored driver features.
        user_connection_hook = cast(
            "Callable[[Any], None] | None", processed_features.pop("on_connection_create", None)
        )
        processed_features.setdefault("enable_uuid_conversion", True)
        serializer = processed_features.setdefault("json_serializer", to_json)

        if extension_flags:
            existing_flags = cast("dict[str, Any]", processed_features.get("extension_flags", {}))
            # Flags given in pool_config win over same-named flags given in driver_features.
            processed_features["extension_flags"] = {**existing_flags, **extension_flags}

        local_observability = observability_config
        if user_connection_hook is not None:

            def _wrap_lifecycle_hook(context: dict[str, Any]) -> None:
                # Lifecycle hooks receive a context dict; the user hook expects the connection.
                connection = context.get("connection")
                if connection is None:
                    return
                user_connection_hook(connection)

            lifecycle_override = ObservabilityConfig(lifecycle={"on_connection_create": [_wrap_lifecycle_hook]})
            local_observability = ObservabilityConfig.merge(local_observability, lifecycle_override)

        # An explicit statement_config takes precedence; otherwise build one around the
        # resolved JSON serializer.
        base_statement_config = statement_config or build_duckdb_statement_config(
            json_serializer=cast("Callable[[Any], str]", serializer)
        )

        super().__init__(
            bind_key=bind_key,
            pool_config=resolved_pool_config,
            pool_instance=pool_instance,
            migration_config=migration_config,
            statement_config=base_statement_config,
            driver_features=processed_features,
            extension_config=extension_config,
            observability_config=local_observability,
        )

    def _get_connection_config_dict(self) -> "dict[str, Any]":
        """Get connection configuration as plain dict for pool creation.

        Returns:
            The entries of ``self.pool_config`` whose value is not ``None``, without the
            pool sizing/lifecycle keys and without ``extra``.
        """
        return {
            k: v
            for k, v in self.pool_config.items()
            if v is not None
            and k not in {"pool_min_size", "pool_max_size", "pool_timeout", "pool_recycle_seconds", "extra"}
        }

    def _create_pool(self) -> DuckDBConnectionPool:
        """Create connection pool from configuration.

        Returns:
            A new DuckDBConnectionPool built from the connection configuration, the
            ``extensions``, ``secrets`` and ``extension_flags`` driver features, and the
            full pool configuration expanded as keyword arguments.
        """
        connection_config = self._get_connection_config_dict()

        extensions = self.driver_features.get("extensions", None)
        secrets = self.driver_features.get("secrets", None)
        extension_flags = self.driver_features.get("extension_flags", None)

        # Hand the pool plain-dict copies; empty or missing collections become None.
        extensions_dicts = [dict(ext) for ext in extensions] if extensions else None
        secrets_dicts = [dict(secret) for secret in secrets] if secrets else None
        extension_flags_dict = dict(extension_flags) if extension_flags else None

        return DuckDBConnectionPool(
            connection_config=connection_config,
            extensions=extensions_dicts,
            extension_flags=extension_flags_dict,
            secrets=secrets_dicts,
            **self.pool_config,
        )

    def _close_pool(self) -> None:
        """Close the connection pool."""
        if self.pool_instance:
            self.pool_instance.close()

    def create_connection(self) -> DuckDBConnection:
        """Get a DuckDB connection from the pool.

        This method ensures the pool is created and returns a connection
        from the pool. The connection is checked out from the pool and must
        be properly managed by the caller.

        Returns:
            DuckDBConnection: A connection from the pool

        Note:
            For automatic connection management, prefer using provide_connection()
            or provide_session() which handle returning connections to the pool.
            The caller is responsible for returning the connection to the pool
            using pool.release(connection) when done.
        """
        pool = self.provide_pool()
        return pool.acquire()

    @contextmanager
    def provide_connection(self, *args: Any, **kwargs: Any) -> "Generator[DuckDBConnection, None, None]":
        """Provide a pooled DuckDB connection context manager.

        Args:
            *args: Additional arguments (accepted but not used here).
            **kwargs: Additional keyword arguments (accepted but not used here).

        Yields:
            A DuckDB connection instance.
        """
        pool = self.provide_pool()
        with pool.get_connection() as connection:
            yield connection

    @contextmanager
    def provide_session(
        self, *args: Any, statement_config: "StatementConfig | None" = None, **kwargs: Any
    ) -> "Generator[DuckDBDriver, None, None]":
        """Provide a DuckDB driver session context manager.

        Args:
            *args: Additional arguments, passed on to provide_connection().
            statement_config: Optional statement configuration override.
            **kwargs: Additional keyword arguments, passed on to provide_connection().

        Yields:
            A DuckDBDriver instance wrapping a pooled connection.
        """
        with self.provide_connection(*args, **kwargs) as connection:
            driver = self.driver_type(
                connection=connection,
                statement_config=statement_config or self.statement_config,
                driver_features=self.driver_features,
            )
            yield self._prepare_driver(driver)

    def get_signature_namespace(self) -> "dict[str, Any]":
        """Get the signature namespace for DuckDB types.

        This provides all DuckDB-specific types that Litestar needs to recognize
        to avoid serialization attempts.

        Returns:
            Dictionary mapping type names to types.
        """
        namespace = super().get_signature_namespace()
        namespace.update({
            "DuckDBConnection": DuckDBConnection,
            "DuckDBConnectionParams": DuckDBConnectionParams,
            "DuckDBConnectionPool": DuckDBConnectionPool,
            "DuckDBCursor": DuckDBCursor,
            "DuckDBDriver": DuckDBDriver,
            "DuckDBDriverFeatures": DuckDBDriverFeatures,
            "DuckDBExceptionHandler": DuckDBExceptionHandler,
            "DuckDBExtensionConfig": DuckDBExtensionConfig,
            "DuckDBPoolParams": DuckDBPoolParams,
            "DuckDBSecretConfig": DuckDBSecretConfig,
        })
        return namespace