Drivers and Querying

SQLSpec provides unified database drivers for multiple database systems, both synchronous and asynchronous. This guide covers all available drivers and query execution methods.

Overview

SQLSpec supports 10+ database backends through adapter drivers:

PostgreSQL
  • asyncpg (async)

  • psycopg (sync/async)

  • psqlpy (async)

  • ADBC (sync/async)

SQLite
  • sqlite3 (sync)

  • aiosqlite (async)

  • ADBC (sync/async)

MySQL
  • asyncmy (async)

Other Databases
  • DuckDB (sync)

  • Oracle (sync/async)

Cloud Databases
  • BigQuery (sync)

  • Spanner (sync)

All drivers implement a consistent API for query execution.

Driver Architecture

SQLSpec drivers follow a layered architecture:

  1. Config Layer: Database connection parameters

  2. Pool Layer: Connection pooling (where supported)

  3. Driver Layer: Query execution and result handling

  4. Session Layer: Transaction management

driver architecture
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig, AsyncpgPoolConfig

# Typical driver usage
spec = SQLSpec()
host = os.environ.get("SQLSPEC_USAGE_PG_HOST", "localhost")
port = int(os.environ.get("SQLSPEC_USAGE_PG_PORT", "5432"))
user = os.environ.get("SQLSPEC_USAGE_PG_USER", "postgres")
password = os.environ.get("SQLSPEC_USAGE_PG_PASSWORD", "postgres")
database = os.environ.get("SQLSPEC_USAGE_PG_DATABASE", "sqlspec")

db = spec.add_config(
    AsyncpgConfig(
        connection_config=AsyncpgPoolConfig(host=host, port=port, user=user, password=password, database=database)
    )
)  # Config layer, registers pool
async with spec.provide_session(db) as session:  # Session layer
    await session.execute("SELECT 1")  # Driver layer

PostgreSQL Drivers

psycopg (Sync/Async)

Official PostgreSQL adapter with both sync and async support.

psycopg sync
from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgSyncConfig

spec = SQLSpec()
dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/test")

# Sync version
config = PsycopgSyncConfig(connection_config={"conninfo": dsn, "min_size": 5, "max_size": 10})
db = spec.add_config(config)

with spec.provide_session(db) as session:
    create_table_query = """
    CREATE TABLE IF NOT EXISTS usage3_users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100) UNIQUE
    );
    """
    session.execute(create_table_query)
    # Insert with RETURNING
    session.execute(
        "INSERT INTO usage3_users (name, email) VALUES (%s, %s) RETURNING id", "Jane", "[email protected]"
    )
    session.execute("SELECT * FROM usage3_users")
psycopg async
from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgAsyncConfig

spec = SQLSpec()
dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/test")

# Async version
config = PsycopgAsyncConfig(connection_config={"conninfo": dsn, "min_size": 5, "max_size": 10})
db = spec.add_config(config)

async with spec.provide_session(db) as session:
    create_table_query = """
    CREATE TABLE IF NOT EXISTS usage4_users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100) UNIQUE
    );
    """
    await session.execute(create_table_query)
    # Insert with RETURNING
    await session.execute(
        "INSERT INTO usage4_users (name, email) VALUES (%s, %s) RETURNING id", "Bill", "[email protected]"
    )
    await session.execute("SELECT * FROM usage4_users")

Features:

  • Parameter style: %s (format) or %(name)s (pyformat)

  • Server-side cursors

  • COPY operations

  • Binary protocol

  • Pipeline mode (psycopg 3)

psqlpy (High Performance Async)

Rust-based async PostgreSQL driver for maximum performance.

psqlpy
spec = SQLSpec()
dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/test")
config = PsqlpyConfig(connection_config={"dsn": dsn})
assert config is not None
async with spec.provide_session(config) as session:
    create_table_query = """CREATE TABLE IF NOT EXISTS usage5_users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100) UNIQUE
    );        """
    await session.execute(create_table_query)
    # Insert with RETURNING
    await session.execute(
        "INSERT INTO usage5_users (name, email) VALUES ($1, $2) RETURNING id", "Bob", "[email protected]"
    )
    await session.execute("SELECT * FROM usage5_users WHERE id = $1", 1)

Features:

  • Written in Rust for performance

  • Async-first design

  • Connection pooling

