Framework Integrations

SQLSpec integrates seamlessly with popular Python web frameworks through plugins and dependency injection. This guide covers integration with Litestar, FastAPI, and other frameworks.

Overview

SQLSpec provides framework-specific plugins that handle:

  • Connection lifecycle management

  • Dependency injection

  • Transaction management

  • Request-scoped sessions

  • Automatic cleanup

Litestar Integration

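The Litestar plugin provides first-class integration: dependency injection, configurable commit modes, multiple database configurations, a session storage backend, CLI commands, and request correlation.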

Basic Setup

from litestar import Litestar
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.extensions.litestar import SQLSpecPlugin

# Configure database and create plugin
spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={
            "dsn": "postgresql://localhost/mydb",
            "min_size": 10,
            "max_size": 20,
        }
    )
)
sqlspec_plugin = SQLSpecPlugin(sqlspec=spec)

# Create Litestar app
app = Litestar(
    route_handlers=[...],
    plugins=[sqlspec_plugin]
)

Using Dependency Injection

The plugin provides dependency injection for connections, pools, and sessions:

from typing import Any

from litestar import get
from sqlspec.driver import AsyncDriverAdapterBase

# Litestar needs a type annotation on every handler parameter, injected
# dependencies included. `Any` is used below where no concrete driver-level
# type is imported in this snippet.

# Inject database session
@get("/users/{user_id:int}")
async def get_user(
    user_id: int,
    db_session: AsyncDriverAdapterBase
) -> dict:
    """Look up one user's id, name and email by primary key."""
    result = await db_session.execute(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id
    )
    return result.one()

# Inject connection pool
@get("/health")
async def health_check(db_pool: Any) -> dict:
    """Report "healthy" when a trivial ``SELECT 1`` round-trips through the pool."""
    # The injected pool is used through its own acquire()/fetchval() API.
    async with db_pool.acquire() as conn:
        result = await conn.fetchval("SELECT 1")
        return {"status": "healthy" if result == 1 else "unhealthy"}

# Inject raw connection
@get("/stats")
async def stats(db_connection: Any) -> dict:
    """Return the number of rows in ``users`` using the raw connection."""
    result = await db_connection.fetchval("SELECT COUNT(*) FROM users")
    return {"user_count": result}

Commit Modes

The plugin supports different transaction commit strategies configured via extension_config:

Manual Commit Mode (Default)

You control transaction boundaries explicitly:

from litestar import post
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.driver import AsyncDriverAdapterBase

spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://..."},
        extension_config={
            "litestar": {"commit_mode": "manual"}  # Default
        }
    )
)

@post("/users")
async def create_user(
    data: dict,
    db_session: AsyncDriverAdapterBase
) -> dict:
    """Insert a user inside an explicit transaction and return the new row."""
    insert_sql = "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id"
    # Manual commit mode: the transaction boundary is this `async with` block.
    async with db_session.begin_transaction():
        inserted = await db_session.execute(insert_sql, data["name"], data["email"])
        return inserted.one()

Autocommit Mode

Automatically commits on successful requests (2xx responses):

spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://..."},
        extension_config={
            "litestar": {"commit_mode": "autocommit"}  # Auto-commit on 2xx
        }
    )
)

@post("/users")
async def create_user(
    data: dict,
    db_session: AsyncDriverAdapterBase
) -> dict:
    # Transaction begins automatically
    result = await db_session.execute(
        "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id",
        data["name"],
        data["email"]
    )
    # Commits automatically on success
    return result.one()

Autocommit with Redirects

Commits on both 2xx and 3xx responses:

spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://..."},
        extension_config={
            "litestar": {"commit_mode": "autocommit_include_redirect"}
        }
    )
)

Custom Dependency Keys

Customize the dependency injection keys via extension_config:

from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.driver import AsyncDriverAdapterBase

spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://..."},
        extension_config={
            "litestar": {
                "connection_key": "database",    # Default: "db_connection"
                "pool_key": "db_pool",           # Default: "db_pool"
                "session_key": "session",        # Default: "db_session"
            }
        }
    )
)

@get("/users")
async def list_users(session: AsyncDriverAdapterBase) -> list:
    result = await session.execute("SELECT * FROM users")
    return result.all()

Multiple Database Configurations

The plugin supports multiple database configurations through a single SQLSpec instance:

from litestar import Litestar, get
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.driver import AsyncDriverAdapterBase
from sqlspec.extensions.litestar import SQLSpecPlugin

spec = SQLSpec()

# Main database
main_db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://localhost/main"},
        extension_config={
            "litestar": {
                "session_key": "main_db",
                "connection_key": "main_db_connection",
            }
        }
    )
)

# Analytics database
analytics_db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://localhost/analytics"},
        extension_config={
            "litestar": {
                "session_key": "analytics_db",
                "connection_key": "analytics_connection",
            }
        }
    )
)

# Use in handlers: the parameter names match the "session_key" values above
@get("/report")
async def generate_report(
    main_db: AsyncDriverAdapterBase,
    analytics_db: AsyncDriverAdapterBase
) -> dict:
    """Combine a user count from the main DB with an event count from analytics."""
    users = await main_db.execute("SELECT COUNT(*) FROM users")
    events = await analytics_db.execute("SELECT COUNT(*) FROM events")
    return {
        "total_users": users.scalar(),
        "total_events": events.scalar()
    }

# Create single plugin with all configs. The app is built after the handler
# is defined so the handler can be registered on it.
app = Litestar(
    route_handlers=[generate_report],
    plugins=[SQLSpecPlugin(sqlspec=spec)]
)

Session Storage Backend

Use SQLSpec as a session backend for Litestar:

from litestar import Litestar
from litestar.middleware.session.server_side import ServerSideSessionConfig
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.asyncpg.litestar import AsyncpgStore
from sqlspec.extensions.litestar import SQLSpecPlugin

# Configure database with session support
spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://localhost/db"},
        extension_config={
            "litestar": {"session_table": "litestar_sessions"}
        },
        migration_config={
            "script_location": "migrations",
            "include_extensions": ["litestar"]
        }
    )
)

# Create session store using adapter-specific class
store = AsyncpgStore(db)

# Configure Litestar with plugin and session middleware
app = Litestar(
    plugins=[SQLSpecPlugin(sqlspec=spec)],
    middleware=[
        ServerSideSessionConfig(store=store).middleware
    ]
)

CLI Integration

The plugin provides CLI commands for database management:

# Generate migration
litestar db migrations generate -m "Add users table"

# Apply migrations (includes extension migrations)
litestar db migrations upgrade

# Rollback migration
litestar db migrations downgrade

# Show current migration version
litestar db migrations current

# Show migration history (verbose)
litestar db migrations current --verbose

Note

Extension migrations (like Litestar session tables) are included automatically when include_extensions contains "litestar" in your migration config.

Correlation Middleware

Enable request correlation tracking via extension_config:

from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig

spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://..."},
        # Correlation settings live under the "litestar" extension key.
        extension_config={
            "litestar": {
                "enable_correlation_middleware": True,  # Default: True
                "correlation_header": "x-request-id",
                "correlation_headers": ["x-client-trace"],
                "auto_trace_headers": True,
            },
        },
    )
)

# Queries will include correlation IDs in logs (header or generated UUID)
# Format: [correlation_id=abc123] SELECT * FROM users

FastAPI Integration

While SQLSpec doesn’t have a dedicated FastAPI plugin, integration is straightforward using dependency injection.

Basic Setup

from fastapi import FastAPI
from contextlib import asynccontextmanager
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig

# Configure database
spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={
            "dsn": "postgresql://localhost/mydb",
            "min_size": 10,
            "max_size": 20,
        }
    )
)

# Lifespan context manager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Wrap the application's lifetime and close all SQLSpec pools at the end."""
    try:
        # Startup: nothing to prepare before the app starts serving.
        yield
    finally:
        # Shutdown: `finally` makes the pools close even when an exception
        # is thrown into the generator at the `yield`.
        await spec.close_all_pools()

app = FastAPI(lifespan=lifespan)

Dependency Injection

Create a dependency function for database sessions:

from collections.abc import AsyncGenerator

from fastapi import Depends
from sqlspec.driver import AsyncDriverAdapterBase


async def get_db_session() -> AsyncGenerator[AsyncDriverAdapterBase, None]:
    """Yield one session per request; the `async with` releases it afterwards."""
    # `db` is the config handle returned by spec.add_config() in Basic Setup.
    async with spec.provide_session(db) as session:
        yield session

# On Python 3.13+ the send type of AsyncGenerator defaults to None, so the
# return annotation above can be shortened to:
#
#     async def get_db_session() -> AsyncGenerator[AsyncDriverAdapterBase]:

# Use in route handlers
@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncDriverAdapterBase = Depends(get_db_session)
) -> dict:
    """Look up one user's id, name and email by primary key."""
    result = await db.execute(
        "SELECT id, name, email FROM users WHERE id = $1",
        user_id
    )
    return result.one()

Transaction Management

Implement transaction handling with FastAPI:

@app.post("/users")
async def create_user(
    user_data: dict,
    db: AsyncDriverAdapterBase = Depends(get_db_session)
) -> dict:
    """Insert a user and its audit-log entry in one transaction."""
    insert_user_sql = "INSERT INTO users (name, email) VALUES ($1, $2) RETURNING id"
    insert_audit_sql = "INSERT INTO audit_log (action, user_id) VALUES ($1, $2)"

    async with db.begin_transaction():
        inserted = await db.execute(
            insert_user_sql,
            user_data["name"],
            user_data["email"]
        )
        new_user_id = inserted.scalar()

        # Second statement shares the transaction opened above.
        await db.execute(insert_audit_sql, "user_created", new_user_id)

        return inserted.one()

Multiple Databases

Support multiple databases with different dependencies:

# Main database
main_db = spec.add_config(AsyncpgConfig(pool_config={"dsn": "postgresql://localhost/main"}))

# Analytics database
analytics_db = spec.add_config(AsyncpgConfig(pool_config={"dsn": "postgresql://localhost/analytics"}))

# Dependency functions
async def get_main_db():
    async with spec.provide_session(main_db) as session:
        yield session

async def get_analytics_db():
    async with spec.provide_session(analytics_db) as session:
        yield session

# Use in handlers
@app.get("/report")
async def generate_report(
    main_db: AsyncDriverAdapterBase = Depends(get_main_db),
    analytics_db: AsyncDriverAdapterBase = Depends(get_analytics_db)
) -> dict:
    users = await main_db.execute("SELECT COUNT(*) FROM users")
    events = await analytics_db.execute("SELECT COUNT(*) FROM events")
    return {
        "users": users.scalar(),
        "events": events.scalar()
    }

Sanic Integration

Integrate SQLSpec with Sanic using listeners and app context.

Basic Setup

from sanic import Sanic
from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig

app = Sanic("MyApp")

# Initialize SQLSpec
spec = SQLSpec()
db = spec.add_config(AsyncpgConfig(pool_config={"dsn": "postgresql://localhost/db"}))

# Store in app context
app.ctx.sqlspec = spec
app.ctx.db_config = db

# Cleanup on shutdown
@app.before_server_stop
async def close_db(app, loop):
    await app.ctx.sqlspec.close_all_pools()

Using in Route Handlers

from sanic import Request, json

@app.get("/users/<user_id:int>")
async def get_user(request: Request, user_id: int):
    """Return one user's id, name and email as a JSON response."""
    # The SQLSpec registry and config handle were stored on app.ctx at setup.
    async with request.app.ctx.sqlspec.provide_session(request.app.ctx.db_config) as db:
        result = await db.execute(
            "SELECT id, name, email FROM users WHERE id = $1",
            user_id
        )
        return json(result.one())

Middleware for Automatic Sessions

@app.middleware("request")
async def add_db_session(request):
    """Open a session for this request and expose it as ``request.ctx.db``."""
    session_cm = request.app.ctx.sqlspec.provide_session(
        request.app.ctx.db_config
    )
    request.ctx.db = await session_cm.__aenter__()
    # Keep the context manager itself: it, not the session it yields, is the
    # object whose __aexit__ must be called to release the session.
    request.ctx.db_session_cm = session_cm

@app.middleware("response")
async def cleanup_db_session(request, response):
    """Close the session opened by ``add_db_session``, if one was opened."""
    session_cm = getattr(request.ctx, "db_session_cm", None)
    if session_cm is not None:
        await session_cm.__aexit__(None, None, None)

# Use in handlers
@app.get("/users")
async def list_users(request: Request):
    result = await request.ctx.db.execute("SELECT * FROM users")
    return json(result.rows)

Flask Integration

Integrate SQLSpec with Flask using synchronous drivers.

Basic Setup

from flask import Flask
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

app = Flask(__name__)

# Initialize SQLSpec
spec = SQLSpec()
db = spec.add_config(SqliteConfig(pool_config={"database": "app.db"}))

Using Request Context

from flask import g

def get_db():
    """Return the session for the current app context, opening it on first use."""
    if 'db' not in g:
        session_cm = spec.provide_session(db)
        g.db = session_cm.__enter__()
        # Keep the context manager: it, not the session it yields, is the
        # object whose __exit__ releases the session.
        g.db_session_cm = session_cm
    return g.db

@app.teardown_appcontext
def close_db(error):
    """Release the session opened by ``get_db``, if any, at context teardown."""
    session_cm = g.pop('db_session_cm', None)
    g.pop('db', None)
    if session_cm is not None:
        session_cm.__exit__(None, None, None)

# Use in routes
@app.route('/users/<int:user_id>')
def get_user(user_id):
    db = get_db()
    result = db.execute("SELECT * FROM users WHERE id = ?", user_id)
    return result.one()

Custom Integration Patterns

Context Manager Pattern

For frameworks without built-in dependency injection:

class DatabaseSession:
    """Async context manager that opens a session from ``spec`` for ``config``.

    Entering returns the session produced by ``spec.provide_session(config)``;
    exiting delegates to that same context manager so the session is released.
    """

    def __init__(self, spec: SQLSpec, config):
        self.spec = spec
        self.config = config
        self.session = None
        # The context manager returned by provide_session(). It is kept
        # because it, not the session it yields, owns __aexit__.
        self._session_cm = None

    async def __aenter__(self):
        session_cm = self.spec.provide_session(self.config)
        self.session = await session_cm.__aenter__()
        # Only remember the manager once it has been entered successfully.
        self._session_cm = session_cm
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        session_cm, self._session_cm = self._session_cm, None
        if session_cm is None:
            return None
        # Forward the exception details and the inner manager's verdict.
        return await session_cm.__aexit__(exc_type, exc_val, exc_tb)

# Usage
async with DatabaseSession(spec, config) as db:
    result = await db.execute("SELECT * FROM users")

Request-Scoped Sessions

Implement request-scoped database sessions:

from contextvars import ContextVar

db_session: ContextVar = ContextVar('db_session', default=None)
# Context manager that produced the current session. It is kept because it,
# not the session it yields, is the object whose __aexit__ must be called.
_db_session_cm: ContextVar = ContextVar('db_session_cm', default=None)

async def get_session():
    """Return the session for the current context, opening it on first use."""
    session = db_session.get()
    if session is None:
        session_cm = spec.provide_session(config)
        session = await session_cm.__aenter__()
        _db_session_cm.set(session_cm)
        db_session.set(session)
    return session

async def cleanup_session():
    """Release the current context's session, if one was opened."""
    session_cm = _db_session_cm.get()
    if session_cm is not None:
        await session_cm.__aexit__(None, None, None)
        _db_session_cm.set(None)
        db_session.set(None)

Singleton Pattern

For simple applications with a single database:

class Database:
    """Singleton holding one SQLSpec registry and one asyncpg config."""

    # Class-level state shared by every ``Database()`` call.
    _instance = None
    _spec = None
    _config = None

    def __new__(cls):
        # Later calls return the instance created by the first one.
        if cls._instance is not None:
            return cls._instance

        cls._instance = super().__new__(cls)
        cls._spec = SQLSpec()
        asyncpg_config = AsyncpgConfig(pool_config={"dsn": "postgresql://localhost/db"})
        cls._config = cls._spec.add_config(asyncpg_config)
        return cls._instance

    async def session(self):
        """Return the result of ``provide_session`` for the stored config.

        This is a coroutine, so callers ``await`` it before ``async with``.
        """
        session_cm = self._spec.provide_session(self._config)
        return session_cm

# Usage
db = Database()
async with await db.session() as session:
    result = await session.execute("SELECT * FROM users")

Best Practices

1. Use Framework-Specific Plugins When Available

# Prefer Litestar plugin over manual setup
spec = SQLSpec()
db = spec.add_config(AsyncpgConfig(pool_config={"dsn": "postgresql://..."}))
app = Litestar(plugins=[SQLSpecPlugin(sqlspec=spec)])

2. Always Clean Up Pools

# FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Close all SQLSpec pools when the application shuts down."""
    try:
        yield
    finally:
        # `finally` makes the pools close even when an exception is thrown
        # into the generator at the `yield`.
        await spec.close_all_pools()

# Sanic
@app.before_server_stop
async def close_pools(app, loop):
    await spec.close_all_pools()

3. Use Dependency Injection

# Inject sessions, not global instances
async def get_db():
    async with spec.provide_session(config) as session:
        yield session

4. Handle Transactions Appropriately

# Use autocommit for simple CRUD
spec = SQLSpec()
db = spec.add_config(
    AsyncpgConfig(
        pool_config={"dsn": "postgresql://..."},
        extension_config={
            "litestar": {"commit_mode": "autocommit"}
        }
    )
)

# Manual transactions for complex operations
async with db_session.begin_transaction():
    # Multiple operations
    pass

5. Separate Database Logic

# Good: Separate repository layer
class UserRepository:
    """Data-access wrapper that keeps user queries out of route handlers."""

    def __init__(self, db: AsyncDriverAdapterBase):
        # Session/driver used for every query this repository issues.
        self.db = db

    async def get_user(self, user_id: int):
        """Fetch the ``users`` row whose ``id`` equals ``user_id``."""
        query = "SELECT * FROM users WHERE id = $1"
        row_set = await self.db.execute(query, user_id)
        return row_set.one()

# Use in handlers
@app.get("/users/{user_id}")
async def get_user(
    user_id: int,
    db: AsyncDriverAdapterBase = Depends(get_db)
):
    repo = UserRepository(db)
    return await repo.get_user(user_id)

Testing

Testing with Framework Integration

import pytest
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

@pytest.fixture
def test_db():
    """Yield a session on an in-memory SQLite database with a ``users`` table."""
    spec = SQLSpec()
    db = spec.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

    # SqliteConfig is the synchronous adapter (see Flask Integration), so the
    # session is opened with a plain `with` and calls are not awaited.
    with spec.provide_session(db) as session:
        # Set up test schema
        session.execute("""
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL
            )
        """)
        yield session

def test_create_user(test_db):
    """The first inserted row gets id 1."""
    result = test_db.execute(
        "INSERT INTO users (name) VALUES (?) RETURNING id",
        "Test User"
    )
    assert result.scalar() == 1

Next Steps

See Also