Adapters

SQLSpec provides database adapters that enable connections to various database systems. Each adapter consists of configuration classes and driver implementations tailored to specific databases, providing a consistent interface while respecting database-specific capabilities.

Overview

Available adapters:

  • PostgreSQL: asyncpg, psycopg, psqlpy

  • SQLite: sqlite, aiosqlite

  • MySQL: asyncmy

  • DuckDB: duckdb

  • Oracle: oracledb

  • BigQuery: bigquery

  • Cross-Database: ADBC (Arrow Database Connectivity)

Each adapter implementation includes the following pieces (a minimal sketch follows this list):

  1. Configuration class - Connection and pool settings

  2. Driver class - Query execution and transaction management

  3. Type mappings - Database-specific type conversions

  4. Parameter binding - Automatic parameter style conversion
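
Every adapter follows the same configuration/driver pairing; a minimal end-to-end sketch using the built-in SQLite adapter (the table name is illustrative):

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

sql = SQLSpec()
db = sql.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

# provide_session yields the adapter's driver (here a SqliteDriver), which
# exposes query execution, transaction management, and result processing
with sql.provide_session(db) as session:
    session.execute("CREATE TABLE greetings (id INTEGER PRIMARY KEY, msg TEXT)")
    session.execute("INSERT INTO greetings (msg) VALUES (?)", ["hello"])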

PostgreSQL

AsyncPG

Homepage: https://github.com/MagicStack/asyncpg

PyPI: https://pypi.org/project/asyncpg/

Concurrency: Async-only

Connection Pooling: Native pooling via asyncpg.Pool

Parameter Style: $1, $2, $3 (PostgreSQL positional placeholders)

Special Features:

  • Binary protocol for efficient data transfer

  • pgvector support (automatic registration when pgvector is installed)

  • Native JSON/JSONB type codecs

  • Statement caching

  • Prepared statements

Known Limitations:

  • Async-only (no synchronous support)

  • Requires async/await syntax throughout application

Installation:

uv add sqlspec[asyncpg]

Configuration:

from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig, AsyncpgPoolConfig

sql = SQLSpec()
db = sql.add_config(
    AsyncpgConfig(
        pool_config=AsyncpgPoolConfig(
            dsn="postgresql://user:password@localhost:5432/mydb",
            min_size=5,
            max_size=20,
            command_timeout=60.0
        )
    )
)

async with sql.provide_session(db) as session:
    result = await session.execute("SELECT * FROM users WHERE id = $1", [1])
    user = result.one()

API Reference:

class sqlspec.adapters.asyncpg.AsyncpgConfig[source]

Bases: AsyncDatabaseConfig[PoolConnectionProxy, Pool[Record], AsyncpgDriver]

Configuration for AsyncPG database connections using TypedDict.

driver_type

alias of AsyncpgDriver

connection_type

alias of PoolConnectionProxy

supports_transactional_ddl: ClassVar[bool] = True
supports_native_arrow_export: ClassVar[bool] = True
supports_native_arrow_import: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
supports_native_parquet_import: ClassVar[bool] = True
__init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]

Initialize AsyncPG configuration.

Parameters:
  • pool_config – Pool configuration parameters (TypedDict or dict)

  • pool_instance – Existing pool instance to use

  • migration_config – Migration configuration

  • statement_config – Statement configuration override

  • driver_features – Driver features configuration (TypedDict or dict)

  • bind_key – Optional unique identifier for this configuration

  • extension_config – Extension-specific configuration (e.g., Litestar plugin settings)

  • observability_config – Adapter-level observability overrides for lifecycle hooks and observers

async close_pool()[source]

Close the connection pool.

Return type:

None

async create_connection()[source]

Create a single async connection from the pool.

Return type:

PoolConnectionProxy

Returns:

An AsyncPG connection instance.

provide_connection(*args, **kwargs)[source]

Provide an async connection context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

An AsyncPG connection instance.

Return type:

AsyncGenerator[PoolConnectionProxy, None]

provide_session(*args, statement_config=None, **kwargs)[source]

Provide an async driver session context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • statement_config (StatementConfig | None) – Optional statement configuration override.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

An AsyncpgDriver instance.

Return type:

AsyncGenerator[AsyncpgDriver, None]

async provide_pool(*args, **kwargs)[source]

Provide async pool instance.

Return type:

Pool[Record]

Returns:

The async connection pool.

get_signature_namespace()[source]

Get the signature namespace for AsyncPG types.

This provides all AsyncPG-specific types that Litestar needs to recognize to avoid serialization attempts.

Return type:

dict[str, typing.Any]

Returns:

Dictionary mapping type names to types.

pool_config
class sqlspec.adapters.asyncpg.AsyncpgDriver[source]

Bases: AsyncDriverAdapterBase

AsyncPG PostgreSQL driver for async database operations.

Supports COPY operations, numeric parameter style handling, PostgreSQL exception handling, transaction management, SQL statement compilation and caching, and parameter processing with type coercion.

dialect: DialectType | None = 'postgres'
__init__(connection, statement_config=None, driver_features=None)[source]

Initialize driver adapter with connection and configuration.

Parameters:
  • connection (PoolConnectionProxy) – Database connection instance

  • statement_config (StatementConfig | None) – Statement configuration for the driver

  • driver_features (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks

  • observability – Optional runtime for handling lifecycle hooks, observers, and spans

with_cursor(connection)[source]

Create context manager for AsyncPG cursor.

Return type:

AsyncpgCursor

handle_database_exceptions()[source]

Handle database exceptions with PostgreSQL error codes.

Return type:

AbstractAsyncContextManager[None]

async execute_stack(stack, *, continue_on_error=False)[source]

Execute a StatementStack using asyncpg’s rapid batching.

Return type:

tuple[StackResult, ...]

async select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Execute a query and persist results to storage once native COPY is available.

Return type:

StorageBridgeJob

async load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Load Arrow data into a PostgreSQL table via COPY.

async load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Read an artifact from storage and ingest it via COPY.

Return type:

StorageBridgeJob

async begin()[source]

Begin a database transaction.

Return type:

None

async rollback()[source]

Rollback the current transaction.

Return type:

None

async commit()[source]

Commit the current transaction.

Return type:

None

property data_dictionary: AsyncDataDictionaryBase

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries
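
The begin/commit/rollback methods above can be driven manually when a unit of work spans several statements; a minimal sketch continuing the configuration example earlier (the accounts table and amounts are illustrative):

async with sql.provide_session(db) as session:
    await session.begin()
    try:
        await session.execute(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2", [100, 1]
        )
        await session.execute(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2", [100, 2]
        )
        await session.commit()
    except Exception:
        await session.rollback()
        raise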

class sqlspec.adapters.asyncpg.AsyncpgPoolConfig[source]

Bases: AsyncpgConnectionConfig

TypedDict for AsyncPG pool parameters, inheriting connection parameters.

min_size: NotRequired[int]
max_size: NotRequired[int]
max_queries: NotRequired[int]
max_inactive_connection_lifetime: NotRequired[float]
setup: NotRequired[Callable[[AsyncpgConnection], Awaitable[None]]]
init: NotRequired[Callable[[AsyncpgConnection], Awaitable[None]]]
loop: NotRequired[AbstractEventLoop]
connection_class: NotRequired[type[AsyncpgConnection]]
record_class: NotRequired[type[Record]]
extra: NotRequired[dict[str, Any]]
dsn: NotRequired[str]
host: NotRequired[str]
port: NotRequired[int]
user: NotRequired[str]
password: NotRequired[str]
database: NotRequired[str]
ssl: NotRequired[Any]
passfile: NotRequired[str]
direct_tls: NotRequired[bool]
connect_timeout: NotRequired[float]
command_timeout: NotRequired[float]
statement_cache_size: NotRequired[int]
max_cached_statement_lifetime: NotRequired[int]
max_cacheable_statement_size: NotRequired[int]
server_settings: NotRequired[dict[str, str]]

Psycopg

Homepage: https://github.com/psycopg/psycopg

PyPI: https://pypi.org/project/psycopg/

Concurrency: Sync and async

Connection Pooling: Native pooling via psycopg_pool

Parameter Style: %s, %s, %s (PostgreSQL format style)

Special Features:

  • Both synchronous and asynchronous support

  • Modern psycopg3 API

  • pgvector support (automatic registration when pgvector is installed)

  • Server-side cursors

  • Pipeline mode for batch operations

Known Limitations:

  • Separate driver classes for sync (PsycopgSyncDriver) and async (PsycopgAsyncDriver)

Installation:

uv add sqlspec[psycopg]

Configuration (Async):

from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgAsyncConfig

sql = SQLSpec()
db = sql.add_config(
    PsycopgAsyncConfig(
        pool_config={
            "conninfo": "postgresql://user:password@localhost:5432/mydb",
            "min_size": 5,
            "max_size": 20
        }
    )
)

async with sql.provide_session(db) as session:
    result = await session.execute("SELECT * FROM users WHERE id = %s", [1])

Configuration (Sync):

from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgSyncConfig

sql = SQLSpec()
db = sql.add_config(
    PsycopgSyncConfig(
        pool_config={
            "conninfo": "postgresql://user:password@localhost:5432/mydb",
            "min_size": 5,
            "max_size": 20
        }
    )
)

with sql.provide_session(db) as session:
    result = session.execute("SELECT * FROM users WHERE id = %s", [1])

API Reference:

class sqlspec.adapters.psycopg.PsycopgSyncDriver[source]

Bases: PsycopgPipelineMixin, SyncDriverAdapterBase

PostgreSQL psycopg synchronous driver.

Provides synchronous database operations for PostgreSQL using psycopg3. Supports SQL statement execution with parameter binding, transaction management, result processing with column metadata, parameter style conversion, PostgreSQL arrays and JSON handling, COPY operations for bulk data transfer, and PostgreSQL-specific error handling.

dialect: DialectType | None = 'postgres'
__init__(connection, statement_config=None, driver_features=None)[source]
with_cursor(connection)[source]

Create context manager for PostgreSQL cursor.

Return type:

PsycopgSyncCursor

begin()[source]

Begin a database transaction on the current connection.

Return type:

None

rollback()[source]

Rollback the current transaction on the current connection.

Return type:

None

commit()[source]

Commit the current transaction on the current connection.

Return type:

None

handle_database_exceptions()[source]

Handle database-specific exceptions and wrap them appropriately.

Return type:

AbstractContextManager[None]

execute_stack(stack, *, continue_on_error=False)[source]

Execute a StatementStack using psycopg pipeline mode when supported.

Return type:

tuple[StackResult, ...]

select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Execute a query and stream Arrow results to storage (sync).

Return type:

StorageBridgeJob

load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Load Arrow data into PostgreSQL using COPY.

load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Load staged artifacts into PostgreSQL via COPY.

Return type:

StorageBridgeJob

property data_dictionary: SyncDataDictionaryBase

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries

class sqlspec.adapters.psycopg.PsycopgAsyncDriver[source]

Bases: PsycopgPipelineMixin, AsyncDriverAdapterBase

PostgreSQL psycopg asynchronous driver.

Provides asynchronous database operations for PostgreSQL using psycopg3. Supports async SQL statement execution with parameter binding, async transaction management, async result processing with column metadata, parameter style conversion, PostgreSQL arrays and JSON handling, COPY operations for bulk data transfer, PostgreSQL-specific error handling, and async pub/sub support.

dialect: DialectType | None = 'postgres'
__init__(connection, statement_config=None, driver_features=None)[source]
with_cursor(connection)[source]

Create async context manager for PostgreSQL cursor.

Return type:

PsycopgAsyncCursor

async begin()[source]

Begin a database transaction on the current connection.

Return type:

None

async rollback()[source]

Rollback the current transaction on the current connection.

Return type:

None

async commit()[source]

Commit the current transaction on the current connection.

Return type:

None

handle_database_exceptions()[source]

Handle database-specific exceptions and wrap them appropriately.

Return type:

AbstractAsyncContextManager[None]

async execute_stack(stack, *, continue_on_error=False)[source]

Execute a StatementStack using psycopg async pipeline when supported.

Return type:

tuple[StackResult, ...]

async select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Execute a query and stream Arrow data to storage asynchronously.

Return type:

StorageBridgeJob

async load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Load Arrow data into PostgreSQL asynchronously via COPY.

async load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Load staged artifacts asynchronously.

Return type:

StorageBridgeJob

property data_dictionary: AsyncDataDictionaryBase

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries

psqlpy

Homepage: https://github.com/psqlpy-python/psqlpy

PyPI: https://pypi.org/project/psqlpy/

Concurrency: Async-only

Connection Pooling: Native pooling (Rust-based implementation)

Parameter Style: $1, $2, $3 (PostgreSQL positional placeholders)

Special Features:

  • Rust-based driver for memory efficiency

  • Connection pooling

  • Transaction support

  • Type conversion

Known Limitations:

  • Async-only (no synchronous support)

  • Smaller ecosystem than asyncpg or psycopg

Installation:

uv add sqlspec[psqlpy]

Configuration:

from sqlspec import SQLSpec
from sqlspec.adapters.psqlpy import PsqlpyConfig

sql = SQLSpec()
db = sql.add_config(
    PsqlpyConfig(
        pool_config={
            "dsn": "postgresql://user:password@localhost:5432/mydb",
            "max_pool_size": 20
        }
    )
)

async with sql.provide_session(db) as session:
    result = await session.execute("SELECT * FROM users WHERE id = $1", [1])

API Reference:

SQLite

sqlite

Homepage: Built-in Python module

PyPI: N/A (included with Python)

Concurrency: Sync-only

Connection Pooling: Custom thread-local pooling

Parameter Style: ?, ?, ? (SQLite positional placeholders)

Special Features:

  • No external dependencies

  • File-based or in-memory databases

  • Thread-safe connections

  • Custom connection pooling for concurrency

Known Limitations:

  • Synchronous only (no async support)

  • Limited concurrent write performance

  • Thread-local connection management

Installation:

# No installation required - built into Python
uv add sqlspec

Configuration (In-Memory):

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

sql = SQLSpec()
db = sql.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

with sql.provide_session(db) as session:
    session.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    session.execute("INSERT INTO users (name) VALUES (?)", ["Alice"])

Configuration (File-Based):

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

sql = SQLSpec()
db = sql.add_config(
    SqliteConfig(
        pool_config={
            "database": "/path/to/database.db",
            "timeout": 30.0,
            "check_same_thread": False
        }
    )
)

API Reference:

class sqlspec.adapters.sqlite.SqliteConfig[source]

Bases: SyncDatabaseConfig[Connection, SqliteConnectionPool, SqliteDriver]

SQLite configuration with thread-local connections.

driver_type

alias of SqliteDriver

connection_type

alias of Connection

supports_transactional_ddl: ClassVar[bool] = True
supports_native_arrow_export: ClassVar[bool] = True
supports_native_arrow_import: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
supports_native_parquet_import: ClassVar[bool] = True
__init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]

Initialize SQLite configuration.

Parameters:
  • pool_config (SqliteConnectionParams | dict[str, typing.Any] | None) – Configuration parameters including connection settings

  • pool_instance (SqliteConnectionPool | None) – Pre-created pool instance

  • migration_config (dict[str, typing.Any] | None) – Migration configuration

  • statement_config (StatementConfig | None) – Default SQL statement configuration

  • driver_features (SqliteDriverFeatures | dict[str, typing.Any] | None) – Optional driver feature configuration

  • bind_key (str | None) – Optional bind key for the configuration

  • extension_config (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)

  • observability_config (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers

create_connection()[source]

Get a SQLite connection from the pool.

Returns:

A connection from the pool

Return type:

SqliteConnection

provide_connection(*args, **kwargs)[source]

Provide a SQLite connection context manager.

Yields:

SqliteConnection – A thread-local connection

Return type:

Generator[Connection, None, None]

provide_session(*args, statement_config=None, **kwargs)[source]

Provide a SQLite driver session.

Yields:

SqliteDriver – A driver instance with thread-local connection

Return type:

Generator[SqliteDriver, None, None]

get_signature_namespace()[source]

Get the signature namespace for SQLite types.

Return type:

dict[str, typing.Any]

Returns:

Dictionary mapping type names to types.

pool_config
class sqlspec.adapters.sqlite.SqliteDriver[source]

Bases: SyncDriverAdapterBase

SQLite driver implementation.

Provides SQL statement execution, transaction management, and result handling for SQLite databases using the standard sqlite3 module.

dialect: DialectType | None = 'sqlite'
__init__(connection, statement_config=None, driver_features=None)[source]

Initialize SQLite driver.

Parameters:
  • connection (Connection) – Database connection instance

  • statement_config (StatementConfig | None) – Statement configuration for the driver

  • driver_features (dict[str, typing.Any] | None) – Driver-specific features

with_cursor(connection)[source]

Create context manager for SQLite cursor.

Parameters:

connection (Connection) – SQLite database connection

Return type:

SqliteCursor

Returns:

Cursor context manager for safe cursor operations

handle_database_exceptions()[source]

Handle database-specific exceptions and wrap them appropriately.

Return type:

AbstractContextManager[None]

Returns:

Context manager that converts SQLite exceptions to SQLSpec exceptions

select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Execute a query and write Arrow-compatible output to storage (sync).

Return type:

StorageBridgeJob

load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Load Arrow data into SQLite using batched inserts.

load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Load staged artifacts from storage into SQLite.

Return type:

StorageBridgeJob

begin()[source]

Begin a database transaction.

Raises:

SQLSpecError – If transaction cannot be started

Return type:

None

rollback()[source]

Rollback the current transaction.

Raises:

SQLSpecError – If transaction cannot be rolled back

Return type:

None

commit()[source]

Commit the current transaction.

Raises:

SQLSpecError – If transaction cannot be committed

Return type:

None

property data_dictionary: SyncDataDictionaryBase

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries
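
As with the async drivers, begin/commit/rollback can be called directly for multi-statement units of work; a minimal sketch reusing the users table from the examples above:

with sql.provide_session(db) as session:
    session.begin()
    try:
        session.execute("INSERT INTO users (name) VALUES (?)", ["Bob"])
        session.execute("INSERT INTO users (name) VALUES (?)", ["Carol"])
        session.commit()
    except Exception:
        session.rollback()
        raise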

aiosqlite

Homepage: https://github.com/omnilib/aiosqlite

PyPI: https://pypi.org/project/aiosqlite/

Concurrency: Async-only

Connection Pooling: Custom pooling

Parameter Style: ?, ?, ? (SQLite positional placeholders)

Special Features:

  • Async wrapper around Python’s sqlite3 module

  • Same SQLite features as synchronous version

  • Compatible with async frameworks

Known Limitations:

  • Async operations run on thread pool (not true async I/O)

  • Limited concurrent write performance (SQLite limitation)

Installation:

uv add sqlspec[aiosqlite]

Configuration:

from sqlspec import SQLSpec
from sqlspec.adapters.aiosqlite import AiosqliteConfig

sql = SQLSpec()
db = sql.add_config(
    AiosqliteConfig(
        pool_config={
            "database": "/path/to/database.db",
            "timeout": 30.0
        }
    )
)

async with sql.provide_session(db) as session:
    await session.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    await session.execute("INSERT INTO users (name) VALUES (?)", ["Alice"])

API Reference:

class sqlspec.adapters.aiosqlite.AiosqliteConfig[source]

Bases: AsyncDatabaseConfig[AiosqliteConnection, AiosqliteConnectionPool, AiosqliteDriver]

Database configuration for AioSQLite engine.

driver_type

alias of AiosqliteDriver

supports_transactional_ddl: ClassVar[bool] = True
supports_native_arrow_export: ClassVar[bool] = True
supports_native_arrow_import: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
supports_native_parquet_import: ClassVar[bool] = True
__init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]

Initialize AioSQLite configuration.

Parameters:
  • pool_config (AiosqlitePoolParams | dict[str, typing.Any] | None) – Pool configuration parameters (TypedDict or dict)

  • pool_instance (AiosqliteConnectionPool | None) – Optional pre-configured connection pool instance.

  • migration_config (dict[str, typing.Any] | None) – Optional migration configuration.

  • statement_config (StatementConfig | None) – Optional statement configuration.

  • driver_features (AiosqliteDriverFeatures | dict[str, typing.Any] | None) – Optional driver feature configuration.

  • bind_key (str | None) – Optional unique identifier for this configuration.

  • extension_config (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)

  • observability_config (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers

provide_connection(*args, **kwargs)[source]

Provide an async connection context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

An aiosqlite connection instance.

Return type:

AsyncGenerator[Connection, None]

provide_session(*_args, statement_config=None, **_kwargs)[source]

Provide an async driver session context manager.

Parameters:
  • *_args (Any) – Additional arguments.

  • statement_config (StatementConfig | None) – Optional statement configuration override.

  • **_kwargs (Any) – Additional keyword arguments.

Yields:

An AiosqliteDriver instance.

Return type:

AsyncGenerator[AiosqliteDriver, None]

async close_pool()[source]

Close the connection pool.

Return type:

None

async create_connection()[source]

Create a single async connection from the pool.

Return type:

Connection

Returns:

An aiosqlite connection instance.

async provide_pool()[source]

Provide async pool instance.

Return type:

AiosqliteConnectionPool

Returns:

The async connection pool.

get_signature_namespace()[source]

Get the signature namespace for aiosqlite types.

Return type:

dict[str, typing.Any]

Returns:

Dictionary mapping type names to types.

pool_config
class sqlspec.adapters.aiosqlite.AiosqliteDriver[source]

Bases: AsyncDriverAdapterBase

AIOSQLite driver for async SQLite database operations.

dialect: DialectType | None = 'sqlite'
__init__(connection, statement_config=None, driver_features=None)[source]

Initialize driver adapter with connection and configuration.

Parameters:
  • connection (Connection) – Database connection instance

  • statement_config (StatementConfig | None) – Statement configuration for the driver

  • driver_features (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks

  • observability – Optional runtime for handling lifecycle hooks, observers, and spans

with_cursor(connection)[source]

Create async context manager for AIOSQLite cursor.

Return type:

AiosqliteCursor

handle_database_exceptions()[source]

Handle AIOSQLite-specific exceptions.

Return type:

AbstractAsyncContextManager[None]

async select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Execute a query and stream Arrow results into storage.

Return type:

StorageBridgeJob

async load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Load Arrow data into SQLite using batched inserts.

async load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Load staged artifacts from storage into SQLite.

Return type:

StorageBridgeJob

async begin()[source]

Begin a database transaction.

Return type:

None

async rollback()[source]

Rollback the current transaction.

Return type:

None

async commit()[source]

Commit the current transaction.

Return type:

None

property data_dictionary: AsyncDataDictionaryBase

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries

MySQL

asyncmy

Homepage: https://github.com/long2ice/asyncmy

PyPI: https://pypi.org/project/asyncmy/

Concurrency: Async-only

Connection Pooling: Native pooling

Parameter Style: %s, %s, %s (MySQL format style)

Special Features:

  • Cython-based implementation

  • Connection pooling

  • MySQL protocol support

  • Prepared statements

Known Limitations:

  • Async-only (no synchronous support)

  • Requires Cython build tools during installation

Installation:

uv add sqlspec[asyncmy]

Configuration:

from sqlspec import SQLSpec
from sqlspec.adapters.asyncmy import AsyncmyConfig

sql = SQLSpec()
db = sql.add_config(
    AsyncmyConfig(
        pool_config={
            "host": "localhost",
            "port": 3306,
            "user": "root",
            "password": "password",
            "database": "mydb",
            "minsize": 5,
            "maxsize": 20
        }
    )
)

async with sql.provide_session(db) as session:
    result = await session.execute("SELECT * FROM users WHERE id = %s", [1])

Driver Features (see the sketch after this list):

  • json_serializer – Override JSON parameter serialization for dict/list/tuple inputs. Defaults to sqlspec.utils.serializers.to_json() and is invoked for every JSON parameter passed to AsyncMy.

  • json_deserializer – Customize JSON result decoding. Defaults to sqlspec.utils.serializers.from_json() and automatically converts JSON columns into Python objects during fetches.
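
A sketch of overriding both hooks, assuming msgspec is installed; any encoder/decoder pair with the same call shape should work:

import msgspec

from sqlspec import SQLSpec
from sqlspec.adapters.asyncmy import AsyncmyConfig

def encode_json(obj) -> str:
    # Serialize dict/list/tuple parameters to a JSON string
    return msgspec.json.encode(obj).decode("utf-8")

sql = SQLSpec()
db = sql.add_config(
    AsyncmyConfig(
        pool_config={"host": "localhost", "user": "root", "database": "mydb"},
        driver_features={
            "json_serializer": encode_json,
            "json_deserializer": msgspec.json.decode,  # decodes JSON columns on fetch
        },
    )
)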

API Reference:

DuckDB

duckdb

Homepage: https://github.com/duckdb/duckdb

PyPI: https://pypi.org/project/duckdb/

Concurrency: Sync-only

Connection Pooling: Custom pooling

Parameter Style: ?, ?, ? (positional placeholders)

Special Features:

  • Embedded analytical database (OLAP)

  • Native Parquet and CSV support

  • Extension management (auto-install/load extensions)

  • Secrets management for API integrations

  • Arrow-native data transfer

  • Direct file querying without import

  • Shared memory databases for concurrency

Known Limitations:

  • Synchronous only (no async support)

  • Embedded database (not client-server)

Installation:

uv add sqlspec[duckdb]

Configuration (In-Memory):

from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig

sql = SQLSpec()
db = sql.add_config(DuckDBConfig())  # Defaults to :memory:shared_db

with sql.provide_session(db) as session:
    # Query Parquet file directly
    result = session.execute("SELECT * FROM 'data.parquet' LIMIT 10")

Configuration (Persistent with Extensions):

from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig, DuckDBExtensionConfig

sql = SQLSpec()
db = sql.add_config(
    DuckDBConfig(
        pool_config={
            "database": "/path/to/analytics.db",
            "threads": 4,
            "memory_limit": "4GB"
        },
        driver_features={
            "extensions": [
                DuckDBExtensionConfig(name="httpfs"),
                DuckDBExtensionConfig(name="parquet")
            ]
        }
    )
)

with sql.provide_session(db) as session:
    # Query remote Parquet file
    result = session.execute(
        "SELECT * FROM 'https://example.com/data.parquet' LIMIT 10"
    )

Community Extensions:

DuckDBConfig accepts the runtime flags DuckDB expects for community/unsigned extensions via pool_config (for example allow_community_extensions=True, allow_unsigned_extensions=True, enable_external_access=True). SQLSpec applies those options with SET statements immediately after establishing each connection, so even older DuckDB builds that do not recognize the options during duckdb.connect() will still enable the required permissions before extensions are installed.
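
A sketch combining those flags with extension auto-management; the h3 community extension is illustrative:

from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig, DuckDBExtensionConfig

sql = SQLSpec()
db = sql.add_config(
    DuckDBConfig(
        pool_config={
            "database": ":memory:",
            "allow_community_extensions": True,
            "allow_unsigned_extensions": True,
        },
        driver_features={
            "extensions": [
                DuckDBExtensionConfig(name="h3", repository="community")
            ]
        },
    )
)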

API Reference:

class sqlspec.adapters.duckdb.DuckDBConfig[source]

Bases: SyncDatabaseConfig[DuckDBPyConnection, DuckDBConnectionPool, DuckDBDriver]

DuckDB configuration with connection pooling.

This configuration supports DuckDB’s features including:

  • Connection pooling

  • Extension management and installation

  • Secret management for API integrations

  • Auto configuration settings

  • Arrow integration

  • Direct file querying capabilities

  • Configurable type handlers for JSON serialization and UUID conversion

DuckDB Connection Pool Configuration:

  • Default pool size is 1-4 connections (DuckDB uses a single connection by default)

  • Connection recycling is set to 24 hours by default (set to 0 to disable)

  • Shared memory databases use :memory:shared_db for proper concurrency

Type Handler Configuration via driver_features:

  • json_serializer: Custom JSON serializer for dict/list parameters. Defaults to sqlspec.utils.serializers.to_json if not provided. Example: json_serializer=msgspec.json.encode(…).decode('utf-8')

  • enable_uuid_conversion: Enable automatic UUID string conversion (default: True). When True, UUID strings in query results are automatically converted to UUID objects. When False, UUID strings are treated as regular strings.

Example

>>> import msgspec
>>> from sqlspec.adapters.duckdb import DuckDBConfig
>>>
>>> # Custom JSON serializer
>>> def custom_json(obj):
...     return msgspec.json.encode(obj).decode("utf-8")
>>>
>>> config = DuckDBConfig(
...     pool_config={"database": ":memory:"},
...     driver_features={
...         "json_serializer": custom_json,
...         "enable_uuid_conversion": False,
...     },
... )
driver_type

alias of DuckDBDriver

supports_transactional_ddl: ClassVar[bool] = True
supports_native_arrow_export: ClassVar[bool] = True
supports_native_arrow_import: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
supports_native_parquet_import: ClassVar[bool] = True
pool_config
storage_partition_strategies: ClassVar[tuple[str, ...]] = ('fixed', 'rows_per_chunk', 'manifest')
__init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]

Initialize DuckDB configuration.

Parameters:
  • pool_config (DuckDBPoolParams | dict[str, typing.Any] | None) – Pool configuration parameters

  • pool_instance (DuckDBConnectionPool | None) – Pre-created pool instance

  • migration_config (dict[str, Any] | None) – Migration configuration

  • statement_config (StatementConfig | None) – Statement configuration override

  • driver_features (DuckDBDriverFeatures | dict[str, typing.Any] | None) – DuckDB-specific driver features including json_serializer and enable_uuid_conversion options

  • bind_key (str | None) – Optional unique identifier for this configuration

  • extension_config (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)

  • observability_config (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers

create_connection()[source]

Get a DuckDB connection from the pool.

This method ensures the pool is created and returns a connection from the pool. The connection is checked out from the pool and must be properly managed by the caller.

Returns:

A connection from the pool

Return type:

DuckDBConnection

Note

For automatic connection management, prefer using provide_connection() or provide_session() which handle returning connections to the pool. The caller is responsible for returning the connection to the pool using pool.release(connection) when done.

provide_connection(*args, **kwargs)[source]

Provide a pooled DuckDB connection context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

A DuckDB connection instance.

Return type:

Generator[DuckDBPyConnection, None, None]

provide_session(*args, statement_config=None, **kwargs)[source]

Provide a DuckDB driver session context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • statement_config (StatementConfig | None) – Optional statement configuration override.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

A context manager that yields a DuckDBDriver instance.

Return type:

Generator[DuckDBDriver, None, None]

get_signature_namespace()[source]

Get the signature namespace for DuckDB types.

This provides all DuckDB-specific types that Litestar needs to recognize to avoid serialization attempts.

Return type:

dict[str, typing.Any]

Returns:

Dictionary mapping type names to types.

class sqlspec.adapters.duckdb.DuckDBDriver[source]

Bases: SyncDriverAdapterBase

Synchronous DuckDB database driver.

Provides SQL statement execution, transaction management, and result handling for DuckDB databases. Supports multiple parameter styles including QMARK, NUMERIC, and NAMED_DOLLAR formats.

The driver handles script execution, batch operations, and integrates with the sqlspec.core modules for statement processing and caching.

dialect: DialectType | None = 'duckdb'
__init__(connection, statement_config=None, driver_features=None)[source]

Initialize driver adapter with connection and configuration.

Parameters:
  • connection (DuckDBPyConnection) – Database connection instance

  • statement_config (StatementConfig | None) – Statement configuration for the driver

  • driver_features (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks

  • observability – Optional runtime for handling lifecycle hooks, observers, and spans

with_cursor(connection)[source]

Create context manager for DuckDB cursor.

Parameters:

connection (DuckDBPyConnection) – DuckDB connection instance

Return type:

DuckDBCursor

Returns:

DuckDBCursor context manager instance

handle_database_exceptions()[source]

Handle database-specific exceptions and wrap them appropriately.

Return type:

AbstractContextManager[None]

Returns:

Context manager that catches and converts DuckDB exceptions

begin()[source]

Begin a database transaction.

Return type:

None

rollback()[source]

Rollback the current transaction.

Return type:

None

commit()[source]

Commit the current transaction.

Return type:

None

property data_dictionary: SyncDataDictionaryBase

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries

select_to_arrow(statement, /, *parameters, statement_config=None, return_format='table', native_only=False, batch_size=None, arrow_schema=None, **kwargs)[source]

Execute query and return results as Apache Arrow (DuckDB native path).

DuckDB provides native Arrow support via cursor.arrow(). This is the fastest path due to DuckDB’s columnar architecture.

Parameters:
  • statement – SQL statement, string, or QueryBuilder

  • *parameters – Query parameters or filters

  • statement_config – Optional statement configuration override

  • return_format – “table” for pyarrow.Table (default), “batch” for RecordBatch

  • native_only – Ignored for DuckDB (always uses native path)

  • batch_size – Batch size hint (for future streaming implementation)

  • arrow_schema – Optional pyarrow.Schema for type casting

  • **kwargs – Additional keyword arguments

Returns:

ArrowResult with native Arrow data

Example

>>> result = driver.select_to_arrow(
...     "SELECT * FROM users WHERE age > ?", 18
... )
>>> df = result.to_pandas()  # Fast zero-copy conversion
select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Persist DuckDB query output to a storage backend using Arrow fast paths.

load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Load Arrow data into DuckDB using temporary table registration.

load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Read an artifact from storage and load it into DuckDB.

Return type:

StorageBridgeJob
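
A sketch of the storage bridge round trip using the documented signatures; the events table, bucket URI, and Parquet format are illustrative and assume a storage backend is configured for the s3:// scheme:

with sql.provide_session(db) as session:
    # Persist query output to storage via the Arrow fast path
    job = session.select_to_storage(
        "SELECT * FROM events WHERE day = ?",
        "s3://bucket/events/",
        "2024-01-01",
    )

    # Load the staged artifact back into a table
    session.load_from_storage(
        "events_copy",
        "s3://bucket/events/",
        file_format="parquet",
        overwrite=True,
    )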

class sqlspec.adapters.duckdb.DuckDBExtensionConfig[source]

Bases: TypedDict

DuckDB extension configuration for auto-management.

name: str

Name of the extension to install/load.

version: NotRequired[str]

Specific version of the extension.

repository: NotRequired[str]

Repository for the extension (core, community, or custom URL).

force_install: NotRequired[bool]

Force reinstallation of the extension.

class sqlspec.adapters.duckdb.DuckDBSecretConfig[source]

Bases: TypedDict

DuckDB secret configuration for AI/API integrations.

secret_type: str

Type of secret (e.g., ‘openai’, ‘aws’, ‘azure’, ‘gcp’).

name: str

Name of the secret.

value: dict[str, Any]

Secret configuration values.

scope: NotRequired[str]

Scope of the secret (LOCAL or PERSISTENT).
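
Secrets are supplied through driver_features; a sketch, assuming "secrets" is the feature key (see the driver_features documentation) and using a placeholder API key:

from sqlspec.adapters.duckdb import DuckDBConfig, DuckDBSecretConfig

db = sql.add_config(
    DuckDBConfig(
        pool_config={"database": ":memory:"},
        driver_features={
            # The "secrets" key is an assumption; DuckDBSecretConfig is the documented shape
            "secrets": [
                DuckDBSecretConfig(
                    secret_type="openai",
                    name="my_openai_secret",
                    value={"api_key": "sk-..."},  # placeholder value
                    scope="LOCAL",
                )
            ]
        },
    )
)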

BigQuery

bigquery

Homepage: https://github.com/googleapis/python-bigquery

PyPI: https://pypi.org/project/google-cloud-bigquery/

Concurrency: Sync-only

Connection Pooling: None (stateless HTTP API)

Parameter Style: @param1, @param2 (named parameters)

Special Features:

  • Google Cloud BigQuery integration

  • Serverless query execution

  • Standard SQL dialect

  • Automatic result pagination

Known Limitations:

  • Synchronous only (no async support)

  • Requires Google Cloud credentials

  • Query costs based on data scanned

  • No connection pooling (stateless API)

Installation:

uv add sqlspec[bigquery]

Configuration:

from google.oauth2 import service_account
from sqlspec import SQLSpec
from sqlspec.adapters.bigquery import BigQueryConfig

credentials = service_account.Credentials.from_service_account_file(
    "/path/to/credentials.json"
)

sql = SQLSpec()
db = sql.add_config(
    BigQueryConfig(
        pool_config={
            "project": "my-project-id",
            "credentials": credentials
        }
    )
)

with sql.provide_session(db) as session:
    result = session.execute(
        "SELECT * FROM `project.dataset.table` WHERE id = @user_id",
        {"user_id": 1}
    )

API Reference:

Oracle

oracledb

Homepage: https://github.com/oracle/python-oracledb

PyPI: https://pypi.org/project/oracledb/

Concurrency: Sync and async

Connection Pooling: Native pooling

Parameter Style: :1, :2, :3 (Oracle positional placeholders)

Special Features:

  • Both synchronous and asynchronous support

  • Thin mode (pure Python) and thick mode (Oracle Client)

  • Oracle-specific data types (NUMBER, CLOB, BLOB, etc.)

  • Connection pooling

  • Two-phase commit support

  • NumPy VECTOR support (Oracle 23ai+) - automatic conversion for AI/ML embeddings

Known Limitations:

  • Separate configuration classes for sync and async

  • Thick mode requires Oracle Instant Client installation

  • VECTOR data type requires Oracle Database 23ai or higher

Installation:

uv add sqlspec[oracledb]

Configuration (Async, Thin Mode):

from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleAsyncConfig

sql = SQLSpec()
db = sql.add_config(
    OracleAsyncConfig(
        pool_config={
            "user": "system",
            "password": "oracle",
            "dsn": "localhost:1521/XE",
            "min": 5,
            "max": 20
        }
    )
)

async with sql.provide_session(db) as session:
    result = await session.execute("SELECT * FROM users WHERE id = :1", [1])

Configuration (Sync, Thin Mode):

from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleSyncConfig

sql = SQLSpec()
db = sql.add_config(
    OracleSyncConfig(
        pool_config={
            "user": "system",
            "password": "oracle",
            "dsn": "localhost:1521/XE"
        }
    )
)

with sql.provide_session(db) as session:
    result = session.execute("SELECT * FROM users WHERE id = :1", [1])

Configuration (with NumPy VECTOR Support - Oracle 23ai+):

import numpy as np
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleAsyncConfig

sql = SQLSpec()
db = sql.add_config(
    OracleAsyncConfig(
        pool_config={
            "user": "system",
            "password": "oracle",
            "dsn": "localhost:1521/FREEPDB1"
        },
        driver_features={
            "enable_numpy_vectors": True  # Enable automatic NumPy conversion
        }
    )
)

async with sql.provide_session(db) as session:
    # Create table with VECTOR column
    await session.execute("""
        CREATE TABLE embeddings (
            id NUMBER PRIMARY KEY,
            text VARCHAR2(4000),
            embedding VECTOR(768, FLOAT32)
        )
    """)

    # Insert NumPy array - automatically converted to Oracle VECTOR
    vector = np.random.rand(768).astype(np.float32)
    await session.execute(
        "INSERT INTO embeddings VALUES (:1, :2, :3)",
        (1, "sample text", vector)
    )

    # Retrieve - automatically converted back to NumPy array
    result = await session.select_one(
        "SELECT * FROM embeddings WHERE id = :1",
        (1,)
    )
    embedding = result["EMBEDDING"]
    assert isinstance(embedding, np.ndarray)
    assert embedding.dtype == np.float32

NumPy VECTOR Support Details:

Oracle Database 23ai introduces the VECTOR data type for AI/ML embeddings and similarity search. SQLSpec provides seamless NumPy integration for automatic bidirectional conversion.

Supported NumPy dtypes:

  • float32 → VECTOR(*, FLOAT32) - General embeddings (recommended)

  • float64 → VECTOR(*, FLOAT64) - High-precision embeddings

  • int8 → VECTOR(*, INT8) - Quantized embeddings

  • uint8 → VECTOR(*, BINARY) - Binary/hash vectors

Requirements:

  • Oracle Database 23ai or higher

  • NumPy installed (pip install numpy)

  • enable_numpy_vectors=True in driver_features (auto-enabled when NumPy is installed)

Manual Conversion API:

For advanced use cases, use the type converter directly:

from sqlspec.adapters.oracledb.type_converter import OracleTypeConverter

converter = OracleTypeConverter()

# NumPy → Oracle VECTOR
oracle_array = converter.convert_numpy_to_vector(numpy_array)

# Oracle VECTOR → NumPy
numpy_array = converter.convert_vector_to_numpy(oracle_array)

Vector Similarity Search Example:

query_vector = np.random.rand(768).astype(np.float32)

results = await session.select_all("""
    SELECT id, text,
           VECTOR_DISTANCE(embedding, :1, COSINE) as distance
    FROM embeddings
    ORDER BY distance
    FETCH FIRST 5 ROWS ONLY
""", (query_vector,))

API Reference:

Cross-Database

ADBC (Arrow Database Connectivity)

Homepage: https://github.com/apache/arrow-adbc

PyPI: Various (backend-specific packages; see Installation below)

Concurrency: Sync-only

Connection Pooling: None (stateless connections)

Parameter Style: Varies by backend

  • PostgreSQL: $1, $2, $3 (numeric)

  • SQLite: ?, ?, ? (qmark)

  • DuckDB: ?, ?, ? (qmark)

  • BigQuery: @param1, @param2 (named_at)

  • Snowflake: ?, ?, ? (qmark)

Special Features:

  • Arrow-native data transfer (zero-copy)

  • Multi-backend support with unified interface

  • Direct Arrow table output

  • Automatic driver detection from URI or driver name

  • Efficient bulk data operations

Known Limitations:

  • Synchronous only (no async support)

  • No connection pooling

  • Backend-specific driver packages required

  • Parameter style varies by backend

Installation:

# PostgreSQL backend
uv add sqlspec[adbc-postgresql]

# SQLite backend
uv add sqlspec[adbc-sqlite]

# DuckDB backend
uv add sqlspec[adbc-duckdb]

# BigQuery backend
uv add sqlspec[adbc-bigquery]

# Snowflake backend
uv add sqlspec[adbc-snowflake]

Configuration (PostgreSQL):

from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig

sql = SQLSpec()
db = sql.add_config(
    AdbcConfig(
        connection_config={
            "driver_name": "postgresql",
            "uri": "postgresql://user:password@localhost:5432/mydb"
        }
    )
)

with sql.provide_session(db) as session:
    result = session.execute("SELECT * FROM users WHERE id = $1", [1])
    # Get Arrow table directly
    arrow_table = result.arrow()

Configuration (SQLite):

from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig

sql = SQLSpec()
db = sql.add_config(
    AdbcConfig(
        connection_config={
            "driver_name": "sqlite",
            "uri": "/path/to/database.db"
        }
    )
)

with sql.provide_session(db) as session:
    result = session.execute("SELECT * FROM users WHERE id = ?", [1])

Configuration (DuckDB):

from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig

sql = SQLSpec()
db = sql.add_config(
    AdbcConfig(
        connection_config={
            "driver_name": "duckdb",
            "uri": "/path/to/analytics.db"
        }
    )
)

with sql.provide_session(db) as session:
    result = session.execute("SELECT * FROM 'data.parquet' LIMIT 10")

API Reference:

class sqlspec.adapters.adbc.AdbcConfig[source]

Bases: NoPoolSyncConfig[Connection, AdbcDriver]

ADBC configuration for Arrow Database Connectivity.

ADBC provides an interface for connecting to multiple database systems with Arrow-native data transfer.

Supports multiple database backends including PostgreSQL, SQLite, DuckDB, BigQuery, and Snowflake with automatic driver detection and loading.

driver_type

alias of AdbcDriver

connection_type

alias of Connection

supports_transactional_ddl: ClassVar[bool] = False
supports_native_arrow_export: ClassVar[bool] = True
supports_native_arrow_import: ClassVar[bool] = True
supports_native_parquet_export: ClassVar[bool] = True
supports_native_parquet_import: ClassVar[bool] = True
storage_partition_strategies: ClassVar[tuple[str, ...]] = ('fixed', 'rows_per_chunk')
__init__(*, connection_config=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]

Initialize configuration.

Parameters:
  • connection_config (AdbcConnectionParams | dict[str, Any] | None) – Connection configuration parameters

  • migration_config (dict[str, Any] | None) – Migration configuration

  • statement_config (StatementConfig | None) – Default SQL statement configuration

  • driver_features (AdbcDriverFeatures | dict[str, typing.Any] | None) – Driver feature configuration (AdbcDriverFeatures)

  • bind_key (str | None) – Optional unique identifier for this configuration

  • extension_config (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)

  • observability_config (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers

connection_config: dict[str, Any]
create_connection()[source]

Create and return a new connection using the specified driver.

Return type:

Connection

Returns:

A new connection instance.

Raises:

ImproperConfigurationError – If the connection could not be established.

provide_connection(*args, **kwargs)[source]

Provide a connection context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

A connection instance.

Return type:

Generator[Connection, None, None]

provide_session(*args, statement_config=None, **kwargs)[source]

Provide a driver session context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • statement_config (StatementConfig | None) – Optional statement configuration override.

  • **kwargs (Any) – Additional keyword arguments.

Return type:

AbstractContextManager[AdbcDriver]

Returns:

A context manager that yields an AdbcDriver instance.

get_signature_namespace()[source]

Get the signature namespace for types.

Return type:

dict[str, typing.Any]

Returns:

Dictionary mapping type names to types.

class sqlspec.adapters.adbc.AdbcDriver[source]

Bases: SyncDriverAdapterBase

ADBC driver for Arrow Database Connectivity.

Provides database connectivity through ADBC with support for multiple database dialects, parameter style conversion, and transaction management.

__init__(connection, statement_config=None, driver_features=None)[source]

Initialize driver adapter with connection and configuration.

Parameters:
  • connection (Connection) – Database connection instance

  • statement_config (StatementConfig | None) – Statement configuration for the driver

  • driver_features (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks

  • observability – Optional runtime for handling lifecycle hooks, observers, and spans

dialect: DialectType | None
prepare_driver_parameters(parameters, statement_config, is_many=False, prepared_statement=None)[source]

Prepare parameters with cast-aware type coercion for ADBC.

For PostgreSQL, applies cast-aware parameter processing using metadata from the compiled statement. This allows proper handling of JSONB casts and other type conversions. Respects the driver_features['enable_cast_detection'] configuration.

Parameters:
  • parameters (Any) – Parameters in any format

  • statement_config (StatementConfig) – Statement configuration

  • is_many (bool) – Whether this is for execute_many operation

  • prepared_statement (Any | None) – Prepared statement containing the original SQL statement

Return type:

Any

Returns:

Parameters with cast-aware type coercion applied

with_cursor(connection)[source]

Create context manager for cursor.

Parameters:

connection (Connection) – Database connection

Return type:

AdbcCursor

Returns:

Cursor context manager

handle_database_exceptions()[source]

Handle database-specific exceptions and wrap them appropriately.

Return type:

AbstractContextManager[None]

Returns:

Exception handler context manager

begin()[source]

Begin database transaction.

Return type:

None

rollback()[source]

Rollback database transaction.

Return type:

None

commit()[source]

Commit database transaction.

Return type:

None

property data_dictionary: SyncDataDictionaryBase

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries

select_to_arrow(statement, /, *parameters, statement_config=None, return_format='table', native_only=False, batch_size=None, arrow_schema=None, **kwargs)[source]

Execute query and return results as Apache Arrow (ADBC native path).

ADBC provides zero-copy Arrow support via cursor.fetch_arrow_table(). This is 5-10x faster than the conversion path for large datasets.

Parameters:
  • statement – SQL statement, string, or QueryBuilder

  • *parameters – Query parameters or filters

  • statement_config – Optional statement configuration override

  • return_format – “table” for pyarrow.Table (default), “batch” for RecordBatch

  • native_only – Ignored for ADBC (always uses native path)

  • batch_size – Batch size hint (for future streaming implementation)

  • arrow_schema – Optional pyarrow.Schema for type casting

  • **kwargs – Additional keyword arguments

Returns:

ArrowResult with native Arrow data

Example

>>> result = driver.select_to_arrow(
...     "SELECT * FROM users WHERE age > $1", 18
... )
>>> df = result.to_pandas()  # Fast zero-copy conversion
select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Stream query results to storage via the Arrow fast path.

load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Ingest an Arrow payload directly through the ADBC cursor.

load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Read an artifact from storage and ingest it via ADBC.

Return type:

StorageBridgeJob

Adapter Architecture

Common Patterns

All SQLSpec adapters follow consistent architectural patterns:

Configuration Class

Each adapter provides a configuration class that inherits from either:

  • AsyncDatabaseConfig - For async adapters

  • SyncDatabaseConfig - For sync adapters

  • NoPoolSyncConfig - For stateless adapters (ADBC, BigQuery)

Configuration classes handle:

  • Connection parameters

  • Pool settings (when applicable)

  • Driver-specific features

  • Statement configuration

  • Migration settings

Driver Class

Driver classes inherit from:

  • AsyncDriverAdapterBase - For async query execution

  • SyncDriverAdapterBase - For sync query execution

Driver classes provide:

  • Query execution methods (execute, select_one, select_all, etc.)

  • Transaction management (begin, commit, rollback)

  • Result processing and type mapping

  • Parameter binding

Type Mappings

Each adapter includes database-specific type mappings in _types.py:

  • Python to database type conversions

  • Database to Python type conversions

  • Custom type handlers (JSON, UUID, arrays, etc.)

Parameter Binding

SQLSpec automatically converts parameter placeholders to database-specific styles:

Database     Adapter             Style              Example
===========  ==================  =================  =======================================
PostgreSQL   asyncpg, psqlpy     $1, $2             SELECT * FROM users WHERE id = $1
PostgreSQL   psycopg             %s                 SELECT * FROM users WHERE id = %s
SQLite       sqlite, aiosqlite   ?                  SELECT * FROM users WHERE id = ?
MySQL        asyncmy             %s                 SELECT * FROM users WHERE id = %s
DuckDB       duckdb              ?                  SELECT * FROM users WHERE id = ?
Oracle       oracledb            :1, :2             SELECT * FROM users WHERE id = :1
BigQuery     bigquery            @param             SELECT * FROM users WHERE id = @user_id
ADBC         adbc (varies)       Backend-specific   See backend documentation

Connection Pooling Types

Native Pooling

Adapters with native pooling use database driver’s built-in pool:

  • asyncpg (asyncpg.Pool)

  • psycopg (psycopg_pool)

  • asyncmy (native pooling)

  • oracledb (oracledb.create_pool)

Custom Pooling

SQLSpec provides custom pooling for adapters without native support:

  • sqlite (thread-local pooling)

  • aiosqlite (async pooling)

  • duckdb (connection pooling)

No Pooling

Stateless adapters create connections per request:

  • ADBC (stateless Arrow connections)

  • BigQuery (stateless HTTP API)

Type Handler Pattern

SQLSpec supports optional type handlers for database-specific features that require external dependencies. Type handlers enable automatic type conversion at the database driver level.

Overview

Type handlers differ from type converters:

Type Converters (type_converter.py):
  • Transform data after retrieval or before insertion

  • Pure Python transformations

  • Examples: JSON detection, datetime formatting

Type Handlers (_<feature>_handlers.py):
  • Register with database driver for automatic conversion

  • Require optional dependencies

  • Examples: pgvector, NumPy arrays

Graceful Degradation

Type handlers gracefully degrade when optional dependencies are not installed:

  • Detection via sqlspec._typing constants (NUMPY_INSTALLED, PGVECTOR_INSTALLED)

  • Auto-enable in driver_features when dependency available

  • No errors if dependency missing - simply returns unconverted values

Optional Type Support Matrix

Adapter

Feature

Optional Dependency

Configuration

oracledb

NumPy VECTOR

numpy

driver_features["enable_numpy_vectors"]

asyncpg

pgvector

pgvector

Auto-enabled (driver_features["enable_pgvector"])

psycopg

pgvector

pgvector

Auto-enabled (driver_features["enable_pgvector"])

Configuring Type Handlers

Type handlers are configured via the driver_features parameter:

Automatic Detection (Recommended):

# NumPy vectors - auto-enabled when numpy installed
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleAsyncConfig

sql = SQLSpec()
db = sql.add_config(OracleAsyncConfig(
    pool_config={"dsn": "localhost:1521/FREEPDB1"}
    # enable_numpy_vectors automatically set to True if numpy installed
))

Explicit Configuration:

# Explicitly disable optional feature
db = sql.add_config(OracleAsyncConfig(
    pool_config={"dsn": "localhost:1521/FREEPDB1"},
    driver_features={"enable_numpy_vectors": False}  # Force disable
))

Check Feature Status:

from sqlspec._typing import NUMPY_INSTALLED, PGVECTOR_INSTALLED

print(f"NumPy available: {NUMPY_INSTALLED}")
print(f"pgvector available: {PGVECTOR_INSTALLED}")

NumPy VECTOR Support (Oracle)

Oracle Database 23ai introduces the VECTOR data type for AI/ML embeddings. SQLSpec provides seamless NumPy integration:

Requirements:

  • Oracle Database 23ai or higher

  • numpy package installed

  • driver_features["enable_numpy_vectors"]=True (auto-enabled)

Supported dtypes:

  • float32 → VECTOR(*, FLOAT32) - General embeddings (recommended)

  • float64 → VECTOR(*, FLOAT64) - High-precision embeddings

  • int8 → VECTOR(*, INT8) - Quantized embeddings

  • uint8 → VECTOR(*, BINARY) - Binary/hash vectors

Example Usage:

See the Oracle adapter documentation above for complete examples.

pgvector Support (PostgreSQL)

PostgreSQL’s pgvector extension enables vector similarity search. SQLSpec automatically registers pgvector support when available:

Requirements:

  • PostgreSQL with the pgvector extension installed

  • pgvector Python package installed

  • Automatic registration (no configuration needed)

  • Extension registration failures are downgraded to debug logs, so missing pgvector keeps the connection usable without vector support

Adapters with pgvector support:

  • asyncpg - Async PostgreSQL driver

  • psycopg - Sync and async PostgreSQL driver

Example Usage:

from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig

sql = SQLSpec()
db = sql.add_config(AsyncpgConfig(
    pool_config={"dsn": "postgresql://localhost/mydb"}
    # pgvector automatically registered if available
))

async with sql.provide_session(db) as session:
    # Enable pgvector extension
    await session.execute("CREATE EXTENSION IF NOT EXISTS vector")

    # Create table with vector column
    await session.execute("""
        CREATE TABLE items (
            id SERIAL PRIMARY KEY,
            embedding vector(384)
        )
    """)

    # Insert vector (NumPy array or list)
    embedding = [0.1, 0.2, 0.3] * 128  # 384 dimensions
    await session.execute(
        "INSERT INTO items (embedding) VALUES ($1)",
        [embedding]
    )

    # Vector similarity search
    results = await session.select_all("""
        SELECT id, embedding <-> $1 as distance
        FROM items
        ORDER BY distance
        LIMIT 5
    """, [embedding])

Implementing Custom Type Handlers

To add type handlers for a new optional feature:

  1. Define detection constant in sqlspec/_typing.py:

    try:
        import optional_package
        OPTIONAL_PACKAGE_INSTALLED = True
    except ImportError:
        OPTIONAL_PACKAGE_INSTALLED = False
    
  2. Create handler module (adapters/<adapter>/_feature_handlers.py):

    import logging

    from sqlspec._typing import OPTIONAL_PACKAGE_INSTALLED

    logger = logging.getLogger(__name__)

    def register_handlers(connection):
        if not OPTIONAL_PACKAGE_INSTALLED:
            logger.debug("Optional package not installed")
            return
        # Register handlers with connection
    
  3. Update config to auto-detect and initialize:

    from sqlspec._typing import OPTIONAL_PACKAGE_INSTALLED
    
    class Config(AsyncDatabaseConfig):
        def __init__(self, *, driver_features=None, **kwargs):
            if driver_features is None:
                driver_features = {}
            if "enable_feature" not in driver_features:
                driver_features["enable_feature"] = OPTIONAL_PACKAGE_INSTALLED
            super().__init__(driver_features=driver_features, **kwargs)
    
        async def _init_connection(self, connection):
            if self.driver_features.get("enable_feature"):
                from ._feature_handlers import register_handlers
                register_handlers(connection)
    
  4. Add tests (unit and integration):

    import pytest
    from sqlspec._typing import OPTIONAL_PACKAGE_INSTALLED
    
    @pytest.mark.skipif(not OPTIONAL_PACKAGE_INSTALLED, reason="Package not installed")
    async def test_feature_roundtrip(session):
        # Test with optional package
        pass
    

Creating Custom Adapters

For information on creating custom database adapters, see:

See Also