SQLite Drivers

sqlite3 (Synchronous)

Python’s built-in SQLite adapter.

sqlite config
from sqlspec.adapters.sqlite import SqliteConfig

# Use a temporary file for the SQLite database for test isolation
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp_db_file:
    db_path = tmp_db_file.name

    spec = SQLSpec()

    db = spec.add_config(
        SqliteConfig(connection_config={"database": db_path, "timeout": 5.0, "check_same_thread": False})
    )

    try:
        with spec.provide_session(db) as session:
            # Create table
            session.execute("""
            CREATE TABLE IF NOT EXISTS usage6_users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL
            )
        """)

            # Insert with parameters
            session.execute("INSERT INTO usage6_users (name) VALUES (?)", "Alice")

            # Query
            result = session.execute("SELECT * FROM usage6_users")
            result.all()
    finally:
        # Clean up the temporary database file
        db.close_pool()
        Path(db_path).unlink()
sqlite
from sqlspec.adapters.sqlite import SqliteConfig

config = SqliteConfig(connection_config={"database": "myapp.db", "timeout": 5.0, "check_same_thread": False})
assert config.connection_config["database"] == "myapp.db"

Features:

  • Parameter style: ? (qmark)

  • Lightweight and embedded

  • Thread-local connections

  • Script execution support

aiosqlite (Asynchronous)

Async wrapper around sqlite3.

aiosqlite
from sqlspec import SQLSpec
from sqlspec.adapters.aiosqlite import AiosqliteConfig

database_file = tmp_path / "myapp.db"
config = AiosqliteConfig(connection_config={"database": database_file})
spec = SQLSpec()

async with spec.provide_session(config) as session:
    create_table_query = """CREATE TABLE IF NOT EXISTS usage8_users (
       id INTEGER PRIMARY KEY AUTOINCREMENT,
       name TEXT NOT NULL
   );"""
    await session.execute(create_table_query)
    await session.execute("INSERT INTO usage8_users (name) VALUES (?)", "Bob")
    await session.execute("SELECT * FROM usage8_users")

Features:

  • Async interface to SQLite

  • Thread pool execution for blocking operations

  • Same parameter style as sqlite3

MySQL Drivers

asyncmy (Asynchronous)

Pure Python async MySQL/MariaDB driver.

asyncmy
from sqlspec import SQLSpec
from sqlspec.adapters.asyncmy import AsyncmyConfig

spec = SQLSpec()

config = AsyncmyConfig(
    connection_config={
        "host": mysql_service.host,
        "port": mysql_service.port,
        "user": mysql_service.user,
        "password": mysql_service.password,
        "database": mysql_service.db,
        "minsize": 1,
        "maxsize": 10,
    }
)

async with spec.provide_session(config) as session:
    create_table_query = """CREATE TABLE IF NOT EXISTS usage9_users (
       id INT PRIMARY KEY,
       name VARCHAR(100),
       email VARCHAR(100)
   );"""
    await session.execute(create_table_query)
    # insert a user
    await session.execute(
        "INSERT INTO usage9_users (id, name, email) VALUES (%s, %s, %s)", (1, "John Doe", "[email protected]")
    )
    # query the user
    await session.execute("SELECT * FROM usage9_users WHERE id = %s", 1)

Features:

  • Parameter style: %s (format)

  • Connection pooling

  • MySQL-specific types

  • Character set support

Other Database Drivers

DuckDB (Analytical Database)

In-process analytical database optimized for OLAP workloads.

duckdb
import tempfile

from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig

