Query Builder

SQLSpec includes an experimental fluent query builder API for programmatically constructing SQL queries. While raw SQL is recommended for most use cases, the query builder is useful for dynamic query construction.

Warning

The Query Builder API is experimental and subject to significant changes. Use raw SQL for production-critical queries where API stability is required.

Overview

The query builder provides a fluent, chainable API for constructing SQL statements:

Overview
# Build SELECT query
query = (
    sql.select("id", "name", "email").from_("users").where("status = ?").order_by("created_at DESC").limit(10)
)
# Execute with session
session.execute(query, "active")

Why Use the Query Builder?

Benefits

  • Type-safe query construction

  • Reusable query components

  • Dynamic filtering

  • Protection against syntax errors

  • IDE autocomplete support

When to Use

  • Complex dynamic queries with conditional filters

  • Query templates with variable components

  • Programmatic query generation

  • API query builders (search, filtering)

When to Use Raw SQL Instead

  • Static, well-defined queries

  • Complex joins and subqueries

  • Database-specific features

  • Performance-critical queries

  • Queries loaded from SQL files

SELECT Queries

Basic SELECT

Basic SELECT
# Simple select
query = sql.select("*").from_("users")
# SQL: SELECT * FROM users
session.execute(query)

# Specific columns
query = sql.select("id", "name", "email").from_("users")
# SQL: SELECT id, name, email FROM users
session.execute(query)

# With table alias
query = sql.select("u.id", "u.name").from_("users u")
# SQL: SELECT u.id, u.name FROM users u
session.execute(query)

WHERE Clauses

WHERE Clauses
# Simple WHERE
query = sql.select("*").from_("users").where("status = ?")
session.execute(query, "active")

# Multiple conditions (AND)
query = sql.select("*").from_("users").where("status = ?").where("created_at > ?")
# SQL: SELECT * FROM users WHERE status = ? AND created_at > ?
session.execute(query, "active", "2024-01-01")

# OR conditions
query = sql.select("*").from_("users").where("status = ? OR role = ?")
session.execute(query, "active", "admin")

# IN clause
query = sql.select("*").from_("users").where("id IN (?, ?, ?)")
session.execute(query, 1, 2, 3)

Dynamic Filtering

Build queries conditionally based on runtime values:

Dynamic Filtering
def search_users(name: Any | None = None, email: Any | None = None, status: Any | None = None) -> Any:
    query = sql.select("id", "name", "email", "status").from_("users")
    params = []

    if name:
        query = query.where("name LIKE ?")
        params.append(f"%{name}%")

    if email:
        query = query.where("email = ?")
        params.append(email)

    if status:
        query = query.where("status = ?")
        params.append(status)

    return session.execute(query, *params)

# Usage
search_users(name="Alice", status="active")

JOINs

JOINs
# INNER JOIN
query = sql.select("u.id", "u.name", "o.total").from_("users u").join("orders o", "u.id = o.user_id")
session.execute(query)
# SQL: SELECT u.id, u.name, o.total FROM users u
#      INNER JOIN orders o ON u.id = o.user_id

# LEFT JOIN
query = (
    sql
    .select("u.id", "u.name", "COUNT(o.id) as order_count")
    .from_("users u")
    .left_join("orders o", "u.id = o.user_id")
    .group_by("u.id", "u.name")
)
session.execute(query)

# Multiple JOINs
query = (
    sql
    .select("u.name", "o.id", "p.name as product")
    .from_("users u")
    .join("orders o", "u.id = o.user_id")
    .join("order_items oi", "o.id = oi.order_id")
    .join("products p", "oi.product_id = p.id")
)
session.execute(query)

Ordering and Limiting

Ordering and Limiting
# ORDER BY
query = sql.select("*").from_("users").order_by("created_at DESC")
session.execute(query)

# Multiple order columns
query = sql.select("*").from_("users").order_by("status ASC", "created_at DESC")
session.execute(query)

# LIMIT and OFFSET
query = sql.select("*").from_("users").limit(10).offset(20)
session.execute(query)

# Pagination helper
def paginate(page: int = 1, per_page: int = 20) -> Any:
    offset = (page - 1) * per_page
    return sql.select("*").from_("users").order_by("id").limit(per_page).offset(offset)

session.execute(paginate(2, 10))

Aggregations

Aggregations
# COUNT
# Using raw SQL aggregation
raw_count = sql.select("COUNT(*) as total").from_("users")
row_count = session.select_value(raw_count)
print(row_count)

# Using the builder's count helper
counted = sql.select(sql.count().as_("total")).from_("users")
counted_value = session.select_value(counted)
print(counted_value)

# GROUP BY
query = sql.select("status", "COUNT(*) as count").from_("users").group_by("status")
session.execute(query)

