Adapters¶
SQLSpec provides database adapters that enable connections to various database systems. Each adapter consists of configuration classes and driver implementations tailored to specific databases, providing a consistent interface while respecting database-specific capabilities.
Overview¶
Available adapters:
PostgreSQL: asyncpg, psycopg, psqlpy
SQLite: sqlite, aiosqlite
MySQL: asyncmy
DuckDB: duckdb
Oracle: oracledb
BigQuery: bigquery
Cross-Database: ADBC (Arrow Database Connectivity)
Each adapter implementation includes:
Configuration class - Connection and pool settings
Driver class - Query execution and transaction management
Type mappings - Database-specific type conversions
Parameter binding - Automatic parameter style conversion
PostgreSQL¶
AsyncPG¶
Homepage: https://github.com/MagicStack/asyncpg
PyPI: https://pypi.org/project/asyncpg/
Concurrency: Async-only
Connection Pooling: Native pooling via asyncpg.Pool
Parameter Style: $1, $2, $3 (PostgreSQL positional placeholders)
Special Features:
Binary protocol for efficient data transfer
pgvector support (automatic registration when pgvector is installed)
Native JSON/JSONB type codecs
Statement caching
Prepared statements
Known Limitations:
Async-only (no synchronous support)
Requires async/await syntax throughout application
Installation:
uv add sqlspec[asyncpg]
Configuration:
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig, AsyncpgPoolConfig
sql = SQLSpec()
db = sql.add_config(
AsyncpgConfig(
pool_config=AsyncpgPoolConfig(
dsn="postgresql://user:password@localhost:5432/mydb",
min_size=5,
max_size=20,
command_timeout=60.0
)
)
)
async with sql.provide_session(db) as session:
result = await session.execute("SELECT * FROM users WHERE id = $1", [1])
user = result.one()
API Reference:
- class sqlspec.adapters.asyncpg.AsyncpgConfig[source]¶
Bases: AsyncDatabaseConfig[PoolConnectionProxy, Pool[Record], AsyncpgDriver]
Configuration for AsyncPG database connections using TypedDict.
- driver_type¶
alias of AsyncpgDriver
- connection_type¶
alias of PoolConnectionProxy
- supports_transactional_ddl: ClassVar[bool] = True¶
- supports_native_arrow_export: ClassVar[bool] = True¶
- supports_native_arrow_import: ClassVar[bool] = True¶
- supports_native_parquet_export: ClassVar[bool] = True¶
- supports_native_parquet_import: ClassVar[bool] = True¶
- __init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]¶
Initialize AsyncPG configuration.
- Parameters:
pool_config¶ – Pool configuration parameters (TypedDict or dict)
pool_instance¶ – Existing pool instance to use
migration_config¶ – Migration configuration
statement_config¶ – Statement configuration override
driver_features¶ – Driver features configuration (TypedDict or dict)
bind_key¶ – Optional unique identifier for this configuration
extension_config¶ – Extension-specific configuration (e.g., Litestar plugin settings)
observability_config¶ – Adapter-level observability overrides for lifecycle hooks and observers
- async create_connection()[source]¶
Create a single async connection from the pool.
- Return type:
PoolConnectionProxy
- Returns:
An AsyncPG connection instance.
- provide_session(*args, statement_config=None, **kwargs)[source]¶
Provide an async driver session context manager.
- Yields:
An AsyncpgDriver instance.
- async provide_pool(*args, **kwargs)[source]¶
Provide async pool instance.
- Return type:
Pool[Record]
- Returns:
The async connection pool.
- get_signature_namespace()[source]¶
Get the signature namespace for AsyncPG types.
This provides all AsyncPG-specific types that Litestar needs to recognize to avoid serialization attempts.
- pool_config¶
- class sqlspec.adapters.asyncpg.AsyncpgDriver[source]¶
Bases: AsyncDriverAdapterBase
AsyncPG PostgreSQL driver for async database operations.
Supports COPY operations, numeric parameter style handling, PostgreSQL exception handling, transaction management, SQL statement compilation and caching, and parameter processing with type coercion.
- dialect: DialectType | None = 'postgres'¶
- __init__(connection, statement_config=None, driver_features=None)[source]¶
Initialize driver adapter with connection and configuration.
- Parameters:
connection¶ (PoolConnectionProxy) – Database connection instance
statement_config¶ (StatementConfig | None) – Statement configuration for the driver
driver_features¶ (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks
observability¶ – Optional runtime handling for lifecycle hooks, observers, and spans
- with_cursor(connection)[source]¶
Create context manager for AsyncPG cursor.
- Return type:
AsyncpgCursor
- handle_database_exceptions()[source]¶
Handle database exceptions with PostgreSQL error codes.
- async execute_stack(stack, *, continue_on_error=False)[source]¶
Execute a StatementStack using asyncpg’s rapid batching.
- async select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]¶
Execute a query and persist results to storage once native COPY is available.
- Return type:
StorageBridgeJob
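For illustration, a hedged sketch of calling select_to_storage from a session; the destination URI and format_hint value are assumptions for the example, not a documented contract:
async with sql.provide_session(db) as session:
    job = await session.select_to_storage(
        "SELECT id, name FROM users WHERE active = $1",
        "file:///tmp/exports/active_users.parquet",  # illustrative destination
        True,
        format_hint="parquet",  # assumed format hint
    )
    # job is a StorageBridgeJob describing the completed transfer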
- async load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]¶
Load Arrow data into a PostgreSQL table via COPY.
- async load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]¶
Read an artifact from storage and ingest it via COPY.
- Return type:
StorageBridgeJob
- property data_dictionary: AsyncDataDictionaryBase¶
Get the data dictionary for this driver.
- Returns:
Data dictionary instance for metadata queries
- class sqlspec.adapters.asyncpg.AsyncpgPoolConfig[source]¶
Bases: AsyncpgConnectionConfig
TypedDict for AsyncPG pool parameters, inheriting connection parameters.
- setup: NotRequired[Callable[[AsyncpgConnection], Awaitable[None]]]¶
- init: NotRequired[Callable[[AsyncpgConnection], Awaitable[None]]]¶
- loop: NotRequired[AbstractEventLoop]¶
Psycopg¶
Homepage: https://github.com/psycopg/psycopg
PyPI: https://pypi.org/project/psycopg/
Concurrency: Sync and async
Connection Pooling: Native pooling via psycopg_pool
Parameter Style: %s, %s, %s (PostgreSQL format style)
Special Features:
Both synchronous and asynchronous support
Modern psycopg3 API
pgvector support (automatic registration when pgvector is installed)
Server-side cursors
Pipeline mode for batch operations
Known Limitations:
Separate driver classes for sync (PsycopgSyncDriver) and async (PsycopgAsyncDriver)
Installation:
uv add sqlspec[psycopg]
Configuration (Async):
from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgAsyncConfig
sql = SQLSpec()
db = sql.add_config(
PsycopgAsyncConfig(
pool_config={
"conninfo": "postgresql://user:password@localhost:5432/mydb",
"min_size": 5,
"max_size": 20
}
)
)
async with sql.provide_session(db) as session:
result = await session.execute("SELECT * FROM users WHERE id = %s", [1])
Configuration (Sync):
from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgSyncConfig
sql = SQLSpec()
db = sql.add_config(
PsycopgSyncConfig(
pool_config={
"conninfo": "postgresql://user:password@localhost:5432/mydb",
"min_size": 5,
"max_size": 20
}
)
)
with sql.provide_session(db) as session:
result = session.execute("SELECT * FROM users WHERE id = %s", [1])
API Reference:
- class sqlspec.adapters.psycopg.PsycopgSyncDriver[source]¶
Bases: PsycopgPipelineMixin, SyncDriverAdapterBase
PostgreSQL psycopg synchronous driver.
Provides synchronous database operations for PostgreSQL using psycopg3. Supports SQL statement execution with parameter binding, transaction management, result processing with column metadata, parameter style conversion, PostgreSQL arrays and JSON handling, COPY operations for bulk data transfer, and PostgreSQL-specific error handling.
- dialect: DialectType | None = 'postgres'¶
- with_cursor(connection)[source]¶
Create context manager for PostgreSQL cursor.
- Return type:
PsycopgSyncCursor
- handle_database_exceptions()[source]¶
Handle database-specific exceptions and wrap them appropriately.
- execute_stack(stack, *, continue_on_error=False)[source]¶
Execute a StatementStack using psycopg pipeline mode when supported.
- select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]¶
Execute a query and stream Arrow results to storage (sync).
- Return type:
StorageBridgeJob
- load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]¶
Load Arrow data into PostgreSQL using COPY.
- load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]¶
Load staged artifacts into PostgreSQL via COPY.
- Return type:
StorageBridgeJob
- property data_dictionary: SyncDataDictionaryBase¶
Get the data dictionary for this driver.
- Returns:
Data dictionary instance for metadata queries
- class sqlspec.adapters.psycopg.PsycopgAsyncDriver[source]¶
Bases: PsycopgPipelineMixin, AsyncDriverAdapterBase
PostgreSQL psycopg asynchronous driver.
Provides asynchronous database operations for PostgreSQL using psycopg3. Supports async SQL statement execution with parameter binding, async transaction management, async result processing with column metadata, parameter style conversion, PostgreSQL arrays and JSON handling, COPY operations for bulk data transfer, PostgreSQL-specific error handling, and async pub/sub support.
- dialect: DialectType | None = 'postgres'¶
- with_cursor(connection)[source]¶
Create async context manager for PostgreSQL cursor.
- Return type:
PsycopgAsyncCursor
- handle_database_exceptions()[source]¶
Handle database-specific exceptions and wrap them appropriately.
- async execute_stack(stack, *, continue_on_error=False)[source]¶
Execute a StatementStack using psycopg async pipeline when supported.
- async select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]¶
Execute a query and stream Arrow data to storage asynchronously.
- Return type:
StorageBridgeJob
- async load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]¶
Load Arrow data into PostgreSQL asynchronously via COPY.
- async load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]¶
Load staged artifacts asynchronously.
- Return type:
StorageBridgeJob
- property data_dictionary: AsyncDataDictionaryBase¶
Get the data dictionary for this driver.
- Returns:
Data dictionary instance for metadata queries
psqlpy¶
Homepage: https://github.com/psqlpy-python/psqlpy
PyPI: https://pypi.org/project/psqlpy/
Concurrency: Async-only
Connection Pooling: Native pooling (Rust-based implementation)
Parameter Style: $1, $2, $3 (PostgreSQL positional placeholders)
Special Features:
Rust-based driver for memory efficiency
Connection pooling
Transaction support
Type conversion
Known Limitations:
Async-only (no synchronous support)
Smaller ecosystem than asyncpg or psycopg
Installation:
uv add sqlspec[psqlpy]
Configuration:
from sqlspec import SQLSpec
from sqlspec.adapters.psqlpy import PsqlpyConfig
sql = SQLSpec()
db = sql.add_config(
PsqlpyConfig(
pool_config={
"dsn": "postgresql://user:password@localhost:5432/mydb",
"max_pool_size": 20
}
)
)
async with sql.provide_session(db) as session:
result = await session.execute("SELECT * FROM users WHERE id = $1", [1])
SQLite¶
sqlite¶
Homepage: Built-in Python module
PyPI: N/A (included with Python)
Concurrency: Sync-only
Connection Pooling: Custom thread-local pooling
Parameter Style: ?, ?, ? (SQLite positional placeholders)
Special Features:
No external dependencies
File-based or in-memory databases
Thread-safe connections
Custom connection pooling for concurrency
Known Limitations:
Synchronous only (no async support)
Limited concurrent write performance
Thread-local connection management
Installation:
# No installation required - built into Python
uv add sqlspec
Configuration (In-Memory):
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
sql = SQLSpec()
db = sql.add_config(SqliteConfig(pool_config={"database": ":memory:"}))
with sql.provide_session(db) as session:
session.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
session.execute("INSERT INTO users (name) VALUES (?)", ["Alice"])
Configuration (File-Based):
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
sql = SQLSpec()
db = sql.add_config(
SqliteConfig(
pool_config={
"database": "/path/to/database.db",
"timeout": 30.0,
"check_same_thread": False
}
)
)
API Reference:
- class sqlspec.adapters.sqlite.SqliteConfig[source]¶
Bases: SyncDatabaseConfig[Connection, SqliteConnectionPool, SqliteDriver]
SQLite configuration with thread-local connections.
- driver_type¶
alias of SqliteDriver
- connection_type¶
alias of Connection
- supports_transactional_ddl: ClassVar[bool] = True¶
- supports_native_arrow_export: ClassVar[bool] = True¶
- supports_native_arrow_import: ClassVar[bool] = True¶
- supports_native_parquet_export: ClassVar[bool] = True¶
- supports_native_parquet_import: ClassVar[bool] = True¶
- __init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]¶
Initialize SQLite configuration.
- Parameters:
pool_config¶ (SqliteConnectionParams | dict[str, typing.Any] | None) – Configuration parameters including connection settings
pool_instance¶ (SqliteConnectionPool | None) – Pre-created pool instance
migration_config¶ (dict[str, typing.Any] | None) – Migration configuration
statement_config¶ (StatementConfig | None) – Default SQL statement configuration
driver_features¶ (SqliteDriverFeatures | dict[str, typing.Any] | None) – Optional driver feature configuration
bind_key¶ (str | None) – Optional bind key for the configuration
extension_config¶ (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)
observability_config¶ (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers
- create_connection()[source]¶
Get a SQLite connection from the pool.
- Returns:
A connection from the pool
- Return type:
SqliteConnection
- provide_connection(*args, **kwargs)[source]¶
Provide a SQLite connection context manager.
- Yields:
SqliteConnection – A thread-local connection
- provide_session(*args, statement_config=None, **kwargs)[source]¶
Provide a SQLite driver session.
- Yields:
SqliteDriver – A driver instance with thread-local connection
- pool_config¶
- class sqlspec.adapters.sqlite.SqliteDriver[source]¶
Bases:
SyncDriverAdapterBaseSQLite driver implementation.
Provides SQL statement execution, transaction management, and result handling for SQLite databases using the standard sqlite3 module.
- dialect: DialectType | None = 'sqlite'¶
- __init__(connection, statement_config=None, driver_features=None)[source]¶
Initialize SQLite driver.
- Parameters:
connection¶ (Connection) – SQLite database connection
statement_config¶ (StatementConfig | None) – Statement configuration settings
driver_features¶ (dict[str, typing.Any] | None) – Driver-specific feature flags
- with_cursor(connection)[source]¶
Create context manager for SQLite cursor.
- Parameters:
connection¶ (Connection) – SQLite database connection
- Return type:
SqliteCursor
- Returns:
Cursor context manager for safe cursor operations
- handle_database_exceptions()[source]¶
Handle database-specific exceptions and wrap them appropriately.
- Returns:
Context manager that converts SQLite exceptions to SQLSpec exceptions
- select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]¶
Execute a query and write Arrow-compatible output to storage (sync).
- Return type:
StorageBridgeJob
- load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]¶
Load Arrow data into SQLite using batched inserts.
- load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]¶
Load staged artifacts from storage into SQLite.
- Return type:
StorageBridgeJob
- begin()[source]¶
Begin a database transaction.
- Raises:
SQLSpecError – If transaction cannot be started
- rollback()[source]¶
Rollback the current transaction.
- Raises:
SQLSpecError – If transaction cannot be rolled back
- commit()[source]¶
Commit the current transaction.
- Raises:
SQLSpecError – If transaction cannot be committed
- property data_dictionary: SyncDataDictionaryBase¶
Get the data dictionary for this driver.
- Returns:
Data dictionary instance for metadata queries
aiosqlite¶
Homepage: https://github.com/omnilib/aiosqlite
PyPI: https://pypi.org/project/aiosqlite/
Concurrency: Async-only
Connection Pooling: Custom pooling
Parameter Style: ?, ?, ? (SQLite positional placeholders)
Special Features:
Async wrapper around Python’s sqlite3 module
Same SQLite features as synchronous version
Compatible with async frameworks
Known Limitations:
Async operations run on thread pool (not true async I/O)
Limited concurrent write performance (SQLite limitation)
Installation:
uv add sqlspec[aiosqlite]
Configuration:
from sqlspec import SQLSpec
from sqlspec.adapters.aiosqlite import AiosqliteConfig
sql = SQLSpec()
db = sql.add_config(
AiosqliteConfig(
pool_config={
"database": "/path/to/database.db",
"timeout": 30.0
}
)
)
async with sql.provide_session(db) as session:
await session.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
await session.execute("INSERT INTO users (name) VALUES (?)", ["Alice"])
API Reference:
- class sqlspec.adapters.aiosqlite.AiosqliteConfig[source]¶
Bases: AsyncDatabaseConfig[AiosqliteConnection, AiosqliteConnectionPool, AiosqliteDriver]
Database configuration for AioSQLite engine.
- driver_type¶
alias of AiosqliteDriver
- supports_transactional_ddl: ClassVar[bool] = True¶
- supports_native_arrow_export: ClassVar[bool] = True¶
- supports_native_arrow_import: ClassVar[bool] = True¶
- supports_native_parquet_export: ClassVar[bool] = True¶
- supports_native_parquet_import: ClassVar[bool] = True¶
- __init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]¶
Initialize AioSQLite configuration.
- Parameters:
pool_config¶ (AiosqlitePoolParams | dict[str, typing.Any] | None) – Pool configuration parameters (TypedDict or dict)
pool_instance¶ (AiosqliteConnectionPool | None) – Optional pre-configured connection pool instance.
migration_config¶ (dict[str, typing.Any] | None) – Optional migration configuration.
statement_config¶ (StatementConfig | None) – Optional statement configuration.
driver_features¶ (AiosqliteDriverFeatures | dict[str, typing.Any] | None) – Optional driver feature configuration.
bind_key¶ (str | None) – Optional unique identifier for this configuration.
extension_config¶ (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)
observability_config¶ (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers
- provide_session(*_args, statement_config=None, **_kwargs)[source]¶
Provide an async driver session context manager.
- Yields:
An AiosqliteDriver instance.
- async create_connection()[source]¶
Create a single async connection from the pool.
- Return type:
Connection
- Returns:
An aiosqlite connection instance.
- async provide_pool()[source]¶
Provide async pool instance.
- Return type:
AiosqliteConnectionPool
- Returns:
The async connection pool.
- pool_config¶
- class sqlspec.adapters.aiosqlite.AiosqliteDriver[source]¶
Bases: AsyncDriverAdapterBase
AIOSQLite driver for async SQLite database operations.
- dialect: DialectType | None = 'sqlite'¶
- __init__(connection, statement_config=None, driver_features=None)[source]¶
Initialize driver adapter with connection and configuration.
- Parameters:
connection¶ (Connection) – Database connection instance
statement_config¶ (StatementConfig | None) – Statement configuration for the driver
driver_features¶ (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks
observability¶ – Optional runtime handling for lifecycle hooks, observers, and spans
- with_cursor(connection)[source]¶
Create async context manager for AIOSQLite cursor.
- Return type:
AiosqliteCursor
- async select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]¶
Execute a query and stream Arrow results into storage.
- Return type:
StorageBridgeJob
- async load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]¶
Load Arrow data into SQLite using batched inserts.
- async load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]¶
Load staged artifacts from storage into SQLite.
- Return type:
StorageBridgeJob
- property data_dictionary: AsyncDataDictionaryBase¶
Get the data dictionary for this driver.
- Returns:
Data dictionary instance for metadata queries
MySQL¶
asyncmy¶
Homepage: https://github.com/long2ice/asyncmy
PyPI: https://pypi.org/project/asyncmy/
Concurrency: Async-only
Connection Pooling: Native pooling
Parameter Style: %s, %s, %s (MySQL format style)
Special Features:
Cython-based implementation
Connection pooling
MySQL protocol support
Prepared statements
Known Limitations:
Async-only (no synchronous support)
Requires Cython build tools during installation
Installation:
uv add sqlspec[asyncmy]
Configuration:
from sqlspec import SQLSpec
from sqlspec.adapters.asyncmy import AsyncmyConfig
sql = SQLSpec()
db = sql.add_config(
AsyncmyConfig(
pool_config={
"host": "localhost",
"port": 3306,
"user": "root",
"password": "password",
"database": "mydb",
"minsize": 5,
"maxsize": 20
}
)
)
async with sql.provide_session(db) as session:
result = await session.execute("SELECT * FROM users WHERE id = %s", [1])
Driver Features:
json_serializer – Override JSON parameter serialization for dict/list/tuple inputs. Defaults to sqlspec.utils.serializers.to_json() and is invoked for every JSON parameter passed to AsyncMy.
json_deserializer – Customize JSON result decoding. Defaults to sqlspec.utils.serializers.from_json() and automatically converts JSON columns into Python objects during fetches.
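A sketch of overriding both hooks with msgspec-based callables (msgspec is illustrative here; any callables with matching signatures work):
import msgspec

from sqlspec import SQLSpec
from sqlspec.adapters.asyncmy import AsyncmyConfig

def encode_json(obj) -> str:
    # dict/list/tuple parameters -> JSON string for MySQL JSON columns
    return msgspec.json.encode(obj).decode("utf-8")

def decode_json(data):
    # JSON column values -> Python objects during fetches
    return msgspec.json.decode(data)

sql = SQLSpec()
db = sql.add_config(
    AsyncmyConfig(
        pool_config={"host": "localhost", "user": "root", "database": "mydb"},
        driver_features={
            "json_serializer": encode_json,
            "json_deserializer": decode_json,
        },
    )
)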
DuckDB¶
duckdb¶
Homepage: https://github.com/duckdb/duckdb
PyPI: https://pypi.org/project/duckdb/
Concurrency: Sync-only
Connection Pooling: Custom pooling
Parameter Style: ?, ?, ? (positional placeholders)
Special Features:
Embedded analytical database (OLAP)
Native Parquet and CSV support
Extension management (auto-install/load extensions)
Secrets management for API integrations
Arrow-native data transfer
Direct file querying without import
Shared memory databases for concurrency
Known Limitations:
Synchronous only (no async support)
Embedded database (not client-server)
Installation:
uv add sqlspec[duckdb]
Configuration (In-Memory):
from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig
sql = SQLSpec()
db = sql.add_config(DuckDBConfig()) # Defaults to :memory:shared_db
with sql.provide_session(db) as session:
# Query Parquet file directly
result = session.execute("SELECT * FROM 'data.parquet' LIMIT 10")
Configuration (Persistent with Extensions):
from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig, DuckDBExtensionConfig
sql = SQLSpec()
db = sql.add_config(
DuckDBConfig(
pool_config={
"database": "/path/to/analytics.db",
"threads": 4,
"memory_limit": "4GB"
},
driver_features={
"extensions": [
DuckDBExtensionConfig(name="httpfs"),
DuckDBExtensionConfig(name="parquet")
]
}
)
)
with sql.provide_session(db) as session:
# Query remote Parquet file
result = session.execute(
"SELECT * FROM 'https://example.com/data.parquet' LIMIT 10"
)
Community Extensions:
DuckDBConfig accepts the runtime flags DuckDB expects for community/unsigned extensions via
pool_config (for example allow_community_extensions=True,
allow_unsigned_extensions=True, enable_external_access=True). SQLSpec applies those
options with SET statements immediately after establishing each connection, so even older
DuckDB builds that do not recognize the options during duckdb.connect() will still enable the
required permissions before extensions are installed.
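A sketch of that setup; the flags come straight from the paragraph above, while the community extension name is illustrative:
from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig, DuckDBExtensionConfig

sql = SQLSpec()
db = sql.add_config(
    DuckDBConfig(
        pool_config={
            "database": ":memory:",
            # Applied with SET right after each connection is established
            "allow_community_extensions": True,
            "allow_unsigned_extensions": True,
        },
        driver_features={
            # Illustrative community extension
            "extensions": [DuckDBExtensionConfig(name="h3")]
        },
    )
)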
API Reference:
- class sqlspec.adapters.duckdb.DuckDBConfig[source]¶
Bases: SyncDatabaseConfig[DuckDBPyConnection, DuckDBConnectionPool, DuckDBDriver]
DuckDB configuration with connection pooling.
This configuration supports DuckDB’s features including:
Connection pooling
Extension management and installation
Secret management for API integrations
Auto configuration settings
Arrow integration
Direct file querying capabilities
Configurable type handlers for JSON serialization and UUID conversion
DuckDB Connection Pool Configuration:
Default pool size is 1-4 connections (DuckDB uses a single connection by default)
Connection recycling is set to 24 hours by default (set to 0 to disable)
Shared memory databases use :memory:shared_db for proper concurrency
Type Handler Configuration via driver_features:
json_serializer: Custom JSON serializer for dict/list parameters. Defaults to sqlspec.utils.serializers.to_json if not provided. Example: json_serializer=msgspec.json.encode(…).decode('utf-8')
enable_uuid_conversion: Enable automatic UUID string conversion (default: True). When True, UUID strings in query results are automatically converted to UUID objects. When False, UUID strings are treated as regular strings.
Example
>>> import msgspec
>>> from sqlspec.adapters.duckdb import DuckDBConfig
>>>
>>> # Custom JSON serializer
>>> def custom_json(obj):
...     return msgspec.json.encode(obj).decode("utf-8")
>>>
>>> config = DuckDBConfig(
...     pool_config={"database": ":memory:"},
...     driver_features={
...         "json_serializer": custom_json,
...         "enable_uuid_conversion": False,
...     },
... )
- driver_type¶
alias of DuckDBDriver
- supports_transactional_ddl: ClassVar[bool] = True¶
- supports_native_arrow_export: ClassVar[bool] = True¶
- supports_native_arrow_import: ClassVar[bool] = True¶
- supports_native_parquet_export: ClassVar[bool] = True¶
- supports_native_parquet_import: ClassVar[bool] = True¶
- pool_config¶
- storage_partition_strategies: ClassVar[tuple[str, ...]] = ('fixed', 'rows_per_chunk', 'manifest')¶
- __init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]¶
Initialize DuckDB configuration.
- Parameters:
pool_config¶ (DuckDBPoolParams | dict[str, typing.Any] | None) – Pool configuration parameters
pool_instance¶ (DuckDBConnectionPool | None) – Pre-created pool instance
migration_config¶ (dict[str, Any] | None) – Migration configuration
statement_config¶ (StatementConfig | None) – Statement configuration override
driver_features¶ (DuckDBDriverFeatures | dict[str, typing.Any] | None) – DuckDB-specific driver features including json_serializer and enable_uuid_conversion options
bind_key¶ (str | None) – Optional unique identifier for this configuration
extension_config¶ (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)
observability_config¶ (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers
- create_connection()[source]¶
Get a DuckDB connection from the pool.
This method ensures the pool is created and returns a connection from the pool. The connection is checked out from the pool and must be properly managed by the caller.
- Returns:
A connection from the pool
- Return type:
DuckDBConnection
Note
For automatic connection management, prefer using provide_connection() or provide_session() which handle returning connections to the pool. The caller is responsible for returning the connection to the pool using pool.release(connection) when done.
- class sqlspec.adapters.duckdb.DuckDBDriver[source]¶
Bases: SyncDriverAdapterBase
Synchronous DuckDB database driver.
Provides SQL statement execution, transaction management, and result handling for DuckDB databases. Supports multiple parameter styles including QMARK, NUMERIC, and NAMED_DOLLAR formats.
The driver handles script execution, batch operations, and integrates with the sqlspec.core modules for statement processing and caching.
- dialect: DialectType | None = 'duckdb'¶
- __init__(connection, statement_config=None, driver_features=None)[source]¶
Initialize driver adapter with connection and configuration.
- Parameters:
connection¶ (DuckDBPyConnection) – Database connection instance
statement_config¶ (StatementConfig | None) – Statement configuration for the driver
driver_features¶ (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks
observability¶ – Optional runtime handling for lifecycle hooks, observers, and spans
- with_cursor(connection)[source]¶
Create context manager for DuckDB cursor.
- Parameters:
connection¶ (DuckDBPyConnection) – DuckDB connection instance
- Return type:
DuckDBCursor
- Returns:
DuckDBCursor context manager instance
- handle_database_exceptions()[source]¶
Handle database-specific exceptions and wrap them appropriately.
- Returns:
Context manager that catches and converts DuckDB exceptions
- property data_dictionary: SyncDataDictionaryBase¶
Get the data dictionary for this driver.
- Returns:
Data dictionary instance for metadata queries
- select_to_arrow(statement, /, *parameters, statement_config=None, return_format='table', native_only=False, batch_size=None, arrow_schema=None, **kwargs)[source]¶
Execute query and return results as Apache Arrow (DuckDB native path).
DuckDB provides native Arrow support via cursor.arrow(). This is the fastest path due to DuckDB’s columnar architecture.
- Parameters:
statement¶ – SQL statement, string, or QueryBuilder
*parameters¶ – Query parameters or filters
statement_config¶ – Optional statement configuration override
return_format¶ – “table” for pyarrow.Table (default), “batch” for RecordBatch
native_only¶ – Ignored for DuckDB (always uses native path)
batch_size¶ – Batch size hint (for future streaming implementation)
arrow_schema¶ – Optional pyarrow.Schema for type casting
**kwargs¶ – Additional keyword arguments
- Returns:
ArrowResult with native Arrow data
Example
>>> result = driver.select_to_arrow(
...     "SELECT * FROM users WHERE age > ?", 18
... )
>>> df = result.to_pandas()  # Fast zero-copy conversion
- select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]¶
Persist DuckDB query output to a storage backend using Arrow fast paths.
- class sqlspec.adapters.duckdb.DuckDBExtensionConfig[source]¶
Bases: TypedDict
DuckDB extension configuration for auto-management.
BigQuery¶
bigquery¶
Homepage: https://github.com/googleapis/python-bigquery
PyPI: https://pypi.org/project/google-cloud-bigquery/
Concurrency: Sync-only
Connection Pooling: None (stateless HTTP API)
Parameter Style: @param1, @param2 (named parameters)
Special Features:
Google Cloud BigQuery integration
Serverless query execution
Standard SQL dialect
Automatic result pagination
Known Limitations:
Synchronous only (no async support)
Requires Google Cloud credentials
Query costs based on data scanned
No connection pooling (stateless API)
Installation:
uv add sqlspec[bigquery]
Configuration:
from google.oauth2 import service_account
from sqlspec import SQLSpec
from sqlspec.adapters.bigquery import BigQueryConfig
credentials = service_account.Credentials.from_service_account_file(
"/path/to/credentials.json"
)
sql = SQLSpec()
db = sql.add_config(
BigQueryConfig(
pool_config={
"project": "my-project-id",
"credentials": credentials
}
)
)
with sql.provide_session(db) as session:
result = session.execute(
"SELECT * FROM `project.dataset.table` WHERE id = @user_id",
{"user_id": 1}
)
Oracle¶
oracledb¶
Homepage: https://github.com/oracle/python-oracledb
PyPI: https://pypi.org/project/oracledb/
Concurrency: Sync and async
Connection Pooling: Native pooling
Parameter Style: :1, :2, :3 (Oracle positional placeholders)
Special Features:
Both synchronous and asynchronous support
Thin mode (pure Python) and thick mode (Oracle Client)
Oracle-specific data types (NUMBER, CLOB, BLOB, etc.)
Connection pooling
Two-phase commit support
NumPy VECTOR support (Oracle 23ai+) - automatic conversion for AI/ML embeddings
Known Limitations:
Separate configuration classes for sync and async
Thick mode requires Oracle Instant Client installation
VECTOR data type requires Oracle Database 23ai or higher
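When thick mode is required, python-oracledb is switched over before any pool is created, using its standard initializer (the lib_dir path below is illustrative; omit it to use the system library path):
import oracledb

# Enable thick mode (requires Oracle Instant Client on the host).
oracledb.init_oracle_client(lib_dir="/opt/oracle/instantclient")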
Installation:
uv add sqlspec[oracledb]
Configuration (Async, Thin Mode):
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleAsyncConfig
sql = SQLSpec()
db = sql.add_config(
OracleAsyncConfig(
pool_config={
"user": "system",
"password": "oracle",
"dsn": "localhost:1521/XE",
"min": 5,
"max": 20
}
)
)
async with sql.provide_session(db) as session:
result = await session.execute("SELECT * FROM users WHERE id = :1", [1])
Configuration (Sync, Thin Mode):
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleSyncConfig
sql = SQLSpec()
db = sql.add_config(
OracleSyncConfig(
pool_config={
"user": "system",
"password": "oracle",
"dsn": "localhost:1521/XE"
}
)
)
with sql.provide_session(db) as session:
result = session.execute("SELECT * FROM users WHERE id = :1", [1])
Configuration (with NumPy VECTOR Support - Oracle 23ai+):
import numpy as np
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleAsyncConfig
sql = SQLSpec()
db = sql.add_config(
OracleAsyncConfig(
pool_config={
"user": "system",
"password": "oracle",
"dsn": "localhost:1521/FREEPDB1"
},
driver_features={
"enable_numpy_vectors": True # Enable automatic NumPy conversion
}
)
)
async with sql.provide_session(db) as session:
# Create table with VECTOR column
await session.execute("""
CREATE TABLE embeddings (
id NUMBER PRIMARY KEY,
text VARCHAR2(4000),
embedding VECTOR(768, FLOAT32)
)
""")
# Insert NumPy array - automatically converted to Oracle VECTOR
vector = np.random.rand(768).astype(np.float32)
await session.execute(
"INSERT INTO embeddings VALUES (:1, :2, :3)",
(1, "sample text", vector)
)
# Retrieve - automatically converted back to NumPy array
result = await session.select_one(
"SELECT * FROM embeddings WHERE id = :1",
(1,)
)
embedding = result["EMBEDDING"]
assert isinstance(embedding, np.ndarray)
assert embedding.dtype == np.float32
NumPy VECTOR Support Details:
Oracle Database 23ai introduces the VECTOR data type for AI/ML embeddings and similarity search. SQLSpec provides seamless NumPy integration for automatic bidirectional conversion.
Supported NumPy dtypes:
float32 → VECTOR(*, FLOAT32) - General embeddings (recommended)
float64 → VECTOR(*, FLOAT64) - High-precision embeddings
int8 → VECTOR(*, INT8) - Quantized embeddings
uint8 → VECTOR(*, BINARY) - Binary/hash vectors
Requirements:
Oracle Database 23ai or higher
NumPy installed (pip install numpy)
enable_numpy_vectors=True in driver_features (opt-in)
Manual Conversion API:
For advanced use cases, use the type converter directly:
from sqlspec.adapters.oracledb.type_converter import OracleTypeConverter
converter = OracleTypeConverter()
# NumPy → Oracle VECTOR
oracle_array = converter.convert_numpy_to_vector(numpy_array)
# Oracle VECTOR → NumPy
numpy_array = converter.convert_vector_to_numpy(oracle_array)
Vector Similarity Search Example:
query_vector = np.random.rand(768).astype(np.float32)
results = await session.select_all("""
SELECT id, text,
VECTOR_DISTANCE(embedding, :1, COSINE) as distance
FROM embeddings
ORDER BY distance
FETCH FIRST 5 ROWS ONLY
""", (query_vector,))
Cross-Database¶
ADBC (Arrow Database Connectivity)¶
Homepage: https://github.com/apache/arrow-adbc
PyPI: Various (backend-specific):
PostgreSQL: https://pypi.org/project/adbc-driver-postgresql/
Concurrency: Sync-only
Connection Pooling: None (stateless connections)
Parameter Style: Varies by backend
PostgreSQL: $1, $2, $3 (numeric)
SQLite: ?, ?, ? (qmark)
DuckDB: ?, ?, ? (qmark)
BigQuery: @param1, @param2 (named_at)
Snowflake: ?, ?, ? (qmark)
Special Features:
Arrow-native data transfer (zero-copy)
Multi-backend support with unified interface
Direct Arrow table output
Automatic driver detection from URI or driver name
Efficient bulk data operations
Known Limitations:
Synchronous only (no async support)
No connection pooling
Backend-specific driver packages required
Parameter style varies by backend
Installation:
# PostgreSQL backend
uv add sqlspec[adbc-postgresql]
# SQLite backend
uv add sqlspec[adbc-sqlite]
# DuckDB backend
uv add sqlspec[adbc-duckdb]
# BigQuery backend
uv add sqlspec[adbc-bigquery]
# Snowflake backend
uv add sqlspec[adbc-snowflake]
Configuration (PostgreSQL):
from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig
sql = SQLSpec()
db = sql.add_config(
AdbcConfig(
connection_config={
"driver_name": "postgresql",
"uri": "postgresql://user:password@localhost:5432/mydb"
}
)
)
with sql.provide_session(db) as session:
result = session.execute("SELECT * FROM users WHERE id = $1", [1])
# Get Arrow table directly
arrow_table = result.arrow()
Configuration (SQLite):
from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig
sql = SQLSpec()
db = sql.add_config(
AdbcConfig(
connection_config={
"driver_name": "sqlite",
"uri": "/path/to/database.db"
}
)
)
with sql.provide_session(db) as session:
result = session.execute("SELECT * FROM users WHERE id = ?", [1])
Configuration (DuckDB):
from sqlspec import SQLSpec
from sqlspec.adapters.adbc import AdbcConfig
sql = SQLSpec()
db = sql.add_config(
AdbcConfig(
connection_config={
"driver_name": "duckdb",
"uri": "/path/to/analytics.db"
}
)
)
with sql.provide_session(db) as session:
result = session.execute("SELECT * FROM 'data.parquet' LIMIT 10")
API Reference:
- class sqlspec.adapters.adbc.AdbcConfig[source]¶
Bases: NoPoolSyncConfig[Connection, AdbcDriver]
ADBC configuration for Arrow Database Connectivity.
ADBC provides an interface for connecting to multiple database systems with Arrow-native data transfer.
Supports multiple database backends including PostgreSQL, SQLite, DuckDB, BigQuery, and Snowflake with automatic driver detection and loading.
- driver_type¶
alias of AdbcDriver
- connection_type¶
alias of Connection
- supports_native_arrow_export: ClassVar[bool] = True¶
- supports_native_arrow_import: ClassVar[bool] = True¶
- supports_native_parquet_export: ClassVar[bool] = True¶
- supports_native_parquet_import: ClassVar[bool] = True¶
- storage_partition_strategies: ClassVar[tuple[str, ...]] = ('fixed', 'rows_per_chunk')¶
- __init__(*, connection_config=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)[source]¶
Initialize configuration.
- Parameters:
connection_config¶ (AdbcConnectionParams | dict[str, Any] | None) – Connection configuration parameters
migration_config¶ (dict[str, Any] | None) – Migration configuration
statement_config¶ (StatementConfig | None) – Default SQL statement configuration
driver_features¶ (AdbcDriverFeatures | dict[str, typing.Any] | None) – Driver feature configuration (AdbcDriverFeatures)
bind_key¶ (str | None) – Optional unique identifier for this configuration
extension_config¶ (dict[str, dict[str, Any] | LitestarConfig | FastAPIConfig | StarletteConfig | FlaskConfig | ADKConfig | OpenTelemetryConfig | PrometheusConfig] | None) – Extension-specific configuration (e.g., Litestar plugin settings)
observability_config¶ (ObservabilityConfig | None) – Adapter-level observability overrides for lifecycle hooks and observers
- connection_config: dict[str, Any]¶
- create_connection()[source]¶
Create and return a new connection using the specified driver.
- Return type:
Connection
- Returns:
A new connection instance.
- Raises:
ImproperConfigurationError – If the connection could not be established.
- class sqlspec.adapters.adbc.AdbcDriver[source]¶
Bases: SyncDriverAdapterBase
ADBC driver for Arrow Database Connectivity.
Provides database connectivity through ADBC with support for multiple database dialects, parameter style conversion, and transaction management.
- __init__(connection, statement_config=None, driver_features=None)[source]¶
Initialize driver adapter with connection and configuration.
- Parameters:
connection¶ (Connection) – Database connection instance
statement_config¶ (StatementConfig | None) – Statement configuration for the driver
driver_features¶ (dict[str, typing.Any] | None) – Driver-specific features like extensions, secrets, and connection callbacks
observability¶ – Optional runtime handling for lifecycle hooks, observers, and spans
- dialect: DialectType | None¶
- prepare_driver_parameters(parameters, statement_config, is_many=False, prepared_statement=None)[source]¶
Prepare parameters with cast-aware type coercion for ADBC.
For PostgreSQL, applies cast-aware parameter processing using metadata from the compiled statement. This allows proper handling of JSONB casts and other type conversions. Respects driver_features['enable_cast_detection'] configuration.
- Returns:
Parameters with cast-aware type coercion applied
- with_cursor(connection)[source]¶
Create context manager for cursor.
- Parameters:
connection¶ (Connection) – Database connection
- Return type:
AdbcCursor
- Returns:
Cursor context manager
- handle_database_exceptions()[source]¶
Handle database-specific exceptions and wrap them appropriately.
- Returns:
Exception handler context manager
- property data_dictionary: SyncDataDictionaryBase¶
Get the data dictionary for this driver.
- Returns:
Data dictionary instance for metadata queries
- select_to_arrow(statement, /, *parameters, statement_config=None, return_format='table', native_only=False, batch_size=None, arrow_schema=None, **kwargs)[source]¶
Execute query and return results as Apache Arrow (ADBC native path).
ADBC provides zero-copy Arrow support via cursor.fetch_arrow_table(). This is 5-10x faster than the conversion path for large datasets.
- Parameters:
statement¶ – SQL statement, string, or QueryBuilder
*parameters¶ – Query parameters or filters
statement_config¶ – Optional statement configuration override
return_format¶ – “table” for pyarrow.Table (default), “batch” for RecordBatch
native_only¶ – Ignored for ADBC (always uses native path)
batch_size¶ – Batch size hint (for future streaming implementation)
arrow_schema¶ – Optional pyarrow.Schema for type casting
**kwargs¶ – Additional keyword arguments
- Returns:
ArrowResult with native Arrow data
Example
>>> result = driver.select_to_arrow(
...     "SELECT * FROM users WHERE age > $1", 18
... )
>>> df = result.to_pandas()  # Fast zero-copy conversion
- select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]¶
Stream query results to storage via the Arrow fast path.
Adapter Architecture¶
Common Patterns¶
All SQLSpec adapters follow consistent architectural patterns:
Configuration Class
Each adapter provides a configuration class that inherits from one of:
AsyncDatabaseConfig - For async adapters
SyncDatabaseConfig - For sync adapters
NoPoolSyncConfig - For stateless adapters (ADBC, BigQuery)
Configuration classes handle:
Connection parameters
Pool settings (when applicable)
Driver-specific features
Statement configuration
Migration settings
Driver Class
Driver classes inherit from:
AsyncDriverAdapterBase - For async query execution
SyncDriverAdapterBase - For sync query execution
Driver classes provide:
Query execution methods (execute, select_one, select_all, etc.)
Transaction management (begin, commit, rollback; see the sketch after this list)
Result processing and type mapping
Parameter binding
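A minimal sketch of explicit transaction control, using the begin/commit/rollback methods documented for the SQLite driver above (table name and values are illustrative):
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

sql = SQLSpec()
db = sql.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

with sql.provide_session(db) as session:
    session.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance REAL)")
    session.begin()
    try:
        session.execute("INSERT INTO accounts VALUES (?, ?)", [1, 100.0])
        session.execute("INSERT INTO accounts VALUES (?, ?)", [2, 250.0])
        session.commit()  # both inserts become visible atomically
    except Exception:
        session.rollback()  # undo partial work on failure
        raise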
Type Mappings
Each adapter includes database-specific type mappings in _types.py:
Python to database type conversions
Database to Python type conversions
Custom type handlers (JSON, UUID, arrays, etc.)
Parameter Binding¶
SQLSpec automatically converts parameter placeholders to database-specific styles:
Database | Adapter | Style | Example
---|---|---|---
PostgreSQL | asyncpg, psqlpy | $1, $2 (numeric) | WHERE id = $1
PostgreSQL | psycopg | %s (format) | WHERE id = %s
SQLite | sqlite, aiosqlite | ? (qmark) | WHERE id = ?
MySQL | asyncmy | %s (format) | WHERE id = %s
DuckDB | duckdb | ? (qmark) | WHERE id = ?
Oracle | oracledb | :1, :2 (numeric) | WHERE id = :1
BigQuery | bigquery | @name (named) | WHERE id = @user_id
ADBC | adbc (varies) | Backend-specific | See backend documentation
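To make the conversion concrete, here is the same lookup bound in three of the styles above; asyncpg_db, sqlite_db, and bigquery_db stand for configs registered as in the adapter sections earlier:
# Numeric style (asyncpg): $1, $2, ...
async with sql.provide_session(asyncpg_db) as session:
    await session.execute("SELECT * FROM users WHERE id = $1", [1])

# Qmark style (sqlite): ?
with sql.provide_session(sqlite_db) as session:
    session.execute("SELECT * FROM users WHERE id = ?", [1])

# Named style (bigquery): @name
with sql.provide_session(bigquery_db) as session:
    session.execute(
        "SELECT * FROM `project.dataset.users` WHERE id = @user_id",
        {"user_id": 1},
    )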
Connection Pooling Types¶
Native Pooling
Adapters with native pooling use the database driver's built-in pool:
asyncpg (asyncpg.Pool)
psycopg (psycopg_pool)
asyncmy (native pooling)
oracledb (oracledb.create_pool)
Custom Pooling
SQLSpec provides custom pooling for adapters without native support:
sqlite (thread-local pooling)
aiosqlite (async pooling)
duckdb (connection pooling)
No Pooling
Stateless adapters create connections per request:
ADBC (stateless Arrow connections)
BigQuery (stateless HTTP API)
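For these no-pool adapters, create_connection() (documented in the ADBC API reference above) opens a fresh connection on each call. A sketch, with the caller responsible for closing; the in-memory URI is illustrative:
from sqlspec.adapters.adbc import AdbcConfig

config = AdbcConfig(
    connection_config={"driver_name": "sqlite", "uri": ":memory:"}
)

conn = config.create_connection()  # no pool: a brand-new connection
try:
    cur = conn.cursor()
    cur.execute("SELECT 1")
    print(cur.fetchone())
finally:
    conn.close()  # caller closes; there is no pool to release into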
Type Handler Pattern¶
SQLSpec supports optional type handlers for database-specific features that require external dependencies. Type handlers enable automatic type conversion at the database driver level.
Overview¶
Type handlers differ from type converters:
- Type Converters (type_converter.py): Transform data after retrieval or before insertion
Pure Python transformations
Examples: JSON detection, datetime formatting
- Type Handlers (_<feature>_handlers.py): Register with database driver for automatic conversion
Require optional dependencies
Examples: pgvector, NumPy arrays
Graceful Degradation¶
Type handlers gracefully degrade when optional dependencies are not installed:
Detection via sqlspec._typing constants (NUMPY_INSTALLED, PGVECTOR_INSTALLED)
Auto-enable in driver_features when dependency available
No errors if dependency missing - simply returns unconverted values
Optional Type Support Matrix¶
Adapter | Feature | Optional Dependency | Configuration
---|---|---|---
oracledb | NumPy VECTOR | numpy | driver_features={"enable_numpy_vectors": True}
asyncpg | pgvector | pgvector | Auto-enabled (registered when pgvector is installed)
psycopg | pgvector | pgvector | Auto-enabled (registered when pgvector is installed)
Configuring Type Handlers¶
Type handlers are configured via the driver_features parameter:
Automatic Detection (Recommended):
# NumPy vectors - auto-enabled when numpy installed
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleAsyncConfig
sql = SQLSpec()
db = sql.add_config(OracleAsyncConfig(
pool_config={"dsn": "localhost:1521/FREEPDB1"}
# enable_numpy_vectors automatically set to True if numpy installed
))
Explicit Configuration:
# Explicitly disable optional feature
db = sql.add_config(OracleAsyncConfig(
pool_config={"dsn": "localhost:1521/FREEPDB1"},
driver_features={"enable_numpy_vectors": False} # Force disable
))
Check Feature Status:
from sqlspec._typing import NUMPY_INSTALLED, PGVECTOR_INSTALLED
print(f"NumPy available: {NUMPY_INSTALLED}")
print(f"pgvector available: {PGVECTOR_INSTALLED}")
NumPy VECTOR Support (Oracle)¶
Oracle Database 23ai introduces the VECTOR data type for AI/ML embeddings. SQLSpec provides seamless NumPy integration:
Requirements:
- Oracle Database 23ai or higher
- numpy package installed
- driver_features["enable_numpy_vectors"]=True (auto-enabled)
Supported dtypes:
- float32 → VECTOR(*, FLOAT32) - General embeddings (recommended)
- float64 → VECTOR(*, FLOAT64) - High-precision embeddings
- int8 → VECTOR(*, INT8) - Quantized embeddings
- uint8 → VECTOR(*, BINARY) - Binary/hash vectors
Example Usage:
See the Oracle adapter documentation above for complete examples.
pgvector Support (PostgreSQL)¶
PostgreSQL’s pgvector extension enables vector similarity search. SQLSpec automatically registers pgvector support when available:
Requirements:
- PostgreSQL with pgvector extension installed
- pgvector Python package installed
- Automatic registration (no configuration needed)
- Extension registration failures are downgraded to debug logs, so missing pgvector keeps the connection usable without vector support
Adapters with pgvector support:
- asyncpg - Async PostgreSQL driver
- psycopg - Sync and async PostgreSQL driver
Example Usage:
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
sql = SQLSpec()
db = sql.add_config(AsyncpgConfig(
pool_config={"dsn": "postgresql://localhost/mydb"}
# pgvector automatically registered if available
))
async with sql.provide_session(db) as session:
# Enable pgvector extension
await session.execute("CREATE EXTENSION IF NOT EXISTS vector")
# Create table with vector column
await session.execute("""
CREATE TABLE items (
id SERIAL PRIMARY KEY,
embedding vector(384)
)
""")
# Insert vector (NumPy array or list)
embedding = [0.1, 0.2, 0.3] * 128 # 384 dimensions
await session.execute(
"INSERT INTO items (embedding) VALUES ($1)",
[embedding]
)
# Vector similarity search
results = await session.select_all("""
SELECT id, embedding <-> $1 as distance
FROM items
ORDER BY distance
LIMIT 5
""", [embedding])
Implementing Custom Type Handlers¶
To add type handlers for a new optional feature:
Define detection constant in
sqlspec/_typing.py:
try:
    import optional_package

    OPTIONAL_PACKAGE_INSTALLED = True
except ImportError:
    OPTIONAL_PACKAGE_INSTALLED = False
Create handler module (
adapters/<adapter>/_feature_handlers.py):
from sqlspec._typing import OPTIONAL_PACKAGE_INSTALLED

def register_handlers(connection):
    if not OPTIONAL_PACKAGE_INSTALLED:
        logger.debug("Optional package not installed")
        return
    # Register handlers with connection
Update config to auto-detect and initialize:
from sqlspec._typing import OPTIONAL_PACKAGE_INSTALLED

class Config(AsyncDatabaseConfig):
    def __init__(self, *, driver_features=None, **kwargs):
        if driver_features is None:
            driver_features = {}
        if "enable_feature" not in driver_features:
            driver_features["enable_feature"] = OPTIONAL_PACKAGE_INSTALLED
        super().__init__(driver_features=driver_features, **kwargs)

    async def _init_connection(self, connection):
        if self.driver_features.get("enable_feature"):
            from ._feature_handlers import register_handlers

            register_handlers(connection)
Add tests (unit and integration):
import pytest

from sqlspec._typing import OPTIONAL_PACKAGE_INSTALLED

@pytest.mark.skipif(not OPTIONAL_PACKAGE_INSTALLED, reason="Package not installed")
async def test_feature_roundtrip(session):
    # Test with optional package
    pass
Creating Custom Adapters¶
For information on creating custom database adapters, see:
Creating Adapters - Adapter development guide
Driver - Driver implementation details
Base - SQLSpec configuration system
See Also¶
Configuration - Configuration guide
Drivers and Querying - Query execution
SQLSpec Example Library - Usage examples
Driver - Driver reference
Base - SQLSpec registry