# Use a temporary directory for the DuckDB database for test isolation
with tempfile.TemporaryDirectory() as tmpdir:
    db_path = Path(tmpdir) / "analytics.duckdb"

    spec = SQLSpec()
    # In-memory
    in_memory_db = spec.add_config(DuckDBConfig())
    persistent_db = spec.add_config(DuckDBConfig(connection_config={"database": str(db_path)}))

    try:
        # Test with in-memory config
        with spec.provide_session(in_memory_db) as session:
            # Create table from Parquet
            session.execute(f"""
               CREATE TABLE if not exists users AS
               SELECT * FROM read_parquet('{Path(__file__).parent.parent / "queries/users.parquet"}')
           """)

            # Analytical query
            session.execute("""
               SELECT date_trunc('day', created_at) as day,
                      count(*) as user_count
               FROM users
               GROUP BY day
               ORDER BY day
           """)

        # Test with persistent config
        with spec.provide_session(persistent_db) as session:
            # Create table from Parquet
            session.execute(f"""
               CREATE TABLE if not exists users AS
               SELECT * FROM read_parquet('{Path(__file__).parent.parent / "queries/users.parquet"}')
           """)

            # Analytical query
            session.execute("""
               SELECT date_trunc('day', created_at) as day,
                      count(*) as user_count
               FROM users
               GROUP BY day
               ORDER BY day
           """)
    finally:
        # Close the pool for the persistent config
        in_memory_db.close_pool()
        persistent_db.close_pool()
        # The TemporaryDirectory context manager handles directory cleanup automatically

Features:

  • OLAP-optimized query engine

  • Parquet/CSV support

  • Columnar storage

  • Fast aggregations

Oracle Database

Oracle database support with python-oracledb.

oracle
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleSyncConfig

spec = SQLSpec()
config = OracleSyncConfig(
    connection_config={
        "user": oracle_service.user,
        "password": oracle_service.password,
        "host": oracle_service.host,
        "port": oracle_service.port,
        "service_name": oracle_service.service_name,
    }
)

with spec.provide_session(config) as session:
    create_table_sql = """CREATE TABLE if not exists employees (
       employee_id NUMBER PRIMARY KEY,
       first_name VARCHAR2(50),
       last_name VARCHAR2(50)
   )"""
    session.execute(create_table_sql)
    session.execute("""
       INSERT INTO employees (employee_id, first_name, last_name) VALUES (100, 'John', 'Doe')
   """)

    session.execute("SELECT * FROM employees WHERE employee_id = :id", id=100)

Features:

  • Parameter style: :name (named)

  • Both sync and async modes

  • Connection pooling

  • Oracle-specific types

BigQuery

Google Cloud BigQuery for large-scale analytics.

from sqlspec.adapters.bigquery import BigQueryConfig

config = BigQueryConfig(
    connection_config={
        "project": "my-project",
        "credentials": credentials_object,
    }
)

with spec.provide_session(config) as session:
    result = session.execute("""
        SELECT DATE(timestamp) as date,
               COUNT(*) as events
        FROM `project.dataset.events`
        WHERE timestamp >= @start_date
        GROUP BY date
    """, start_date=datetime.date(2025, 1, 1))

Features:

  • Parameter style: @name (named_at)

  • Job-based execution

  • Massive scale analytics

  • Standard SQL support

Google Cloud Spanner

Globally distributed, horizontally scalable database with strong consistency.

from sqlspec.adapters.spanner import SpannerSyncConfig

config = SpannerSyncConfig(
    connection_config={
        "project": "my-project",
        "instance": "my-instance",
        "database": "my-database",
    }
)

with spec.provide_session(config) as session:
    result = session.execute("""
        SELECT user_id, name, email
        FROM users
        WHERE created_at > @start_date
    """, start_date=datetime.datetime(2025, 1, 1))

Features:

  • Parameter style: @name (named_at)

  • GoogleSQL and PostgreSQL dialect modes

  • Horizontal scalability with strong consistency

  • Interleaved tables for parent-child relationships

  • Row deletion policies for automatic data lifecycle

  • Native UUID and JSON support

Query Execution Methods

All drivers support these query execution methods through sessions.

execute()

Execute any SQL statement and return results.

execute
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

spec = SQLSpec()
config = SqliteConfig(connection_config={"database": ":memory:", "timeout": 5.0, "check_same_thread": False})
with spec.provide_session(config) as session:
    create_table_query = (
        """create table if not exists users (id default int primary key, name varchar(128), email text)"""
    )

    _ = session.execute(create_table_query)
    # Examples are documentation snippets; ensure module importable
    result = session.execute("SELECT * FROM users WHERE id = ?", 1)

    # INSERT query
    result = session.execute("INSERT INTO users (name, email) VALUES (?, ?)", "Alice", "[email protected]")

    # UPDATE query
    result = session.execute("UPDATE users SET email = ? WHERE id = ?", "[email protected]", 1)
    print(f"Updated {result.rows_affected} rows")

    # DELETE query
    result = session.execute("DELETE FROM users WHERE id = ?", 1)