# HAVING
query = (
    sql.select("user_id", "COUNT(*) as order_count").from_("orders").group_by("user_id").having("COUNT(*) > ?")
)
session.execute(query, 1)

# Multiple aggregations
query = (
    sql
    .select(sql.column("created_at").as_("date"), "COUNT(*) as orders", "SUM(total) as revenue")
    .from_("orders")
    .group_by("date")
)
session.execute(query)

Subqueries

Subqueries
# Subquery in WHERE
subquery = sql.select("id").from_("orders").where("total > ?")
query = sql.select("*").from_("users").where(f"id IN ({subquery})")
session.execute(query, 100)

# Subquery in FROM
subquery = sql.select("user_id", "COUNT(*) as order_count").from_("orders").group_by("user_id")
query = sql.select("u.name", "o.order_count").from_("users u").join(subquery, "u.id = o.user_id", alias="o")
session.execute(query)

INSERT Queries

Basic INSERT

Basic INSERT
# Single row insert
query = sql.insert("users").columns("name", "email").values("Alice", "[email protected]")
# SQL: INSERT INTO users (name, email) VALUES (:name, :email)

session.execute(query)

Multiple Rows

Multiple Rows
# Multiple value sets
query = (
    sql
    .insert("users")
    .columns("name", "email")
    .values("Alice", "[email protected]")
    .values("Bob", "[email protected]")
    .values("Charlie", "[email protected]")
)

session.execute(query)

INSERT with RETURNING

INSERT with RETURNING
# PostgreSQL RETURNING clause
(sql.insert("users").columns("name", "email").values("?", "?").returning("id", "created_at"))

# SQLite does not support RETURNING, so we skip execution for this example.

UPDATE Queries

Basic UPDATE

Basic UPDATE
# Update with WHERE
query = sql.update("users").set("email", "[email protected]").where("id = 1")
# SQL: UPDATE users SET email = :email WHERE id = 1

session.execute(query)
# print(f"Updated {result.rows_affected} rows")

Multiple Columns

Multiple Columns
# Update multiple columns
from sqlglot import exp

query = (
    sql
    .update("users")
    .set("name", "New Name")
    .set("email", "[email protected]")
    .set("updated_at", exp.CurrentTimestamp())
    .where("id = 1")
)

session.execute(query)

Conditional Updates

Conditional Updates
# Dynamic update builder
def update_user(user_id: Any, **fields: Any) -> Any:
    query = sql.update("users")

    for field, value in fields.items():
        query = query.set(field, value)

    query = query.where(f"id = {user_id}")

    return session.execute(query)

# Usage
update_user(1, name="Alice Updated", email="[email protected]", status="active")

DELETE Queries

Basic DELETE

Basic DELETE
# Delete with WHERE
query = sql.delete().from_("users").where("id = ?")
# SQL: DELETE FROM users WHERE id = ?

session.execute(query, 1)
# print(f"Deleted {result.rows_affected} rows")

Multiple Conditions

Multiple Conditions
# Delete with multiple conditions
query = sql.delete().from_("users").where("status = ?").where("last_login < ?")

session.execute(query, "inactive", datetime.date(2024, 1, 1))

DDL Operations

CREATE TABLE

