SQL File Loader

SQLSpec includes a powerful SQL file loader that manages SQL statements from files using aiosql-style named queries. This is the recommended approach for organizing SQL in larger applications.

Overview

The SQL file loader provides:

  • Named Queries: Organize SQL with descriptive names

  • File-Based Organization: Keep SQL separate from Python code

  • Caching: Intelligent file and statement caching for 12x+ faster repeated access

  • Multi-Source Support: Load from local files, URIs, or cloud storage

  • Type Safety: Integrate loaded queries with schema mapping

Why Use SQL Files?

Benefits

  • Separation of concerns (SQL separate from Python)

  • SQL syntax highlighting in editors

  • Easier SQL review and maintenance

  • Support for database-specific dialects

  • Version control friendly

  • Shared queries across application layers

When to Use

  • Production applications with many queries

  • Complex queries that benefit from SQL formatting

  • Queries used in multiple places

  • Team collaboration on SQL

  • Database migrations and scripts

Basic Usage

Creating SQL Files

Create a SQL file with named queries using -- name: comments:

-- sql/users.sql

-- name: get_user_by_id
SELECT id, username, email, created_at
FROM users
WHERE id = :user_id;

-- name: list_active_users
SELECT id, username, email, last_login
FROM users
WHERE is_active = true
ORDER BY username
LIMIT :limit OFFSET :offset;

-- name: create_user
INSERT INTO users (username, email, password_hash)
VALUES (:username, :email, :password_hash)
RETURNING id, username, email, created_at;

Loading SQL Files

from sqlspec.loader import SQLFileLoader

# Create loader
loader = SQLFileLoader()

# Load SQL files
loader.load_sql("sql/users.sql", "sql/products.sql", "sql/orders.sql")

# Or load from a directory
loader.load_sql("sql/")

# List available queries
queries = loader.list_queries()
print(queries)  # ['get_user_by_id', 'list_active_users', 'create_user', ...]

Using Loaded Queries

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

# Set up database
spec = SQLSpec()
config = SqliteConfig()
spec.add_config(config)

# Get SQL with parameters
user_query = loader.get_sql("get_user_by_id", user_id=123)

# Execute with session
with spec.provide_session(config) as session:
    result = session.execute(user_query)
    user = result.one()

Query Naming Conventions

Query Name Syntax

-- name: query_name
SELECT ...

Naming Rules

  • Use descriptive, snake_case names

  • Include operation prefix: get_, list_, create_, update_, delete_

  • Indicate plurality: get_user vs list_users

  • Use namespaces with dots: users.get_by_id, orders.list_recent

-- Good names
-- name: get_user_by_id
-- name: list_active_users
-- name: create_user
-- name: update_user_email
-- name: delete_inactive_users

-- Namespaced names
-- name: analytics.daily_sales
-- name: reports.user_activity
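
Namespaced queries are looked up by their full dotted name (a short sketch, using the names above):

daily_sales_sql = loader.get_sql("analytics.daily_sales")
activity_sql = loader.get_sql("reports.user_activity")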

Special Suffixes

The loader supports special suffixes for query types:

-- name: insert_user!
-- The ! suffix indicates a modifying query
INSERT INTO users (name) VALUES (:name);

-- name: get_users*
-- The * suffix indicates multiple results expected
SELECT * FROM users;
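
The suffix marks the query type but, following the aiosql convention, is assumed not to be part of the lookup key (a minimal sketch under that assumption):

# Assumption: the ! and * suffixes are stripped when the name is registered
insert_sql = loader.get_sql("insert_user", name="alice")
users_sql = loader.get_sql("get_users")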

Dialect-Specific Queries

Specify Database Dialect

Use the -- dialect: comment for database-specific SQL:

-- sql/postgres_queries.sql

-- name: get_user_with_json
-- dialect: postgres
SELECT id, name, data->>'email' as email
FROM users
WHERE id = :user_id;

-- name: upsert_user
-- dialect: postgres
INSERT INTO users (id, name, email)
VALUES (:id, :name, :email)
ON CONFLICT (id) DO UPDATE
SET name = EXCLUDED.name, email = EXCLUDED.email;

Supported Dialects

  • postgres (PostgreSQL)

  • sqlite (SQLite)

  • mysql (MySQL/MariaDB)

  • oracle (Oracle Database)

  • mssql (Microsoft SQL Server)

  • duckdb (DuckDB)

  • bigquery (Google BigQuery)

The loader normalizes dialect names and handles common aliases (e.g., postgresql → postgres).
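
For example, an aliased dialect name resolves to its canonical form (a sketch using add_named_sql, covered below):

# "postgresql" is normalized to "postgres"
loader.add_named_sql("pg_version", "SELECT version()", dialect="postgresql")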

Advanced Features

Adding Queries Programmatically

# Add a query at runtime
loader.add_named_sql(
    "health_check",
    "SELECT 'OK' as status, CURRENT_TIMESTAMP as timestamp"
)

# Add with dialect
loader.add_named_sql(
    "postgres_version",
    "SELECT version()",
    dialect="postgres"
)

# Use the added query
health_sql = loader.get_sql("health_check")
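
Queries added at runtime execute like any file-loaded query (a sketch, reusing the session pattern from Basic Usage):

with spec.provide_session(config) as session:
    status = session.execute(health_sql).one()
    print(status["status"])  # 'OK'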

Query Metadata

Get information about loaded queries:

# Get file info for a query
file_info = loader.get_file_for_query("get_user_by_id")
if file_info:
    print(f"Query from: {file_info.path}")
    print(f"Checksum: {file_info.checksum}")
    print(f"Loaded at: {file_info.loaded_at}")

# Get all queries from a specific file
file_obj = loader.get_file("sql/users.sql")
if file_obj:
    print(f"Contains {len(file_obj.queries)} queries")
    for query in file_obj.queries:
        print(f"  - {query.name}")

Caching Behavior

The loader implements intelligent caching with 12x+ performance improvements:

# First load - reads from disk
loader.load_sql("sql/users.sql")

# Second load - uses cache (file already loaded)
loader.load_sql("sql/users.sql")

# Clear cache
loader.clear_cache()

# Force reload from disk
loader.load_sql("sql/users.sql")

Cache Features

  • File content checksums prevent stale data

  • Statement-level caching for instant access

  • Automatic invalidation on file changes (in development)

  • Configurable cache size limits

Loading Directories with Mixed Files

The loader gracefully handles directories containing both named query files and raw DDL/DML scripts:

migrations/
    schema.sql        # Raw DDL (no -- name:) → skipped
    queries.sql       # Named queries → loaded
    seed-data.sql     # Raw DML (no -- name:) → skipped

loader = SQLFileLoader()
loader.load_sql("migrations/")  # Only loads queries.sql

# Check what was loaded
queries = loader.list_queries()  # Only returns named queries

How it works:

  • Files without -- name: markers are gracefully skipped

  • Skipped files are logged at DEBUG level

  • Directory loading continues without errors

  • Only named query files are tracked and cached

When files are skipped:

  • Empty files

  • Files with only comments

  • Raw DDL scripts (CREATE TABLE, etc.)

  • Raw DML scripts without named markers

  • Files with -- dialect: but no -- name:

When files raise errors:

  • Duplicate query names within the same file

  • Malformed -- name: markers

  • Files with -- name: but no SQL content after parsing
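
A defensive loading pattern tolerates skipped files while surfacing real parse errors (a sketch; SQLFileParseError is covered under Troubleshooting):

from sqlspec.exceptions import SQLFileParseError

loader = SQLFileLoader()
try:
    # Raw DDL/DML files in the directory are skipped silently;
    # malformed or duplicate -- name: markers still raise.
    loader.load_sql("migrations/")
except SQLFileParseError as e:
    print(f"Failed to parse SQL file: {e}")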

Storage Backends

The loader supports multiple storage backends for loading SQL files.

Local Files

from pathlib import Path

# Load from Path object
loader.load_sql(Path("sql/users.sql"))

# Load from string path
loader.load_sql("sql/users.sql")

# Load directory
loader.load_sql("sql/")

File URIs

# Load from file:// URI
loader.load_sql("file:///absolute/path/to/queries.sql")

# Load from relative file URI
loader.load_sql("file://sql/users.sql")

Cloud Storage (with fsspec)

When fsspec is installed, load from cloud storage:

# S3
loader.load_sql("s3://my-bucket/sql/users.sql")

# Google Cloud Storage
loader.load_sql("gs://my-bucket/sql/users.sql")

# Azure Blob Storage
loader.load_sql("az://my-container/sql/users.sql")

# HTTP/HTTPS
loader.load_sql("https://example.com/queries/users.sql")

Integration with SQLSpec

Loader with SQLSpec Instance

Create a SQLSpec instance with an integrated loader:

from sqlspec import SQLSpec
from sqlspec.loader import SQLFileLoader

# Create loader
loader = SQLFileLoader()
loader.load_sql("sql/")

# Create SQLSpec with loader
spec = SQLSpec(loader=loader)

# Access loader via SQLSpec
user_query = spec.loader.get_sql("get_user_by_id", user_id=1)

Type-Safe Query Execution

Combine loaded queries with schema mapping:

from pydantic import BaseModel
from datetime import datetime

class User(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime

# Load and execute with type safety
query = loader.get_sql("get_user_by_id", user_id=1)

with spec.provide_session(config) as session:
    result = session.execute(query, schema_type=User)
    user: User = result.one()  # Fully typed!
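
The same pattern works for multi-row queries (a sketch, assuming result.data returns mapped instances when schema_type is given):

query = loader.get_sql("list_active_users", limit=10, offset=0)

with spec.provide_session(config) as session:
    result = session.execute(query, schema_type=User)
    users: list[User] = result.data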

Practical Examples

Example 1: User Management

-- sql/users.sql

-- name: get_user
SELECT id, username, email, is_active, created_at
FROM users
WHERE id = :user_id;

-- name: list_users
SELECT id, username, email, is_active
FROM users
WHERE (:status IS NULL OR is_active = :status)
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset;

-- name: create_user
INSERT INTO users (username, email, password_hash)
VALUES (:username, :email, :password_hash)
RETURNING id, username, email, created_at;

-- name: update_user
UPDATE users
SET username = :username,
    email = :email,
    updated_at = CURRENT_TIMESTAMP
WHERE id = :user_id
RETURNING id, username, email, updated_at;

-- name: delete_user
DELETE FROM users WHERE id = :user_id;

# Python code
from sqlspec import SQLSpec
from sqlspec.loader import SQLFileLoader
from sqlspec.adapters.sqlite import SqliteConfig

loader = SQLFileLoader()
loader.load_sql("sql/users.sql")

spec = SQLSpec()
config = SqliteConfig()
spec.add_config(config)

with spec.provide_session(config) as session:
    # Create user
    create_query = loader.get_sql(
        "create_user",
        username="alice",
        email="[email protected]",
        password_hash="hashed_password"
    )
    result = session.execute(create_query)
    user = result.one()
    user_id = user['id']

    # Get user
    get_query = loader.get_sql("get_user", user_id=user_id)
    user = session.execute(get_query).one()

    # List users
    list_query = loader.get_sql("list_users", status=True, limit=10, offset=0)
    users = session.execute(list_query).data
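
The update and delete queries from the same file follow the same pattern (a sketch continuing the session block above):

    # Update user
    update_query = loader.get_sql(
        "update_user",
        user_id=user_id,
        username="alice_updated",
        email="[email protected]"
    )
    updated = session.execute(update_query).one()

    # Delete user
    session.execute(loader.get_sql("delete_user", user_id=user_id))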

Example 2: Analytics Queries

-- sql/analytics.sql

-- name: daily_sales
-- dialect: postgres
SELECT
    DATE(created_at) as sale_date,
    COUNT(*) as order_count,
    SUM(total_amount) as total_sales,
    AVG(total_amount) as avg_order_value
FROM orders
WHERE created_at >= :start_date
    AND created_at < :end_date
GROUP BY DATE(created_at)
ORDER BY sale_date;

-- name: top_products
SELECT
    p.name,
    p.category,
    COUNT(oi.id) as times_ordered,
    SUM(oi.quantity) as total_quantity,
    SUM(oi.quantity * oi.unit_price) as revenue
FROM products p
JOIN order_items oi ON p.id = oi.product_id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= :start_date
GROUP BY p.id, p.name, p.category
ORDER BY revenue DESC
LIMIT :limit;

-- name: customer_lifetime_value
WITH customer_orders AS (
    SELECT
        user_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_spent,
        MIN(created_at) as first_order,
        MAX(created_at) as last_order
    FROM orders
    GROUP BY user_id
)
SELECT
    u.id,
    u.username,
    u.email,
    co.order_count,
    co.total_spent,
    co.first_order,
    co.last_order,
    EXTRACT(EPOCH FROM (co.last_order - co.first_order)) / 86400 as customer_days
FROM users u
JOIN customer_orders co ON u.id = co.user_id
WHERE co.total_spent > :min_spent
ORDER BY co.total_spent DESC;

import datetime

# Load analytics queries
loader.load_sql("sql/analytics.sql")

# Run daily sales report
sales_query = loader.get_sql(
    "daily_sales",
    start_date=datetime.date(2025, 1, 1),
    end_date=datetime.date(2025, 2, 1)
)
sales = session.execute(sales_query).data

# Top products
products_query = loader.get_sql("top_products", start_date=datetime.date(2025, 1, 1), limit=10)
top_products = session.execute(products_query).data
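
The customer_lifetime_value query from the same file runs the same way:

# Customers with lifetime spend above a threshold
clv_query = loader.get_sql("customer_lifetime_value", min_spent=100)
top_customers = session.execute(clv_query).data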

Example 3: Multi-Database Setup

# Different SQL files for different databases
loader = SQLFileLoader()
loader.load_sql("sql/postgres/", "sql/sqlite/", "sql/shared/")

# Queries automatically select the correct dialect
pg_query = loader.get_sql(
    "upsert_user", id=1, name="alice", email="[email protected]"
)  # Uses Postgres ON CONFLICT
sqlite_query = loader.get_sql("get_user", user_id=1)  # Uses shared query

# Execute on the appropriate database
async with spec.provide_session(postgres_config) as pg_session:
    await pg_session.execute(pg_query)

with spec.provide_session(sqlite_config) as sqlite_session:
    sqlite_session.execute(sqlite_query)

Best Practices

1. Organize by Domain

sql/
    users.sql        # User management
    products.sql     # Product catalog
    orders.sql       # Order processing
    analytics.sql    # Reports and analytics
    admin.sql        # Admin operations

2. Use Clear Query Names

-- Good: Clear intent
-- name: get_active_users_by_role
-- name: create_order_with_items
-- name: delete_expired_sessions

-- Bad: Unclear
-- name: query1
-- name: get_data
-- name: do_stuff

3. Document Complex Queries

-- name: calculate_inventory_levels
-- Calculate current inventory levels, accounting for pending orders,
-- and return items below the reorder threshold
SELECT
    p.id,
    p.name,
    p.current_stock,
    COALESCE(SUM(oi.quantity), 0) as pending_orders,
    p.current_stock - COALESCE(SUM(oi.quantity), 0) as available,
    p.reorder_point
FROM products p
LEFT JOIN order_items oi ON p.id = oi.product_id
LEFT JOIN orders o ON oi.order_id = o.id
WHERE o.status = 'pending' OR o.status IS NULL
GROUP BY p.id, p.name, p.current_stock, p.reorder_point
HAVING p.current_stock - COALESCE(SUM(oi.quantity), 0) < p.reorder_point;

4. Use Parameters, Not String Formatting

-- Good: Parameterized
-- name: get_user_by_email
SELECT * FROM users WHERE email = :email;

-- Bad: Vulnerable to injection (don't do this)
-- name: get_user_by_email
SELECT * FROM users WHERE email = '{email}';

5. Version Control SQL Files

  • Commit SQL files to version control

  • Review SQL changes in pull requests

  • Track query performance over time

Troubleshooting

Query Not Found

try:
    query = loader.get_sql("nonexistent_query")
except KeyError:
    print("Query not found. Available queries:")
    print(loader.list_queries())

File Load Errors

from sqlspec.exceptions import SQLFileNotFoundError, SQLFileParseError

try:
    loader.load_sql("sql/queries.sql")
except SQLFileNotFoundError as e:
    print(f"File not found: {e}")
except SQLFileParseError as e:
    print(f"Failed to parse SQL file: {e}")

Debugging Loaded Queries

# Print query SQL
query = loader.get_sql("get_user", user_id=1)
print(f"SQL: {query}")
print(f"Parameters: {query.parameters}")

# Inspect file metadata
file_info = loader.get_file_for_query("get_user")
print(f"Loaded from: {file_info.path}")

Next Steps

See Also