Drivers and Querying
SQLSpec provides unified database drivers for multiple database systems, both synchronous and asynchronous. This guide covers all available drivers and query execution methods.
Overview
SQLSpec supports 10+ database backends through adapter drivers:
PostgreSQL: asyncpg (async), psycopg (sync/async), psqlpy (async), ADBC (sync/async)
SQLite: sqlite3 (sync), aiosqlite (async), ADBC (sync/async)
MySQL: asyncmy (async)
DuckDB (sync)
Oracle (sync/async)
BigQuery (sync)
All drivers implement a consistent API for query execution.
Driver Architecture
SQLSpec drivers follow a layered architecture:
Config Layer: Database connection parameters
Pool Layer: Connection pooling (where supported)
Driver Layer: Query execution and result handling
Session Layer: Transaction management
# Typical driver usage
spec = SQLSpec()
db = spec.add_config(AsyncpgConfig(pool_config={...}))  # Config layer, registers pool
async with spec.provide_session(db) as session:  # Session layer
    result = await session.execute("SELECT 1")  # Driver layer
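The four layers can also be pictured with a small standard-library sketch. It uses sqlite3 and a queue-based pool, and the names `DemoConfig`, `DemoPool`, and this local `provide_session` are hypothetical stand-ins for illustration, not SQLSpec's implementation.

```python
import queue
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass


@dataclass
class DemoConfig:
    """Config layer: connection parameters only (hypothetical stand-in)."""

    database: str = ":memory:"
    pool_size: int = 2


class DemoPool:
    """Pool layer: a fixed set of reusable connections."""

    def __init__(self, config: DemoConfig) -> None:
        self._idle: queue.Queue = queue.Queue()
        for _ in range(config.pool_size):
            self._idle.put(sqlite3.connect(config.database))

    def acquire(self) -> sqlite3.Connection:
        return self._idle.get()

    def release(self, conn: sqlite3.Connection) -> None:
        self._idle.put(conn)


@contextmanager
def provide_session(pool: DemoPool):
    """Session layer: borrow a connection, commit or roll back, then return it."""
    conn = pool.acquire()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.release(conn)


pool = DemoPool(DemoConfig())
with provide_session(pool) as session:
    # Driver layer: execute the statement and read the result.
    value = session.execute("SELECT 1").fetchone()[0]
```

Keeping the layers separate means the connection is always returned to the pool, whether the block succeeds or raises.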
PostgreSQL Drivers
asyncpg (Recommended for Async)
Async PostgreSQL driver with native connection pooling.
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={
            "dsn": "postgresql://user:pass@localhost:5432/mydb",
            "min_size": 10,
            "max_size": 20,
        }
    )
)
async with spec.provide_session(db) as session:
    # Basic query
    result = await session.execute("SELECT * FROM users WHERE id = $1", 1)
    user = result.one()
    # Insert with RETURNING
    result = await session.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        "Alice",
        "alice@example.com"
    )
    new_id = result.scalar()
Features:
Parameter style: $1, $2, ... (numeric)
Native prepared statements
Binary protocol support
Connection pooling
Copy operations for bulk data
psycopg (Sync/Async)
Official PostgreSQL adapter with both sync and async support.
from sqlspec.adapters.psycopg import PsycopgConfig
# Async version
config = PsycopgConfig(
    pool_config={
        "conninfo": "postgresql://localhost/db",
        "min_size": 5,
        "max_size": 10,
    }
)
async with spec.provide_session(config) as session:
    result = await session.execute("SELECT * FROM users")
# Sync version (use psycopg sync config)
with spec.provide_session(config) as session:
    result = session.execute("SELECT * FROM users")
Features:
Parameter style: %s (format) or %(name)s (pyformat)
Server-side cursors
COPY operations
Binary protocol
Pipeline mode (psycopg 3)
psqlpy (High Performance Async)
Rust-based async PostgreSQL driver for maximum performance.
from sqlspec.adapters.psqlpy import PsqlpyConfig
config = PsqlpyConfig(
    pool_config={
        "dsn": "postgresql://localhost/db",
        "max_pool_size": 20,
    }
)
async with spec.provide_session(config) as session:
    result = await session.execute("SELECT * FROM users WHERE id = $1", 1)
Features:
Written in Rust for performance
Async-first design
Connection pooling
SQLite Drivers
sqlite3 (Synchronous)
Python’s built-in SQLite adapter.
from sqlspec.adapters.sqlite import SqliteConfig
config = SqliteConfig(
    pool_config={
        "database": "myapp.db",
        "timeout": 5.0,
        "check_same_thread": False,
    }
)
with spec.provide_session(config) as session:
    # Create table
    session.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL
        )
    """)
    # Insert with parameters
    session.execute(
        "INSERT INTO users (name) VALUES (?)",
        "Alice"
    )
    # Query
    result = session.execute("SELECT * FROM users")
    users = result.all()
Features:
Parameter style: ? (qmark)
Lightweight and embedded
Thread-local connections
Script execution support
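For reference, script execution in the underlying standard-library sqlite3 module is provided by `Connection.executescript()`, which runs several semicolon-separated statements in one call. The sketch below uses only that module and does not go through SQLSpec; the table and index names are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# executescript() accepts several statements separated by semicolons.
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL);
    CREATE TABLE posts (
        id INTEGER PRIMARY KEY,
        user_id INTEGER REFERENCES users(id),
        title TEXT
    );
    CREATE INDEX idx_posts_user_id ON posts(user_id);
""")

# List the schema objects the script created.
objects = sorted(
    row[0]
    for row in conn.execute("SELECT name FROM sqlite_master WHERE name NOT LIKE 'sqlite_%'")
)
```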
aiosqlite (Asynchronous)
Async wrapper around sqlite3.
from sqlspec.adapters.aiosqlite import AiosqliteConfig
config = AiosqliteConfig(
    pool_config={"database": "myapp.db"}
)
async with spec.provide_session(config) as session:
    await session.execute(
        "INSERT INTO users (name) VALUES (?)",
        "Bob"
    )
    result = await session.execute("SELECT * FROM users")
Features:
Async interface to SQLite
Thread pool execution for blocking operations
Same parameter style as sqlite3
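The general idea of moving blocking SQLite calls off the event loop can be sketched with the standard library alone. This is an illustration of the technique using `asyncio.to_thread`, not a description of how aiosqlite or SQLSpec is implemented; the helper names are hypothetical.

```python
import asyncio
import sqlite3


def _count_rows(database: str) -> int:
    # Blocking work: open, write, query, and close entirely on the worker thread.
    conn = sqlite3.connect(database)
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS users (name TEXT)")
        conn.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))
        return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    finally:
        conn.close()


async def count_rows(database: str) -> int:
    # asyncio.to_thread runs the blocking function without stalling the event loop.
    return await asyncio.to_thread(_count_rows, database)


row_count = asyncio.run(count_rows(":memory:"))
```

The connection is created and used inside the same thread, which sidesteps sqlite3's same-thread check.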
MySQL Drivers
asyncmy (Asynchronous)
Pure Python async MySQL/MariaDB driver.
from sqlspec.adapters.asyncmy import AsyncmyConfig
config = AsyncmyConfig(
    pool_config={
        "host": "localhost",
        "port": 3306,
        "user": "myuser",
        "password": "mypassword",
        "database": "mydb",
        "minsize": 1,
        "maxsize": 10,
    }
)
async with spec.provide_session(config) as session:
    result = await session.execute(
        "SELECT * FROM users WHERE id = %s",
        1
    )
Features:
Parameter style: %s (format)
Connection pooling
MySQL-specific types
Character set support
Other Database Drivers
DuckDB (Analytical Database)
In-process analytical database optimized for OLAP workloads.
from sqlspec.adapters.duckdb import DuckDBConfig
# In-memory
config = DuckDBConfig()
# Persistent
config = DuckDBConfig(
    pool_config={"database": "analytics.duckdb"}
)
with spec.provide_session(config) as session:
    # Create table from Parquet
    session.execute("""
        CREATE TABLE users AS
        SELECT * FROM read_parquet('users.parquet')
    """)
    # Analytical query
    result = session.execute("""
        SELECT date_trunc('day', created_at) as day,
               count(*) as user_count
        FROM users
        GROUP BY day
        ORDER BY day
    """)
Features:
OLAP-optimized query engine
Parquet/CSV support
Columnar storage
Fast aggregations
Oracle Database
Oracle database support with python-oracledb.
from sqlspec.adapters.oracledb import OracleDBConfig
config = OracleDBConfig(
    pool_config={
        "user": "myuser",
        "password": "mypassword",
        "dsn": "localhost:1521/ORCLPDB",
    }
)
with spec.provide_session(config) as session:
    result = session.execute(
        "SELECT * FROM employees WHERE employee_id = :id",
        id=100
    )
Features:
Parameter style: :name (named)
Both sync and async modes
Connection pooling
Oracle-specific types
BigQuery
Google Cloud BigQuery for large-scale analytics.
import datetime
from sqlspec.adapters.bigquery import BigQueryConfig
config = BigQueryConfig(
    pool_config={
        "project": "my-project",
        "credentials": credentials_object,
    }
)
with spec.provide_session(config) as session:
    result = session.execute("""
        SELECT DATE(timestamp) as date,
               COUNT(*) as events
        FROM `project.dataset.events`
        WHERE timestamp >= @start_date
        GROUP BY date
    """, start_date=datetime.date(2025, 1, 1))
Features:
Parameter style: @name (named_at)
Job-based execution
Massive scale analytics
Standard SQL support
Query Execution Methods
All drivers support these query execution methods through sessions.
execute()
Execute any SQL statement and return results.
# SELECT query
result = session.execute("SELECT * FROM users WHERE id = ?", 1)
# INSERT query
result = session.execute(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    "Alice",
    "alice@example.com"
)
# UPDATE query
result = session.execute(
    "UPDATE users SET email = ? WHERE id = ?",
    "alice.new@example.com",
    1
)
print(f"Updated {result.rows_affected} rows")
# DELETE query
result = session.execute("DELETE FROM users WHERE id = ?", 1)
execute_many()
Execute a statement with multiple parameter sets (batch insert/update).
# Batch insert
session.execute_many(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    [
        ("Alice", "alice@example.com"),
        ("Bob", "bob@example.com"),
        ("Charlie", "charlie@example.com"),
    ]
)
# Batch update
session.execute_many(
    "UPDATE users SET status = ? WHERE id = ?",
    [
        ("active", 1),
        ("inactive", 2),
    ]
)
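The same batch pattern, one statement paired with a list of parameter tuples, can be tried directly against the standard library's sqlite3 module. This sketch uses `executemany()` from sqlite3 rather than SQLSpec, and the table layout is an assumption made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, status TEXT)")

# One INSERT statement executed once per parameter tuple.
cursor = conn.executemany(
    "INSERT INTO users (name, status) VALUES (?, ?)",
    [("Alice", "new"), ("Bob", "new"), ("Charlie", "new")],
)
inserted = cursor.rowcount  # total rows modified across the batch

# One UPDATE statement executed once per (status, id) pair.
conn.executemany(
    "UPDATE users SET status = ? WHERE id = ?",
    [("active", 1), ("inactive", 2)],
)
statuses = [row[0] for row in conn.execute("SELECT status FROM users ORDER BY id")]
```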
select()
Execute a SELECT query and return all rows.
users = session.select("SELECT * FROM users WHERE status = ?", "active")
# Returns list of dictionaries: [{"id": 1, "name": "Alice", ...}, ...]
select_one()
Execute a SELECT query expecting exactly one result.
user = session.select_one("SELECT * FROM users WHERE id = ?", 1)
# Returns single dictionary: {"id": 1, "name": "Alice", ...}
# Raises NotFoundError if no results
# Raises MultipleResultsFoundError if multiple results
select_one_or_none()
Execute a SELECT query returning one or no results.
user = session.select_one_or_none("SELECT * FROM users WHERE email = ?", "alice@example.com")
# Returns dictionary or None
# Raises MultipleResultsFoundError if multiple results
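The difference between the two contracts is easy to see when written as plain functions over a list of rows. The sketch below is illustrative only: the exception classes are local stand-ins that reuse the names mentioned above, and the functions `one` and `one_or_none` are hypothetical helpers, not SQLSpec code.

```python
from typing import Any, Optional


class NotFoundError(Exception):
    """Local stand-in for the error raised when no row matches."""


class MultipleResultsFoundError(Exception):
    """Local stand-in for the error raised when several rows match."""


Row = dict[str, Any]


def one_or_none(rows: list[Row]) -> Optional[Row]:
    """Return the only row, None for an empty result, or raise on several."""
    if len(rows) > 1:
        raise MultipleResultsFoundError(f"expected at most 1 row, got {len(rows)}")
    return rows[0] if rows else None


def one(rows: list[Row]) -> Row:
    """Return the only row, raising when there are zero or several."""
    row = one_or_none(rows)
    if row is None:
        raise NotFoundError("expected exactly 1 row, got 0")
    return row
```

Both helpers reject multiple rows; they differ only in whether an empty result is an error.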
select_value()
Execute a SELECT query returning a single scalar value.
count = session.select_value("SELECT COUNT(*) FROM users")
# Returns: 42
latest_id = session.select_value("SELECT MAX(id) FROM users")
# Returns: 100
Working with Results
SQLResult Object
All queries return a SQLResult object with rich result information.
result = session.execute("SELECT id, name, email FROM users")
# Access raw data
result.data # List of dictionaries
result.column_names # ["id", "name", "email"]
result.rows_affected # For INSERT/UPDATE/DELETE
result.operation_type # "SELECT", "INSERT", etc.
# Convenience methods
user = result.one() # Single row (raises if not exactly 1)
user = result.one_or_none() # Single row or None
value = result.scalar() # First column of first row
Iterating Results
result = session.execute("SELECT * FROM users")
# Get all rows and iterate
users = result.all()
for user in users:
    print(f"{user['name']}: {user['email']}")
# List comprehension
names = [user['name'] for user in result.all()]
Schema Mapping
Map results to typed objects automatically.
from pydantic import BaseModel
class User(BaseModel):
    id: int
    name: str
    email: str
# Execute query
result = session.execute("SELECT id, name, email FROM users")
# Map results to typed User instances
users: list[User] = result.all(schema_type=User)
# Or get single typed result
user_result = session.execute("SELECT id, name, email FROM users WHERE id = ?", 1)
user: User = user_result.one(schema_type=User)
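Conceptually, schema mapping turns each row dictionary into an instance of the target type. The following is a minimal sketch of that idea using a standard-library dataclass instead of Pydantic so that it runs without extra dependencies; `map_rows` is a hypothetical helper, and unlike Pydantic it performs no validation or type conversion.

```python
from dataclasses import dataclass, fields
from typing import Any, TypeVar

T = TypeVar("T")


@dataclass
class User:
    id: int
    name: str
    email: str


def map_rows(rows: list[dict[str, Any]], schema_type: type[T]) -> list[T]:
    """Build one schema_type instance per row, ignoring columns it does not declare."""
    wanted = {f.name for f in fields(schema_type)}
    return [schema_type(**{k: v for k, v in row.items() if k in wanted}) for row in rows]


rows = [{"id": 1, "name": "Alice", "email": "alice@example.com", "extra": True}]
users = map_rows(rows, User)
```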
Transactions
Manual Transaction Control
with spec.provide_session(config) as session:
    try:
        session.begin()
        session.execute("INSERT INTO users (name) VALUES (?)", "Alice")
        session.execute("INSERT INTO logs (action) VALUES (?)", "user_created")
        session.commit()
    except Exception:
        session.rollback()
        raise
Context Manager Transactions
async with spec.provide_session(config) as session:
    async with session.begin():
        await session.execute("UPDATE accounts SET balance = balance - 100 WHERE id = ?", 1)
        await session.execute("UPDATE accounts SET balance = balance + 100 WHERE id = ?", 2)
        # Auto-commits on success, auto-rollbacks on exception
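The commit-on-success, rollback-on-exception behaviour can be observed with the standard library's sqlite3 module, whose connection object works as a transaction context manager. This sketch does not use SQLSpec; the `CHECK` constraint and starting balances are assumptions chosen so that the second update fails.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE accounts (id INTEGER PRIMARY KEY, "
    "balance INTEGER NOT NULL CHECK (balance >= 0))"
)
conn.executemany("INSERT INTO accounts (id, balance) VALUES (?, ?)", [(1, 50), (2, 0)])
conn.commit()

try:
    # The connection context manager commits on success and rolls back on error.
    with conn:
        conn.execute("UPDATE accounts SET balance = balance + 100 WHERE id = ?", (2,))
        # Account 1 only holds 50, so this violates the CHECK constraint.
        conn.execute("UPDATE accounts SET balance = balance - 100 WHERE id = ?", (1,))
except sqlite3.IntegrityError:
    transfer_failed = True
else:
    transfer_failed = False

balances = dict(conn.execute("SELECT id, balance FROM accounts ORDER BY id"))
```

Because both updates share one transaction, the credit to account 2 is undone when the debit from account 1 fails.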
Parameter Binding
Positional Parameters
# SQLite, DuckDB (?)
session.execute("SELECT * FROM users WHERE id = ?", 1)
# PostgreSQL (asyncpg) ($1, $2, ...)
session.execute("SELECT * FROM users WHERE id = $1 AND status = $2", 1, "active")
# MySQL (%s)
session.execute("SELECT * FROM users WHERE id = %s", 1)
Named Parameters
# SQLite, Oracle (:name)
session.execute(
    "SELECT * FROM users WHERE id = :id AND status = :status",
    id=1,
    status="active"
)
# BigQuery (@name)
session.execute(
    "SELECT * FROM users WHERE created_at >= @start_date",
    start_date=datetime.date(2025, 1, 1)
)
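The positional styles differ only in how each placeholder is spelled. As a rough illustration, a qmark statement can be rewritten into the numeric style with a small helper. `qmark_to_numeric` is a hypothetical function written for this example, it is deliberately naive, and it does not represent how SQLSpec handles parameters.

```python
import re


def qmark_to_numeric(sql: str) -> str:
    """Rewrite ? placeholders as $1, $2, ... in order of appearance.

    Naive on purpose: it does not skip ? inside string literals or comments.
    """
    counter = 0

    def replace(_: re.Match) -> str:
        nonlocal counter
        counter += 1
        return f"${counter}"

    return re.sub(r"\?", replace, sql)


converted = qmark_to_numeric("SELECT * FROM users WHERE id = ? AND status = ?")
```

A production converter would need a real SQL tokenizer to avoid touching question marks inside quoted strings.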
Type Coercion
SQLSpec automatically coerces types based on driver requirements:
# Booleans to integers (SQLite)
session.execute("INSERT INTO users (is_active) VALUES (?)", True)
# SQLite receives: 1
# Datetime to ISO format (JSON databases)
session.execute(
"INSERT INTO events (timestamp) VALUES (?)",
datetime.datetime.now()
)
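The boolean case can be checked against the standard library's sqlite3 module, which stores `True` as the integer 1. The sketch below bypasses SQLSpec and converts the timestamp to ISO 8601 text explicitly, so it shows the target representation rather than any automatic coercion; the table is made up for the example.

```python
import datetime
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (is_active INTEGER, happened_at TEXT)")

moment = datetime.datetime(2025, 1, 1, 12, 30)
conn.execute(
    "INSERT INTO events (is_active, happened_at) VALUES (?, ?)",
    (True, moment.isoformat()),  # explicit ISO 8601 text for the timestamp
)

is_active, happened_at = conn.execute(
    "SELECT is_active, happened_at FROM events"
).fetchone()
# The stored text parses back into the original datetime.
restored = datetime.datetime.fromisoformat(happened_at)
```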
Script Execution
Execute multiple SQL statements in one call:
session.execute("""
CREATE TABLE users (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL
);
CREATE TABLE posts (
id INTEGER PRIMARY KEY,
user_id INTEGER,
title TEXT,
FOREIGN KEY (user_id) REFERENCES users(id)
);
CREATE INDEX idx_posts_user_id ON posts(user_id);
""")
Performance Tips
1. Use Connection Pooling
config = AsyncpgConfig(
pool_config={
"dsn": "postgresql://localhost/db",
"min_size": 10,
"max_size": 20,
}
)
2. Batch Operations
Use execute_many() for bulk inserts:
# Fast batch insert
session.execute_many(
"INSERT INTO users (name) VALUES (?)",
[(name,) for name in large_list]
)
3. Prepared Statements
Drivers like asyncpg automatically prepare frequently-used statements.
4. Use Appropriate Methods
# Instead of:
result = session.execute("SELECT COUNT(*) FROM users")
count = result.scalar()
# Use:
count = session.select_value("SELECT COUNT(*) FROM users")
Driver Selection Guide
Choose the right driver for your use case:
| Database | Recommended Driver | Use Case |
|---|---|---|
| PostgreSQL (Async) | asyncpg | Async applications with connection pooling |
| PostgreSQL (Sync) | psycopg | Traditional sync applications |
| SQLite (Async) | aiosqlite | Async web applications with SQLite |
| SQLite (Sync) | sqlite3 | Simple applications, testing |
| MySQL (Async) | asyncmy | Async MySQL applications |
| Analytics | DuckDB | OLAP, data analysis, reporting |
| Cloud Analytics | BigQuery | Large-scale cloud data warehousing |
Next Steps
Query Builder - Build queries programmatically
SQL File Loader - Load queries from SQL files
Driver - Detailed driver API reference
See Also
Configuration - Configure database connections
Execution Flow - Understanding query execution
Framework Integrations - Framework-specific usage