CREATE TABLE
# Create table
query = (
    sql
    .create_table("users")
    .if_not_exists()
    .column("id", "INTEGER PRIMARY KEY")
    .column("name", "TEXT NOT NULL")
    .column("email", "TEXT UNIQUE NOT NULL")
    .column("created_at", "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
)

session.execute(query)

DROP TABLE

DROP TABLE
# Drop table
query = sql.drop_table("users")

# Drop if exists
query = sql.drop_table("users").if_exists()

session.execute(query)

CREATE INDEX

CREATE INDEX
# Create index
query = sql.create_index("idx_users_email").on_table("users").columns("email").if_not_exists()
session.execute(query)

# Unique index
query = sql.create_index("idx_users_email_unique").on_table("users").columns("email").unique().if_not_exists()
session.execute(query)

Advanced Features

Window Functions

Window Functions
query = sql.select(
    "id", "name", "salary", "ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank"
).from_("employees")
session.execute(query)

CASE Expressions

CASE Expressions
# Use raw SQL for CASE expression for SQLite compatibility
case_expr = "CASE WHEN status = 'active' THEN 'Active User' WHEN status = 'pending' THEN 'Pending Approval' ELSE 'Inactive' END"
query = sql.select("id", "name", f"{case_expr} as status_label").from_("users")
session.execute(query)

Common Table Expressions (CTE)

Common Table Expressions (CTE)
# WITH clause (Common Table Expression)
cte = sql.select("user_id", "COUNT(*) as order_count").from_("orders").group_by("user_id")

query = (
    sql
    .select("users.name", "user_orders.order_count")
    .with_("user_orders", cte)
    .from_("users")
    .join("user_orders", "users.id = user_orders.user_id")
)

session.execute(query)

Query Composition

Reusable Query Components

Reusable Query Components
# Base query
base_query = sql.select("id", "name", "email", "status").from_("users")

# Add filters based on context
def active_users() -> Any:
    return base_query.where("status = 'active'")

def recent_users(days: int = 7) -> Any:
    return base_query.where("created_at >= ?")

# Use in different contexts
session.execute(active_users())
session.execute(recent_users(), datetime.date.today() - datetime.timedelta(days=7))

Query Templates

Query Templates
class UserQueries:
    @staticmethod
    def by_id(user_id: int) -> "Select":
        return sql.select("*").from_("users").where(f"id = {user_id}")

    @staticmethod
    def by_email(email: str) -> "Select":
        return sql.select("*").from_("users").where(f"email = '{email}'")

    @staticmethod
    def search(filters: dict[str, Any]) -> "Select":
        query = sql.select("*").from_("users")

        if "name" in filters:
            query = query.where(f"name LIKE '%{filters['name']}%'")

        if "status" in filters:
            query = query.where(f"status = '{filters['status']}'")

        return query

# Usage
user = session.select_one(UserQueries.by_id(1))
print(user)

query = UserQueries.search({"name": "Alice", "status": "active"})
users = session.select(query)
print(len(users))

Best Practices

1. Use Raw SQL for Static Queries

1. Use Raw SQL for Static Queries
# Prefer this for simple, static queries:
session.execute("SELECT * FROM users WHERE id = ?", 1)

# Over this:
query = sql.select("*").from_("users").where("id = ?")
session.execute(query, 1)

2. Builder for Dynamic Queries

2. Builder for Dynamic Queries
# Good use case: dynamic filtering
def search_products(
    category: Any | None = None, min_price: Any | None = None, in_stock: Any | None = None
) -> Any:
    query = sql.select("*").from_("products")
    params = []

    if category:
        query = query.where("category_id = ?")
        params.append(category)

    if min_price:
        query = query.where("price >= ?")
        params.append(min_price)

    if in_stock:
        query = query.where("stock > 0")

    return session.execute(query, *params)

3. Parameterize User Input

3. Parameterize User Input
# Always use placeholders for user input
search_term = "Alice"  # Example user input
query = sql.select("*").from_("users").where("name LIKE ?")
session.execute(query, f"%{search_term}%")

4. Type Safety with Schema Mapping

4. Type Safety with Schema Mapping
class User(BaseModel):
    id: int
    name: str
    email: str

query = sql.select("id", "name", "email").from_("users")
result = session.execute(query)
result.all(schema_type=User)

5. Test Generated SQL

5. Test Generated SQL
# Check generated SQL during development
query = sql.select("*").from_("users").where("id = ?")
print(query)  # Shows generated SQL

Limitations

The query builder has some limitations:

Complex Subqueries

For very complex subqueries, raw SQL is often clearer:

Complex Subqueries
# This is easier to read as raw SQL:
session.execute("""
    WITH ranked_users AS (
        SELECT id, name,
               ROW_NUMBER() OVER (PARTITION BY region ORDER BY created_at DESC) as rn
        FROM users
    )
    SELECT * FROM ranked_users WHERE rn <= 5
""")

Database-Specific Features

Database-specific syntax may not be supported:

Database-Specific Features
# SQLite JSON functions (use raw SQL)
# Note: SQLite uses json_extract() instead of PostgreSQL's ->> operator
session.execute(
    "SELECT json_extract(data, '$.name') FROM events WHERE json_extract(data, '$.name') = ?", "Alice"
)

Performance

The builder adds minimal overhead, but raw SQL is always fastest for known queries.

Migration from Raw SQL

When migrating from raw SQL to the query builder:

Migration from Raw SQL
# Before: Raw SQL
session.execute(
    """
    SELECT u.id, u.name, COUNT(o.id) as order_count
    FROM users u
    LEFT JOIN orders o ON u.id = o.user_id
    WHERE u.status = ?
    GROUP BY u.id, u.name
    HAVING COUNT(o.id) > ?
    ORDER BY order_count DESC
    LIMIT ?
""",
    "active",
    5,
    10,
)

# After: Query Builder
query = (
    sql
    .select("users.id", "users.name", "COUNT(orders.id) as order_count")
    .from_("users")
    .left_join("orders", "users.id = orders.user_id")
    .where("users.status = 'active'")
    .group_by("users.id", "users.name")
    .having("COUNT(orders.id) > 5")
    .order_by("order_count DESC")
    .limit(10)
)
session.execute(query)

Only migrate queries that benefit from dynamic construction.

Next Steps

See Also