execute_many()

Execute a statement with multiple parameter sets (batch insert/update).

execute_many
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

spec = SQLSpec()
config = SqliteConfig(connection_config={"database": ":memory:", "timeout": 5.0, "check_same_thread": False})
with spec.provide_session(config) as session:
    create_table_query = """create table if not exists users (id default int primary key, name varchar(128), email text, status varchar(32))"""

    _ = session.execute(create_table_query)
    # Batch examples are documentation-only

    # Batch insert
    session.execute_many(
        "INSERT INTO users (id, name, email, status) VALUES (?, ?, ?, ?)",
        [
            (1, "Alice", "[email protected]", "active"),
            (2, "Bob", "[email protected]", "inactive"),
            (3, "Charlie", "[email protected]", "active"),
        ],
    )
    # Batch update
    session.execute_many("UPDATE users SET status = ? WHERE id = ?", [("inactive", 1), ("active", 2)])

select()

Execute a SELECT query and return all rows.

select
results = session.select("SELECT * FROM users")
print(results)
# Returns list of dictionaries: [{"id": 1, "name": "Alice", ...}, ...]

select_one()

Execute a SELECT query expecting exactly one result.

select_one
user = session.select_one("SELECT * FROM users WHERE id = ?", 1)
print(user)
# Returns single dictionary: {"id": 1, "name": "Alice", ...}
# Raises NotFoundError if no results
# Raises MultipleResultsFoundError if multiple results

select_one_or_none()

Execute a SELECT query returning one or no results.

select_one_or_none
user = session.select_one_or_none("SELECT * FROM users WHERE email = ?", "[email protected]")
# Returns dictionary or None
# Raises MultipleResultsFoundError if multiple results

select_value()

Execute a SELECT query returning a single scalar value.

select_value
session.select_value("SELECT COUNT(*) FROM users")
# Returns: 3
session.select_value("SELECT MAX(id) FROM users")
# Returns: 3

asyncpg-Compatible Method Aliases (fetch*)

For users transitioning from asyncpg or preferring its naming convention, SQLSpec provides fetch* aliases for all select* methods. Both naming styles are fully supported and produce identical results.

Method Aliases

Primary Method

asyncpg-Compatible Alias

select()

fetch()

select_one()

fetch_one()

select_one_or_none()

fetch_one_or_none()

select_value()

fetch_value()

select_value_or_none()

fetch_value_or_none()

select_to_arrow()

fetch_to_arrow()

select_with_total()

fetch_with_total()

Example using fetch() instead of select():

# Both styles work identically
async with spec.provide_session(config) as session:
    # Primary naming (recommended in docs)
    users = await session.select("SELECT * FROM users WHERE age > ?", 18)

    # asyncpg-compatible naming (identical behavior)
    users = await session.fetch("SELECT * FROM users WHERE age > ?", 18)

    # fetch_one() is equivalent to select_one()
    user = await session.fetch_one("SELECT * FROM users WHERE id = ?", 1)

    # fetch_value() is equivalent to select_value()
    count = await session.fetch_value("SELECT COUNT(*) FROM users")

Note: Both naming conventions are fully supported and will not be deprecated. Choose the style that feels most natural for your codebase. The SQLSpec documentation primarily uses select* methods, but fetch* aliases are equally valid.

Working with Results

SQLResult Object

All queries return a SQLResult object with rich result information.

SQLResult object
result = session.execute("SELECT id, name, email FROM users")
# Access raw data
result.data  # List of dictionaries
result.column_names  # ["id", "name", "email"]
result.rows_affected  # For INSERT/UPDATE/DELETE
result.operation_type  # "SELECT", "INSERT", etc.
# Convenience methods
with pytest.raises(ValueError):  # noqa: PT011
    user = result.one()  # Single row (raises if not exactly 1)
with pytest.raises(ValueError):  # noqa: PT011
    user = result.one_or_none()  # Single row or None
with pytest.raises(ValueError):  # noqa: PT011
    result.scalar()  # First column of first row

Iterating Results

iterating results
result = session.execute("SELECT * FROM users")
# Get all rows and iterate
users = result.all()
for user in users:
    print(f"{user['name']}: {user['email']}")
# List comprehension
[user["name"] for user in result.all()]

