Drivers and Querying¶
SQLSpec provides unified database drivers for multiple database systems, both synchronous and asynchronous. This guide covers all available drivers and query execution methods.
Overview¶
SQLSpec supports 10+ database backends through adapter drivers:
asyncpg (async)
psycopg (sync/async)
psqlpy (async)
ADBC (sync/async)
sqlite3 (sync)
aiosqlite (async)
ADBC (sync/async)
asyncmy (async)
DuckDB (sync)
Oracle (sync/async)
BigQuery (sync)
Spanner (sync)
All drivers implement a consistent API for query execution.
Driver Architecture¶
SQLSpec drivers follow a layered architecture:
Config Layer: Database connection parameters
Pool Layer: Connection pooling (where supported)
Driver Layer: Query execution and result handling
Session Layer: Transaction management
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig, AsyncpgPoolConfig
# Typical driver usage
spec = SQLSpec()
host = os.environ.get("SQLSPEC_USAGE_PG_HOST", "localhost")
port = int(os.environ.get("SQLSPEC_USAGE_PG_PORT", "5432"))
user = os.environ.get("SQLSPEC_USAGE_PG_USER", "postgres")
password = os.environ.get("SQLSPEC_USAGE_PG_PASSWORD", "postgres")
database = os.environ.get("SQLSPEC_USAGE_PG_DATABASE", "sqlspec")
db = spec.add_config(
AsyncpgConfig(
connection_config=AsyncpgPoolConfig(host=host, port=port, user=user, password=password, database=database)
)
) # Config layer, registers pool
async with spec.provide_session(db) as session: # Session layer
await session.execute("SELECT 1") # Driver layer
PostgreSQL Drivers¶
asyncpg (Recommended for Async)¶
Async PostgreSQL driver with native connection pooling.
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
spec = SQLSpec()
dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/test")
db = spec.add_config(AsyncpgConfig(connection_config={"dsn": dsn, "min_size": 10, "max_size": 20}))
async with spec.provide_session(db) as session:
create_table_query = """
CREATE TABLE IF NOT EXISTS usage2_users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100) UNIQUE
);
"""
await session.execute(create_table_query)
# Insert with RETURNING
result = await session.execute(
"INSERT INTO usage2_users (name, email) VALUES ($1, $2) RETURNING id", "Gretta", "[email protected]"
)
new_id = result.scalar()
print(f"Inserted user with ID: {new_id}")
# Basic query
result = await session.execute("SELECT * FROM usage2_users WHERE id = $1", 1)
user = result.one()
print(f"User: {user}")
Features:
Parameter style:
$1, $2, ...(numeric)Native prepared statements
Binary protocol support
Connection pooling
Copy operations for bulk data
psycopg (Sync/Async)¶
Official PostgreSQL adapter with both sync and async support.
from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgSyncConfig
spec = SQLSpec()
dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/test")
# Sync version
config = PsycopgSyncConfig(connection_config={"conninfo": dsn, "min_size": 5, "max_size": 10})
db = spec.add_config(config)
with spec.provide_session(db) as session:
create_table_query = """
CREATE TABLE IF NOT EXISTS usage3_users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100) UNIQUE
);
"""
session.execute(create_table_query)
# Insert with RETURNING
session.execute(
"INSERT INTO usage3_users (name, email) VALUES (%s, %s) RETURNING id", "Jane", "[email protected]"
)
session.execute("SELECT * FROM usage3_users")
from sqlspec import SQLSpec
from sqlspec.adapters.psycopg import PsycopgAsyncConfig
spec = SQLSpec()
dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/test")
# Async version
config = PsycopgAsyncConfig(connection_config={"conninfo": dsn, "min_size": 5, "max_size": 10})
db = spec.add_config(config)
async with spec.provide_session(db) as session:
create_table_query = """
CREATE TABLE IF NOT EXISTS usage4_users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100) UNIQUE
);
"""
await session.execute(create_table_query)
# Insert with RETURNING
await session.execute(
"INSERT INTO usage4_users (name, email) VALUES (%s, %s) RETURNING id", "Bill", "[email protected]"
)
await session.execute("SELECT * FROM usage4_users")
Features:
Parameter style:
%s(format) or%(name)s(pyformat)Server-side cursors
COPY operations
Binary protocol
Pipeline mode (psycopg 3)
psqlpy (High Performance Async)¶
Rust-based async PostgreSQL driver for maximum performance.
spec = SQLSpec()
dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/test")
config = PsqlpyConfig(connection_config={"dsn": dsn})
assert config is not None
async with spec.provide_session(config) as session:
create_table_query = """CREATE TABLE IF NOT EXISTS usage5_users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100) UNIQUE
); """
await session.execute(create_table_query)
# Insert with RETURNING
await session.execute(
"INSERT INTO usage5_users (name, email) VALUES ($1, $2) RETURNING id", "Bob", "[email protected]"
)
await session.execute("SELECT * FROM usage5_users WHERE id = $1", 1)
Features:
Written in Rust for performance
Async-first design
Connection pooling
SQLite Drivers¶
sqlite3 (Synchronous)¶
Python’s built-in SQLite adapter.
from sqlspec.adapters.sqlite import SqliteConfig
# Use a temporary file for the SQLite database for test isolation
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp_db_file:
db_path = tmp_db_file.name
spec = SQLSpec()
db = spec.add_config(
SqliteConfig(connection_config={"database": db_path, "timeout": 5.0, "check_same_thread": False})
)
try:
with spec.provide_session(db) as session:
# Create table
session.execute("""
CREATE TABLE IF NOT EXISTS usage6_users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL
)
""")
# Insert with parameters
session.execute("INSERT INTO usage6_users (name) VALUES (?)", "Alice")
# Query
result = session.execute("SELECT * FROM usage6_users")
result.all()
finally:
# Clean up the temporary database file
db.close_pool()
Path(db_path).unlink()
from sqlspec.adapters.sqlite import SqliteConfig
config = SqliteConfig(connection_config={"database": "myapp.db", "timeout": 5.0, "check_same_thread": False})
assert config.connection_config["database"] == "myapp.db"
Features:
Parameter style:
?(qmark)Lightweight and embedded
Thread-local connections
Script execution support
aiosqlite (Asynchronous)¶
Async wrapper around sqlite3.
from sqlspec import SQLSpec
from sqlspec.adapters.aiosqlite import AiosqliteConfig
database_file = tmp_path / "myapp.db"
config = AiosqliteConfig(connection_config={"database": database_file})
spec = SQLSpec()
async with spec.provide_session(config) as session:
create_table_query = """CREATE TABLE IF NOT EXISTS usage8_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL
);"""
await session.execute(create_table_query)
await session.execute("INSERT INTO usage8_users (name) VALUES (?)", "Bob")
await session.execute("SELECT * FROM usage8_users")
Features:
Async interface to SQLite
Thread pool execution for blocking operations
Same parameter style as sqlite3
MySQL Drivers¶
asyncmy (Asynchronous)¶
Pure Python async MySQL/MariaDB driver.
from sqlspec import SQLSpec
from sqlspec.adapters.asyncmy import AsyncmyConfig
spec = SQLSpec()
config = AsyncmyConfig(
connection_config={
"host": mysql_service.host,
"port": mysql_service.port,
"user": mysql_service.user,
"password": mysql_service.password,
"database": mysql_service.db,
"minsize": 1,
"maxsize": 10,
}
)
async with spec.provide_session(config) as session:
create_table_query = """CREATE TABLE IF NOT EXISTS usage9_users (
id INT PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
);"""
await session.execute(create_table_query)
# insert a user
await session.execute(
"INSERT INTO usage9_users (id, name, email) VALUES (%s, %s, %s)", (1, "John Doe", "[email protected]")
)
# query the user
await session.execute("SELECT * FROM usage9_users WHERE id = %s", 1)
Features:
Parameter style:
%s(format)Connection pooling
MySQL-specific types
Character set support
Other Database Drivers¶
DuckDB (Analytical Database)¶
In-process analytical database optimized for OLAP workloads.
import tempfile
from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig
# Use a temporary directory for the DuckDB database for test isolation
with tempfile.TemporaryDirectory() as tmpdir:
db_path = Path(tmpdir) / "analytics.duckdb"
spec = SQLSpec()
# In-memory
in_memory_db = spec.add_config(DuckDBConfig())
persistent_db = spec.add_config(DuckDBConfig(connection_config={"database": str(db_path)}))
try:
# Test with in-memory config
with spec.provide_session(in_memory_db) as session:
# Create table from Parquet
session.execute(f"""
CREATE TABLE if not exists users AS
SELECT * FROM read_parquet('{Path(__file__).parent.parent / "queries/users.parquet"}')
""")
# Analytical query
session.execute("""
SELECT date_trunc('day', created_at) as day,
count(*) as user_count
FROM users
GROUP BY day
ORDER BY day
""")
# Test with persistent config
with spec.provide_session(persistent_db) as session:
# Create table from Parquet
session.execute(f"""
CREATE TABLE if not exists users AS
SELECT * FROM read_parquet('{Path(__file__).parent.parent / "queries/users.parquet"}')
""")
# Analytical query
session.execute("""
SELECT date_trunc('day', created_at) as day,
count(*) as user_count
FROM users
GROUP BY day
ORDER BY day
""")
finally:
# Close the pool for the persistent config
in_memory_db.close_pool()
persistent_db.close_pool()
# The TemporaryDirectory context manager handles directory cleanup automatically
Features:
OLAP-optimized query engine
Parquet/CSV support
Columnar storage
Fast aggregations
Oracle Database¶
Oracle database support with python-oracledb.
from sqlspec import SQLSpec
from sqlspec.adapters.oracledb import OracleSyncConfig
spec = SQLSpec()
config = OracleSyncConfig(
connection_config={
"user": oracle_service.user,
"password": oracle_service.password,
"host": oracle_service.host,
"port": oracle_service.port,
"service_name": oracle_service.service_name,
}
)
with spec.provide_session(config) as session:
create_table_sql = """CREATE TABLE if not exists employees (
employee_id NUMBER PRIMARY KEY,
first_name VARCHAR2(50),
last_name VARCHAR2(50)
)"""
session.execute(create_table_sql)
session.execute("""
INSERT INTO employees (employee_id, first_name, last_name) VALUES (100, 'John', 'Doe')
""")
session.execute("SELECT * FROM employees WHERE employee_id = :id", id=100)
Features:
Parameter style:
:name(named)Both sync and async modes
Connection pooling
Oracle-specific types
BigQuery¶
Google Cloud BigQuery for large-scale analytics.
from sqlspec.adapters.bigquery import BigQueryConfig
config = BigQueryConfig(
connection_config={
"project": "my-project",
"credentials": credentials_object,
}
)
with spec.provide_session(config) as session:
result = session.execute("""
SELECT DATE(timestamp) as date,
COUNT(*) as events
FROM `project.dataset.events`
WHERE timestamp >= @start_date
GROUP BY date
""", start_date=datetime.date(2025, 1, 1))
Features:
Parameter style:
@name(named_at)Job-based execution
Massive scale analytics
Standard SQL support
Google Cloud Spanner¶
Globally distributed, horizontally scalable database with strong consistency.
from sqlspec.adapters.spanner import SpannerSyncConfig
config = SpannerSyncConfig(
connection_config={
"project": "my-project",
"instance": "my-instance",
"database": "my-database",
}
)
with spec.provide_session(config) as session:
result = session.execute("""
SELECT user_id, name, email
FROM users
WHERE created_at > @start_date
""", start_date=datetime.datetime(2025, 1, 1))
Features:
Parameter style:
@name(named_at)GoogleSQL and PostgreSQL dialect modes
Horizontal scalability with strong consistency
Interleaved tables for parent-child relationships
Row deletion policies for automatic data lifecycle
Native UUID and JSON support
Query Execution Methods¶
All drivers support these query execution methods through sessions.
execute()¶
Execute any SQL statement and return results.
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
spec = SQLSpec()
config = SqliteConfig(connection_config={"database": ":memory:", "timeout": 5.0, "check_same_thread": False})
with spec.provide_session(config) as session:
create_table_query = (
"""create table if not exists users (id default int primary key, name varchar(128), email text)"""
)
_ = session.execute(create_table_query)
# Examples are documentation snippets; ensure module importable
result = session.execute("SELECT * FROM users WHERE id = ?", 1)
# INSERT query
result = session.execute("INSERT INTO users (name, email) VALUES (?, ?)", "Alice", "[email protected]")
# UPDATE query
result = session.execute("UPDATE users SET email = ? WHERE id = ?", "[email protected]", 1)
print(f"Updated {result.rows_affected} rows")
# DELETE query
result = session.execute("DELETE FROM users WHERE id = ?", 1)
execute_many()¶
Execute a statement with multiple parameter sets (batch insert/update).
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
spec = SQLSpec()
config = SqliteConfig(connection_config={"database": ":memory:", "timeout": 5.0, "check_same_thread": False})
with spec.provide_session(config) as session:
create_table_query = """create table if not exists users (id default int primary key, name varchar(128), email text, status varchar(32))"""
_ = session.execute(create_table_query)
# Batch examples are documentation-only
# Batch insert
session.execute_many(
"INSERT INTO users (id, name, email, status) VALUES (?, ?, ?, ?)",
[
(1, "Alice", "[email protected]", "active"),
(2, "Bob", "[email protected]", "inactive"),
(3, "Charlie", "[email protected]", "active"),
],
)
# Batch update
session.execute_many("UPDATE users SET status = ? WHERE id = ?", [("inactive", 1), ("active", 2)])
select()¶
Execute a SELECT query and return all rows.
results = session.select("SELECT * FROM users")
print(results)
# Returns list of dictionaries: [{"id": 1, "name": "Alice", ...}, ...]
select_one()¶
Execute a SELECT query expecting exactly one result.
user = session.select_one("SELECT * FROM users WHERE id = ?", 1)
print(user)
# Returns single dictionary: {"id": 1, "name": "Alice", ...}
# Raises NotFoundError if no results
# Raises MultipleResultsFoundError if multiple results
select_one_or_none()¶
Execute a SELECT query returning one or no results.
user = session.select_one_or_none("SELECT * FROM users WHERE email = ?", "[email protected]")
# Returns dictionary or None
# Raises MultipleResultsFoundError if multiple results
select_value()¶
Execute a SELECT query returning a single scalar value.
session.select_value("SELECT COUNT(*) FROM users")
# Returns: 3
session.select_value("SELECT MAX(id) FROM users")
# Returns: 3
asyncpg-Compatible Method Aliases (fetch*)¶
For users transitioning from asyncpg or preferring its naming convention, SQLSpec provides fetch* aliases for all select* methods. Both naming styles are fully supported and produce identical results.
Primary Method |
asyncpg-Compatible Alias |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Example using fetch() instead of select():
# Both styles work identically
async with spec.provide_session(config) as session:
# Primary naming (recommended in docs)
users = await session.select("SELECT * FROM users WHERE age > ?", 18)
# asyncpg-compatible naming (identical behavior)
users = await session.fetch("SELECT * FROM users WHERE age > ?", 18)
# fetch_one() is equivalent to select_one()
user = await session.fetch_one("SELECT * FROM users WHERE id = ?", 1)
# fetch_value() is equivalent to select_value()
count = await session.fetch_value("SELECT COUNT(*) FROM users")
Note: Both naming conventions are fully supported and will not be deprecated. Choose the style that feels most natural for your codebase. The SQLSpec documentation primarily uses select* methods, but fetch* aliases are equally valid.
Working with Results¶
SQLResult Object¶
All queries return a SQLResult object with rich result information.
result = session.execute("SELECT id, name, email FROM users")
# Access raw data
result.data # List of dictionaries
result.column_names # ["id", "name", "email"]
result.rows_affected # For INSERT/UPDATE/DELETE
result.operation_type # "SELECT", "INSERT", etc.
# Convenience methods
with pytest.raises(ValueError): # noqa: PT011
user = result.one() # Single row (raises if not exactly 1)
with pytest.raises(ValueError): # noqa: PT011
user = result.one_or_none() # Single row or None
with pytest.raises(ValueError): # noqa: PT011
result.scalar() # First column of first row
Iterating Results¶
result = session.execute("SELECT * FROM users")
# Get all rows and iterate
users = result.all()
for user in users:
print(f"{user['name']}: {user['email']}")
# List comprehension
[user["name"] for user in result.all()]
Schema Mapping¶
Map results to typed objects automatically.# end-example
from pydantic import BaseModel
class User(BaseModel):
id: int
name: str
email: str
# Execute query
result = session.execute("SELECT id, name, email FROM users")
# Map results to typed User instances
users: list[User] = result.all(schema_type=User)
# Or get single typed result
user_result = session.execute("SELECT id, name, email FROM users WHERE id = ?", 1)
user: User = user_result.one(schema_type=User)
Transactions¶
Manual Transaction Control¶
try:
session.begin()
session.execute("INSERT INTO users (name) VALUES (?)", "Alice")
session.execute("INSERT INTO logs (action) VALUES (?)", "user_created")
session.commit()
except Exception:
session.rollback()
raise
Context Manager Transactions¶
session.begin() returns a coroutine, so wrap it in your own helper if you
prefer context manager semantics.
async transaction helper¶@asynccontextmanager
async def transactional_scope(session: "AiosqliteDriver") -> "AsyncIterator[None]":
await session.begin()
try:
yield
except Exception:
await session.rollback()
raise
else:
await session.commit()
async with spec.provide_session(config) as session:
await session.execute("DROP TABLE IF EXISTS accounts")
await session.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER)")
await session.execute_many("INSERT INTO accounts (id, balance) VALUES (?, ?)", [(1, 200), (2, 100)])
async with transactional_scope(session):
await session.execute("UPDATE accounts SET balance = balance - 50 WHERE id = :id", id=1)
await session.execute("UPDATE accounts SET balance = balance + 50 WHERE id = :id", id=2)
Parameter Binding¶
Positional Parameters¶
positional parameters¶session.execute("INSERT INTO users (id, status, name) VALUES (?, ?, ?)", 1, "active", "Alice")
user = session.select_one("SELECT name, status FROM users WHERE id = ?", 1)
Named Parameters¶
named parameters¶session.execute(
"INSERT INTO logs (id, action, created_at) VALUES (:id, :action, :created)",
id=1,
action="created",
created="2025-01-01T00:00:00",
)
row = session.select_one("SELECT action FROM logs WHERE id = :id", id=1)
Type Coercion¶
SQLSpec automatically coerces types based on driver requirements:
type coercion¶session.execute(
"INSERT INTO events (id, is_active, ts) VALUES (:id, :is_active, :ts)",
{"id": 1, "is_active": True, "ts": dt.datetime(2025, 1, 1, 12, 0, 0)},
)
row = session.select_one("SELECT is_active, ts FROM events WHERE id = :id", id=1)
Script Execution¶
Execute multiple SQL statements in one call:
script execution¶session.execute_script(
"""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL
);
CREATE TABLE posts (
id INTEGER PRIMARY KEY,
user_id INTEGER,
title TEXT,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX idx_posts_user_id ON posts(user_id);
"""
)
Performance Tips¶
1. Use Connection Pooling
asyncpg connection pooling¶config = AsyncpgConfig(connection_config={"dsn": dsn, "min_size": 10, "max_size": 20})
2. Batch Operations
Use execute_many() for bulk inserts:
batch inserts¶names = ["alice", "bob", "carla"]
session.execute_many("INSERT INTO users (name) VALUES (?)", [(name,) for name in names])
3. Prepared Statements
Drivers like asyncpg automatically prepare frequently-used statements.
4. Use Appropriate Methods
# Instead of:
result = session.execute("SELECT COUNT(*) FROM users")
count = result.scalar()
# Use:
count = session.select_value("SELECT COUNT(*) FROM users")
Driver Selection Guide¶
Choose the right driver for your use case:
Database |
Recommended Driver |
Use Case |
|---|---|---|
PostgreSQL (Async) |
asyncpg |
Async applications with connection pooling |
PostgreSQL (Sync) |
psycopg |
Traditional sync applications |
SQLite (Async) |
aiosqlite |
Async web applications with SQLite |
SQLite (Sync) |
sqlite3 |
Simple applications, testing |
MySQL (Async) |
asyncmy |
Async MySQL applications |
Analytics |
DuckDB |
OLAP, data analysis, reporting |
Cloud Analytics |
BigQuery |
Large-scale cloud data warehousing |
Global Scale |
Spanner |
Globally distributed, strongly consistent workloads |
Next Steps¶
Query Builder - Build queries programmatically
SQL File Loader - Load queries from SQL files
Driver - Detailed driver API reference
See Also¶
Configuration - Configure database connections
Execution Flow - Understanding query execution
Framework Integrations - Framework-specific usage