SQL File Loader

SQLSpec includes a powerful SQL file loader that manages SQL statements from files using aiosql-style named queries. This is the recommended approach for organizing SQL in larger applications.

Overview

The SQL file loader provides:

  • Named Queries: Organize SQL with descriptive names

  • File-Based Organization: Keep SQL separate from Python code

  • Caching: Intelligent caching of files and parsed statements (12x+ faster repeat loads)

  • Multi-Source Support: Load from local files, URIs, or cloud storage

  • Type Safety: Integrate loaded queries with schema mapping

Why Use SQL Files?

Benefits

  • Separation of concerns (SQL separate from Python)

  • SQL syntax highlighting in editors

  • Easier SQL review and maintenance

  • Database-specific dialect support

  • Version control friendly

  • Shared queries across application layers

When to Use

  • Production applications with many queries

  • Complex queries that benefit from SQL formatting

  • Queries used in multiple places

  • Team collaboration on SQL

  • Database migrations and scripts

Basic Usage

Creating SQL Files

Create a SQL file with named queries using -- name: comments:

-- sql/users.sql

-- name: get_user_by_id
SELECT id, username, email, created_at
FROM users
WHERE id = :user_id;

-- name: list_active_users
SELECT id, username, email, last_login
FROM users
WHERE is_active = true
ORDER BY username
LIMIT :limit OFFSET :offset;

-- name: create_user
INSERT INTO users (username, email, password_hash)
VALUES (:username, :email, :password_hash)
RETURNING id, username, email, created_at;

Loading SQL Files

Loading SQL files with SQLFileLoader
from sqlspec.loader import SQLFileLoader

# Create loader
loader = SQLFileLoader()

# Load a single SQL file
loader.load_sql("sql/users.sql")

# Or load every .sql file in a directory
loader.load_sql("sql/")

# List available queries
queries = loader.list_queries()
print(queries)  # ['get_user_by_id', 'list_active_users', 'create_user', ...]

Using Loaded Queries

Using loaded queries from SQLFileLoader
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

# Set up database
spec = SQLSpec()
config = SqliteConfig()
spec.add_config(config)

# Retrieve a named query as an executable statement
user_query = loader.get_sql("get_user_by_id")
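
The statement returned by get_sql() is ready to execute. A minimal sketch of running it through a session, assuming a users table already exists:

# Bind the :user_id parameter at execution time
with spec.provide_session(config) as session:
    user = session.execute(user_query, user_id=1).one()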

Query Naming Conventions

Query Name Syntax

-- name: query_name
SELECT ...

Naming Rules

  • Use descriptive, snake_case names

  • Include operation prefix: get_, list_, create_, update_, delete_

  • Indicate plurality: get_user vs list_users

  • Use namespaces with dots: users.get_by_id, orders.list_recent

-- Good names
-- name: get_user_by_id
-- name: list_active_users
-- name: create_user
-- name: update_user_email
-- name: delete_inactive_users

-- Namespaced names
-- name: analytics.daily_sales
-- name: reports.user_activity
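
Namespaced queries are retrieved by their full dotted name. A short sketch, assuming the namespaced queries above have been loaded:

# Look up a namespaced query by its dotted name
daily_sales = loader.get_sql("analytics.daily_sales")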

Special Suffixes

The loader supports special suffixes for query types:

-- name: insert_user!
-- The ! suffix indicates a modifying query
INSERT INTO users (name) VALUES (:name);

-- name: get_users*
-- The * suffix indicates multiple results expected
SELECT * FROM users;

Dialect-Specific Queries

Specify Database Dialect

Use the -- dialect: comment for database-specific SQL:

-- sql/postgres_queries.sql

-- name: get_user_with_json
-- dialect: postgres
SELECT id, name, data->>'email' as email
FROM users
WHERE id = :user_id;

-- name: upsert_user
-- dialect: postgres
INSERT INTO users (id, name, email)
VALUES (:id, :name, :email)
ON CONFLICT (id) DO UPDATE
SET name = EXCLUDED.name, email = EXCLUDED.email;

Supported Dialects

  • postgres (PostgreSQL)

  • sqlite (SQLite)

  • mysql (MySQL/MariaDB)

  • oracle (Oracle Database)

  • mssql (Microsoft SQL Server)

  • duckdb (DuckDB)

  • bigquery (Google BigQuery)

The loader normalizes dialect names and handles common aliases (e.g., postgresql → postgres).
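
For example, an alias can be passed anywhere a dialect is accepted. A sketch using add_named_sql (covered in the next section):

# "postgresql" is a recognized alias and is normalized to "postgres"
loader.add_named_sql("pg_version", "SELECT version()", dialect="postgresql")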

Advanced Features

Adding Queries Programmatically

Adding queries programmatically to SQLFileLoader
# Add a query at runtime
loader.add_named_sql("health_check", "SELECT 'OK' as status, CURRENT_TIMESTAMP as timestamp")

# Add with dialect
loader.add_named_sql("postgres_version", "SELECT version()", dialect="postgres")

# Use the added query
health_sql = loader.get_sql("health_check")

Query Metadata

Get information about loaded queries:

Retrieving query metadata from SQLFileLoader
# Get file info for a query
file_info = loader.get_file_for_query("get_user_by_id")
if file_info:
    print(f"Query from: {file_info.path}")
    print(f"Checksum: {file_info.checksum}")
    print(f"Loaded at: {file_info.loaded_at}")

# Get all queries from a specific file
file_obj = loader.get_file("sql/users.sql")
if file_obj:
    print(f"Contains {len(file_obj.queries)} queries")
    for query in file_obj.queries:
        print(f"  - {query.name}")

Caching Behavior

The loader caches file contents and parsed statements, yielding 12x+ speedups on repeated loads:

Using caching with SQLFileLoader
# First load - reads from disk
loader.load_sql("sql/users.sql")

# Second load - uses cache (file already loaded)
loader.load_sql("sql/users.sql")

# Clear cache
loader.clear_cache()

# Force reload from disk
loader.load_sql("sql/users.sql")

Cache Features

  • File content checksums prevent stale data

  • Statement-level caching for instant access

  • Automatic invalidation on file changes (in development)

  • Configurable cache size limits

Loading Directories with Mixed Files

The loader gracefully handles directories containing both named query files and raw DDL/DML scripts:

migrations/
├── schema.sql        # Raw DDL (no -- name:) → skipped
├── queries.sql       # Named queries → loaded
└── seed-data.sql     # Raw DML (no -- name:) → skipped

Loading a directory with mixed SQL files using SQLFileLoader
loader = SQLFileLoader()
loader.load_sql("migrations/")  # Only loads queries.sql

# Check what was loaded
queries = loader.list_queries()  # Only returns named queries

How it works:

  • Files without -- name: markers are gracefully skipped

  • Skipped files are logged at DEBUG level

  • Directory loading continues without errors

  • Only named query files are tracked and cached

When files are skipped:

  • Empty files

  • Files with only comments

  • Raw DDL scripts (CREATE TABLE, etc.)

  • Raw DML scripts without named markers

  • Files with -- dialect: but no -- name:

When files raise errors (see the sketch after this list):

  • Duplicate query names within the same file

  • Malformed -- name: markers

  • Files with -- name: but no SQL content after parsing
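
A sketch of handling such a failure during a directory load; this assumes the conditions above surface as SQLFileParseError, the exception used in the Troubleshooting section:

from sqlspec.exceptions import SQLFileParseError

try:
    loader.load_sql("migrations/")
except SQLFileParseError as e:
    # Raised for duplicate names, malformed markers, or empty named queries
    print(f"Failed to parse SQL file: {e}")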

Storage Backends

The loader supports multiple storage backends for loading SQL files.

Local Files

Loading SQL files from local filesystem using SQLFileLoader
from pathlib import Path

# Load from a Path object
loader.load_sql(Path("sql/users.sql"))

# Load from a string path
loader.load_sql("sql/users.sql")

# Load every .sql file in a directory
loader.load_sql("sql/")

File URIs

Loading SQL files from file URIs using SQLFileLoader
# Load from a file:// URI (the path must be absolute)
sql_file = Path("sql/users.sql").resolve()
loader.load_sql(f"file://{sql_file}")

# Directories work the same way
loader.load_sql(f"file://{Path('sql').resolve()}")

Cloud Storage (with fsspec)

When fsspec is installed, load from cloud storage:

Loading SQL files from cloud storage using SQLFileLoader
# S3
loader.load_sql("s3://my-bucket/sql/users.sql")

# Google Cloud Storage
loader.load_sql("gs://my-bucket/sql/users.sql")

# Azure Blob Storage
loader.load_sql("az://my-container/sql/users.sql")

# HTTP/HTTPS
loader.load_sql("https://example.com/queries/users.sql")

Integration with SQLSpec

Loader with SQLSpec Instance

Create a SQLSpec instance with an integrated loader:

Creating a SQLSpec instance with SQLFileLoader
from sqlspec import SQLSpec
from sqlspec.loader import SQLFileLoader

# Create loader
loader = SQLFileLoader()
loader.load_sql("sql/")

# Create SQLSpec with loader
spec = SQLSpec(loader=loader)

# Access loader via SQLSpec
user_query = spec._sql_loader.get_sql("get_user_by_id")

Type-Safe Query Execution

Combine loaded queries with schema mapping:

Executing type-safe queries from SQLFileLoader with SQLSpec
from pydantic import BaseModel

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

class User(BaseModel):
    id: int
    username: str
    email: str

# Load and execute with type safety
query = loader.get_sql("get_user_by_id")

spec = SQLSpec(loader=loader)
config = SqliteConfig(connection_config={"database": ":memory:"})

with spec.provide_session(config) as session:
    session.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT, "
        "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )
    session.execute(
        "INSERT INTO users (id, username, email) VALUES "
        "(1, 'alice', 'alice@example.com'), (2, 'bob', 'bob@example.com')"
    )
    user: User = session.select_one(query, user_id=1, schema_type=User)

Practical Examples

Example 1: User Management

-- sql/users.sql

-- name: get_user
SELECT id, username, email, is_active, created_at
FROM users
WHERE id = :user_id;

-- name: list_users
SELECT id, username, email, is_active
FROM users
WHERE (:status IS NULL OR is_active = :status)
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset;

-- name: create_user
INSERT INTO users (username, email, password_hash)
VALUES (:username, :email, :password_hash)
RETURNING id, username, email, created_at;

-- name: update_user
UPDATE users
SET username = :username,
    email = :email,
    updated_at = CURRENT_TIMESTAMP
WHERE id = :user_id
RETURNING id, username, email, updated_at;

-- name: delete_user
DELETE FROM users WHERE id = :user_id;
Using user management queries from SQLFileLoader
# Python code
from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig
from sqlspec.loader import SQLFileLoader

loader = SQLFileLoader()
loader.load_sql("sql/users.sql")

spec = SQLSpec()
config = SqliteConfig()
spec.add_config(config)

with spec.provide_session(config) as session:
    # Schema matching the columns the queries reference
    session.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT, password_hash TEXT, "
        "is_active BOOLEAN DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP)"
    )

    # Create user
    create_query = loader.get_sql("create_user")
    result = session.execute(
        create_query, username="irma", email="irma@example.com", password_hash="hashed_password"
    )
    user = result.one()
    user_id = user["id"]

    # Get user
    get_query = loader.get_sql("get_user")
    user = session.execute(get_query, user_id=user_id).one()

    # List users
    list_query = loader.get_sql("list_users")
    users = session.execute(list_query, status=True, limit=10, offset=0).data

Example 2: Analytics Queries

-- sql/analytics.sql

-- name: daily_sales
-- dialect: postgres
SELECT
    DATE(created_at) as sale_date,
    COUNT(*) as order_count,
    SUM(total_amount) as total_sales,
    AVG(total_amount) as avg_order_value
FROM orders
WHERE created_at >= :start_date
    AND created_at < :end_date
GROUP BY DATE(created_at)
ORDER BY sale_date;

-- name: top_products
SELECT
    p.name,
    p.category,
    COUNT(oi.id) as times_ordered,
    SUM(oi.quantity) as total_quantity,
    SUM(oi.quantity * oi.unit_price) as revenue
FROM products p
JOIN order_items oi ON p.id = oi.product_id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= :start_date
GROUP BY p.id, p.name, p.category
ORDER BY revenue DESC
LIMIT :limit;

-- name: customer_lifetime_value
WITH customer_orders AS (
    SELECT
        user_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_spent,
        MIN(created_at) as first_order,
        MAX(created_at) as last_order
    FROM orders
    GROUP BY user_id
)
SELECT
    u.id,
    u.username,
    u.email,
    co.order_count,
    co.total_spent,
    co.first_order,
    co.last_order,
    EXTRACT(EPOCH FROM (co.last_order - co.first_order)) / 86400 as customer_days
FROM users u
JOIN customer_orders co ON u.id = co.user_id
WHERE co.total_spent > :min_spent
ORDER BY co.total_spent DESC;
Using analytics queries from SQLFileLoader
import datetime

# Load analytics queries
loader.load_sql("sql/analytics.sql")

# Run daily sales report
sales_query = loader.get_sql("daily_sales")
config = SqliteConfig()
spec = SQLSpec()
spec.add_config(config)
with spec.provide_session(config) as session:
    # Schema matching the columns the analytics queries reference
    session.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER, created_at DATE, total_amount REAL)")
    session.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, category TEXT)")
    session.execute(
        "CREATE TABLE order_items (id INTEGER PRIMARY KEY, order_id INTEGER, product_id INTEGER, "
        "quantity INTEGER, unit_price REAL)"
    )

    # Insert sample data
    session.execute("""
        INSERT INTO orders (id, user_id, created_at, total_amount) VALUES
        (1, 1, '2025-01-05', 150.00),
        (2, 1, '2025-01-15', 200.00),
        (3, 2, '2025-01-20', 250.00)
    """)
    session.execute("INSERT INTO products (id, name, category) VALUES (101, 'Widget', 'tools'), (102, 'Gadget', 'tools')")
    session.execute("""
        INSERT INTO order_items (id, order_id, product_id, quantity, unit_price) VALUES
        (1, 1, 101, 2, 25.00),
        (2, 2, 102, 3, 40.00),
        (3, 3, 101, 1, 25.00)
    """)
    # daily_sales is marked postgres, but DATE() also works on SQLite for this demo
    sales = session.execute(sales_query, start_date=datetime.date(2025, 1, 1), end_date=datetime.date(2025, 2, 1)).data

    # Top products
    products_query = loader.get_sql("top_products")
    top_products = session.execute(products_query, start_date=datetime.date(2025, 1, 1), limit=10).data

Example 3: Multi-Database Setup

Using SQLFileLoader with multiple database dialects
# Different SQL files for different databases
loader = SQLFileLoader()
loader.load_sql("sql/postgres/", "sql/sqlite/", "sql/shared/")

# Each query carries the dialect declared in its source file
pg_query = loader.get_sql("upsert_user")  # Uses Postgres ON CONFLICT
sqlite_query = loader.get_sql("get_user")  # Uses shared query

import asyncio

from sqlspec import SQLSpec
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.sqlite import SqliteConfig

spec = SQLSpec()
postgres_config = AsyncpgConfig(
    connection_config={
        # Replace with your own connection details
        "user": "app",
        "password": "secret",
        "host": "localhost",
        "port": 5432,
        "database": "app_db",
    }
)
sqlite_config = SqliteConfig()

# Execute on the appropriate database (asyncpg sessions are async)
async def upsert_on_postgres() -> None:
    async with spec.provide_session(postgres_config) as pg_session:
        await pg_session.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")
        await pg_session.execute(
            "INSERT INTO users (id, name, email) VALUES (1, 'old_name', 'old@example.com')"
        )
        await pg_session.execute(pg_query, id=1, name="new_name", email="new@example.com")

asyncio.run(upsert_on_postgres())

with spec.provide_session(sqlite_config) as sqlite_session:
    sqlite_session.execute(
        "CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, email TEXT, "
        "is_active BOOLEAN DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )
    sqlite_session.execute("INSERT INTO users (id, username, email) VALUES (1, 'john_doe', 'john@example.com')")
    sqlite_session.execute(sqlite_query, user_id=1)

Best Practices

1. Organize by Domain

sql/
├── users.sql       # User management
├── products.sql    # Product catalog
├── orders.sql      # Order processing
├── analytics.sql   # Reports and analytics
└── admin.sql       # Admin operations

2. Use Clear Query Names

-- Good: Clear intent
-- name: get_active_users_by_role
-- name: create_order_with_items
-- name: delete_expired_sessions

-- Bad: Unclear
-- name: query1
-- name: get_data
-- name: do_stuff

3. Document Complex Queries

-- name: calculate_inventory_levels
-- Calculates current inventory levels, accounting for pending orders,
-- and returns items below their reorder threshold
SELECT
    p.id,
    p.name,
    p.current_stock,
    COALESCE(SUM(oi.quantity), 0) as pending_orders,
    p.current_stock - COALESCE(SUM(oi.quantity), 0) as available,
    p.reorder_point
FROM products p
LEFT JOIN order_items oi ON p.id = oi.product_id
LEFT JOIN orders o ON oi.order_id = o.id
WHERE o.status = 'pending' OR o.status IS NULL
GROUP BY p.id, p.name, p.current_stock, p.reorder_point
HAVING p.current_stock - COALESCE(SUM(oi.quantity), 0) < p.reorder_point;

4. Use Parameters, Not String Formatting

-- Good: Parameterized
-- name: get_user_by_email
SELECT * FROM users WHERE email = :email;

-- Bad: Vulnerable to injection (don't do this)
-- name: get_user_by_email
SELECT * FROM users WHERE email = '{email}';
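
On the Python side, named parameters are bound at execution time rather than interpolated into the SQL string. A sketch, where untrusted_input stands in for any user-supplied value:

# Safe: the driver binds :email as a parameter
query = loader.get_sql("get_user_by_email")
with spec.provide_session(config) as session:
    user = session.execute(query, email=untrusted_input).one()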

5. Version Control SQL Files

  • Commit SQL files to version control

  • Review SQL changes in pull requests

  • Track query performance over time

Troubleshooting

Query Not Found

Handling query not found errors with SQLFileLoader
from sqlspec.exceptions import SQLFileNotFoundError

try:
    loader.get_sql("nonexistent_query")
except SQLFileNotFoundError:
    print("Query not found. Available queries:")
    print(loader.list_queries())

File Load Errors

Handling file load errors with SQLFileLoader
from sqlspec.exceptions import SQLFileNotFoundError, SQLFileParseError

try:
    loader.load_sql("sql/queries.sql")
except SQLFileNotFoundError as e:
    print(f"File not found: {e}")
except SQLFileParseError as e:
    print(f"Failed to parse SQL file: {e}")

Debugging Loaded Queries

Debugging loaded queries with SQLFileLoader
# Print query SQL
query = loader.get_sql("create_user")
print(f"SQL: {query}")
print(f"Parameters: {query.parameters}")

# Inspect file metadata
file_info = loader.get_file_for_query("create_user")
if file_info:
    print(f"Loaded from: {file_info.path}")