Schema Mapping

Map results to typed objects automatically.# end-example

schema mapping
from pydantic import BaseModel

class User(BaseModel):
    id: int
    name: str
    email: str

# Execute query
result = session.execute("SELECT id, name, email FROM users")

# Map results to typed User instances
users: list[User] = result.all(schema_type=User)

# Or get single typed result
user_result = session.execute("SELECT id, name, email FROM users WHERE id = ?", 1)
user: User = user_result.one(schema_type=User)

Transactions

Manual Transaction Control

manual transaction control
try:
    session.begin()
    session.execute("INSERT INTO users (name) VALUES (?)", "Alice")
    session.execute("INSERT INTO logs (action) VALUES (?)", "user_created")
    session.commit()
except Exception:
    session.rollback()
    raise

Context Manager Transactions

session.begin() returns a coroutine, so wrap it in your own helper if you prefer context manager semantics.

async transaction helper
@asynccontextmanager
async def transactional_scope(session: "AiosqliteDriver") -> "AsyncIterator[None]":
    await session.begin()
    try:
        yield
    except Exception:
        await session.rollback()
        raise
    else:
        await session.commit()

async with spec.provide_session(config) as session:
    await session.execute("DROP TABLE IF EXISTS accounts")
    await session.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
    await session.execute_many("INSERT INTO accounts (id, balance) VALUES (?, ?)", [(1, 200), (2, 100)])

    async with transactional_scope(session):
        await session.execute("UPDATE accounts SET balance = balance - 50 WHERE id = :id", id=1)
        await session.execute("UPDATE accounts SET balance = balance + 50 WHERE id = :id", id=2)

Parameter Binding

Positional Parameters

positional parameters
session.execute("INSERT INTO users (id, status, name) VALUES (?, ?, ?)", 1, "active", "Alice")
user = session.select_one("SELECT name, status FROM users WHERE id = ?", 1)

Named Parameters

named parameters
session.execute(
    "INSERT INTO logs (id, action, created_at) VALUES (:id, :action, :created)",
    id=1,
    action="created",
    created="2025-01-01T00:00:00",
)
row = session.select_one("SELECT action FROM logs WHERE id = :id", id=1)

Type Coercion

SQLSpec automatically coerces types based on driver requirements:

type coercion
session.execute(
    "INSERT INTO events (id, is_active, ts) VALUES (:id, :is_active, :ts)",
    {"id": 1, "is_active": True, "ts": dt.datetime(2025, 1, 1, 12, 0, 0)},
)
row = session.select_one("SELECT is_active, ts FROM events WHERE id = :id", id=1)

Script Execution

Execute multiple SQL statements in one call:

script execution
session.execute_script(
    """
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL
    );

    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        user_id INTEGER,
        title TEXT,
        FOREIGN KEY (user_id) REFERENCES users(id)
    );

    CREATE INDEX idx_posts_user_id ON posts(user_id);
    """
)

Performance Tips

1. Use Connection Pooling

asyncpg connection pooling
config = AsyncpgConfig(connection_config={"dsn": dsn, "min_size": 10, "max_size": 20})

2. Batch Operations

Use execute_many() for bulk inserts:

batch inserts
names = ["alice", "bob", "carla"]
session.execute_many("INSERT INTO users (name) VALUES (?)", [(name,) for name in names])

3. Prepared Statements

Drivers like asyncpg automatically prepare frequently-used statements.

4. Use Appropriate Methods

# Instead of:
result = session.execute("SELECT COUNT(*) FROM users")
count = result.scalar()

# Use:
count = session.select_value("SELECT COUNT(*) FROM users")

Driver Selection Guide

Choose the right driver for your use case:

Database

Recommended Driver

Use Case

PostgreSQL (Async)

asyncpg

Async applications with connection pooling

PostgreSQL (Sync)

psycopg

Traditional sync applications

SQLite (Async)

aiosqlite

Async web applications with SQLite

SQLite (Sync)

sqlite3

Simple applications, testing

MySQL (Async)

asyncmy

Async MySQL applications

Analytics

DuckDB

OLAP, data analysis, reporting

Cloud Analytics

BigQuery

Large-scale cloud data warehousing

Global Scale

Spanner

Globally distributed, strongly consistent workloads

Next Steps

See Also