"""SQL factory for creating SQL builders and column expressions.
Provides statement builders (select, insert, update, etc.) and column expressions.
"""
import logging
from collections.abc import Mapping, Sequence
from typing import TYPE_CHECKING, Any, Union, cast
import sqlglot
from sqlglot import exp
from sqlglot.dialects.dialect import DialectType
from sqlglot.errors import ParseError as SQLGlotParseError
from sqlspec.builder._column import Column
from sqlspec.builder._ddl import (
AlterTable,
CommentOn,
CreateIndex,
CreateMaterializedView,
CreateSchema,
CreateTable,
CreateTableAsSelect,
CreateView,
DropIndex,
DropSchema,
DropTable,
DropView,
RenameTable,
Truncate,
)
from sqlspec.builder._delete import Delete
from sqlspec.builder._expression_wrappers import (
AggregateExpression,
ConversionExpression,
FunctionExpression,
MathExpression,
StringExpression,
)
from sqlspec.builder._insert import Insert
from sqlspec.builder._join import JoinBuilder
from sqlspec.builder._merge import Merge
from sqlspec.builder._parsing_utils import extract_expression, to_expression
from sqlspec.builder._select import Case, Select, SubqueryBuilder, WindowFunctionBuilder
from sqlspec.builder._update import Update
from sqlspec.core import SQL
from sqlspec.exceptions import SQLBuilderError
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence
from sqlspec.builder._expression_wrappers import ExpressionWrapper
__all__ = (
"AlterTable",
"Case",
"Column",
"CommentOn",
"CreateIndex",
"CreateMaterializedView",
"CreateSchema",
"CreateTable",
"CreateTableAsSelect",
"CreateView",
"Delete",
"DropIndex",
"DropSchema",
"DropTable",
"DropView",
"Insert",
"Merge",
"RenameTable",
"SQLFactory",
"Select",
"Truncate",
"Update",
"WindowFunctionBuilder",
"build_copy_from_statement",
"build_copy_statement",
"build_copy_to_statement",
"sql",
)
logger = logging.getLogger("sqlspec")
MIN_SQL_LIKE_STRING_LENGTH = 6
MIN_DECODE_ARGS = 2
SQL_STARTERS = {
"SELECT",
"INSERT",
"UPDATE",
"DELETE",
"MERGE",
"WITH",
"CALL",
"DECLARE",
"BEGIN",
"END",
"CREATE",
"DROP",
"ALTER",
"TRUNCATE",
"RENAME",
"GRANT",
"REVOKE",
"SET",
"SHOW",
"USE",
"EXPLAIN",
"OPTIMIZE",
"VACUUM",
"COPY",
}
def _normalize_copy_dialect(dialect: DialectType | None) -> str:
if dialect is None:
return "postgres"
if isinstance(dialect, str):
return dialect
return str(dialect)
def _to_copy_schema(table: str, columns: "Sequence[str] | None") -> exp.Expression:
base = exp.table_(table)
if not columns:
return base
column_nodes = [exp.column(column_name) for column_name in columns]
return exp.Schema(this=base, expressions=column_nodes)
def _build_copy_expression(
*, direction: str, table: str, location: str, columns: "Sequence[str] | None", options: "Mapping[str, Any] | None"
) -> exp.Copy:
copy_args: dict[str, Any] = {"this": _to_copy_schema(table, columns), "files": [exp.Literal.string(location)]}
if direction == "from":
copy_args["kind"] = True
elif direction == "to":
copy_args["kind"] = False
if options:
params: list[exp.CopyParameter] = []
for key, value in options.items():
identifier = exp.Var(this=str(key).upper())
value_expression: exp.Expression
if isinstance(value, bool):
value_expression = exp.Boolean(this=value)
elif value is None:
value_expression = exp.null()
elif isinstance(value, (int, float)):
value_expression = exp.Literal.number(value)
elif isinstance(value, (list, tuple)):
elements = [exp.Literal.string(str(item)) for item in value]
value_expression = exp.Array(expressions=elements)
else:
value_expression = exp.Literal.string(str(value))
params.append(exp.CopyParameter(this=identifier, expression=value_expression))
copy_args["params"] = params
return exp.Copy(**copy_args)
def build_copy_statement(
*,
direction: str,
table: str,
location: str,
columns: "Sequence[str] | None" = None,
options: "Mapping[str, Any] | None" = None,
dialect: DialectType | None = None,
) -> SQL:
expression = _build_copy_expression(
direction=direction, table=table, location=location, columns=columns, options=options
)
rendered = expression.sql(dialect=_normalize_copy_dialect(dialect))
return SQL(rendered)
def build_copy_from_statement(
table: str,
source: str,
*,
columns: "Sequence[str] | None" = None,
options: "Mapping[str, Any] | None" = None,
dialect: DialectType | None = None,
) -> SQL:
return build_copy_statement(
direction="from", table=table, location=source, columns=columns, options=options, dialect=dialect
)
def build_copy_to_statement(
table: str,
target: str,
*,
columns: "Sequence[str] | None" = None,
options: "Mapping[str, Any] | None" = None,
dialect: DialectType | None = None,
) -> SQL:
return build_copy_statement(
direction="to", table=table, location=target, columns=columns, options=options, dialect=dialect
)
[docs]
class SQLFactory:
"""Factory for creating SQL builders and column expressions."""
[docs]
@classmethod
def detect_sql_type(cls, sql: str, dialect: DialectType = None) -> str:
try:
parsed_expr = sqlglot.parse_one(sql, read=dialect)
if parsed_expr and parsed_expr.key:
return parsed_expr.key.upper()
if parsed_expr:
command_type = type(parsed_expr).__name__.upper()
if command_type == "COMMAND" and parsed_expr.this:
return str(parsed_expr.this).upper()
return command_type
except SQLGlotParseError:
logger.debug("Failed to parse SQL for type detection: %s", sql[:100])
except (ValueError, TypeError, AttributeError) as e:
logger.warning("Unexpected error during SQL type detection for '%s...': %s", sql[:50], e)
return "UNKNOWN"
[docs]
def __init__(self, dialect: DialectType = None) -> None:
"""Initialize the SQL factory.
Args:
dialect: Default SQL dialect to use for all builders.
"""
self.dialect = dialect
[docs]
def __call__(self, statement: str, dialect: DialectType = None) -> "Any":
"""Create a SelectBuilder from a SQL string, or SQL object for DML with RETURNING.
Args:
statement: The SQL statement string.
dialect: Optional SQL dialect.
Returns:
SelectBuilder instance for SELECT/WITH statements,
SQL object for DML statements with RETURNING clause.
Raises:
SQLBuilderError: If the SQL is not a SELECT/CTE/DML+RETURNING statement.
"""
try:
parsed_expr = sqlglot.parse_one(statement, read=dialect or self.dialect)
except Exception as e:
msg = f"Failed to parse SQL: {e}"
raise SQLBuilderError(msg) from e
actual_type = type(parsed_expr).__name__.upper()
expr_type_map = {
"SELECT": "SELECT",
"INSERT": "INSERT",
"UPDATE": "UPDATE",
"DELETE": "DELETE",
"MERGE": "MERGE",
"WITH": "WITH",
}
actual_type_str = expr_type_map.get(actual_type, actual_type)
if actual_type_str == "SELECT" or (
actual_type_str == "WITH" and parsed_expr.this and isinstance(parsed_expr.this, exp.Select)
):
builder = Select(dialect=dialect or self.dialect)
builder.set_expression(parsed_expr)
return builder
if actual_type_str in {"INSERT", "UPDATE", "DELETE"} and parsed_expr.args.get("returning") is not None:
return SQL(statement)
msg = (
f"sql(...) only supports SELECT statements or DML statements with RETURNING clause. "
f"Detected type: {actual_type_str}. "
f"Use sql.{actual_type_str.lower()}() instead."
)
raise SQLBuilderError(msg)
[docs]
def select(
self, *columns_or_sql: Union[str, exp.Expression, Column, "SQL", "Case"], dialect: DialectType = None
) -> "Select":
builder_dialect = dialect or self.dialect
if len(columns_or_sql) == 1 and isinstance(columns_or_sql[0], str):
sql_candidate = columns_or_sql[0].strip()
if self._looks_like_sql(sql_candidate):
detected = self.detect_sql_type(sql_candidate, dialect=builder_dialect)
if detected not in {"SELECT", "WITH"}:
msg = (
f"sql.select() expects a SELECT or WITH statement, got {detected}. "
f"Use sql.{detected.lower()}() if a dedicated builder exists, or ensure the SQL is SELECT/WITH."
)
raise SQLBuilderError(msg)
select_builder = Select(dialect=builder_dialect)
return self._populate_select_from_sql(select_builder, sql_candidate)
select_builder = Select(dialect=builder_dialect)
if columns_or_sql:
select_builder.select(*columns_or_sql)
return select_builder
[docs]
def insert(self, table_or_sql: str | None = None, dialect: DialectType = None) -> "Insert":
builder_dialect = dialect or self.dialect
builder = Insert(dialect=builder_dialect)
if table_or_sql:
if self._looks_like_sql(table_or_sql):
detected = self.detect_sql_type(table_or_sql, dialect=builder_dialect)
if detected not in {"INSERT", "SELECT"}:
msg = (
f"sql.insert() expects INSERT or SELECT (for insert-from-select), got {detected}. "
f"Use sql.{detected.lower()}() if a dedicated builder exists, "
f"or ensure the SQL is INSERT/SELECT."
)
raise SQLBuilderError(msg)
return self._populate_insert_from_sql(builder, table_or_sql)
return builder.into(table_or_sql)
return builder
[docs]
def update(self, table_or_sql: str | None = None, dialect: DialectType = None) -> "Update":
builder_dialect = dialect or self.dialect
builder = Update(dialect=builder_dialect)
if table_or_sql:
if self._looks_like_sql(table_or_sql):
detected = self.detect_sql_type(table_or_sql, dialect=builder_dialect)
if detected != "UPDATE":
msg = (
f"sql.update() expects UPDATE statement, got {detected}. "
f"Use sql.{detected.lower()}() if a dedicated builder exists."
)
raise SQLBuilderError(msg)
return self._populate_update_from_sql(builder, table_or_sql)
return builder.table(table_or_sql)
return builder
[docs]
def delete(self, table_or_sql: str | None = None, dialect: DialectType = None) -> "Delete":
builder_dialect = dialect or self.dialect
builder = Delete(dialect=builder_dialect)
if table_or_sql and self._looks_like_sql(table_or_sql):
detected = self.detect_sql_type(table_or_sql, dialect=builder_dialect)
if detected != "DELETE":
msg = (
f"sql.delete() expects DELETE statement, got {detected}. "
f"Use sql.{detected.lower()}() if a dedicated builder exists."
)
raise SQLBuilderError(msg)
return self._populate_delete_from_sql(builder, table_or_sql)
return builder
[docs]
def merge(self, table_or_sql: str | None = None, dialect: DialectType = None) -> "Merge":
builder_dialect = dialect or self.dialect
builder = Merge(dialect=builder_dialect)
if table_or_sql:
if self._looks_like_sql(table_or_sql):
detected = self.detect_sql_type(table_or_sql, dialect=builder_dialect)
if detected != "MERGE":
msg = (
f"sql.merge() expects MERGE statement, got {detected}. "
f"Use sql.{detected.lower()}() if a dedicated builder exists."
)
raise SQLBuilderError(msg)
return self._populate_merge_from_sql(builder, table_or_sql)
return builder.into(table_or_sql)
return builder
@property
def merge_(self) -> "Merge":
"""Create a new MERGE builder (property shorthand).
Property that returns a new Merge builder instance using the factory's
default dialect. Cleaner syntax alternative to merge() method.
Examples:
query = sql.merge_.into("products").using(data, alias="src")
query = sql.merge_.into("products", alias="t").on("t.id = src.id")
Returns:
New Merge builder instance
"""
return Merge(dialect=self.dialect)
[docs]
def upsert(self, table: str, dialect: DialectType = None) -> "Merge | Insert":
"""Create an upsert builder (MERGE or INSERT ON CONFLICT).
Automatically selects the appropriate builder based on database dialect:
- PostgreSQL 15+, Oracle, BigQuery: Returns MERGE builder
- SQLite, DuckDB, MySQL: Returns INSERT builder with ON CONFLICT support
Args:
table: Target table name
dialect: Optional SQL dialect (uses factory default if not provided)
Returns:
MERGE builder for supported databases, INSERT builder otherwise
Examples:
PostgreSQL/Oracle/BigQuery (uses MERGE):
upsert_query = (
sql.upsert("products", dialect="postgres")
.using([{"id": 1, "name": "Product 1"}], alias="src")
.on("t.id = src.id")
.when_matched_then_update(name="src.name")
.when_not_matched_then_insert(id="src.id", name="src.name")
)
SQLite/DuckDB/MySQL (uses INSERT ON CONFLICT):
upsert_query = (
sql.upsert("products", dialect="sqlite")
.values(id=1, name="Product 1")
.on_conflict("id")
.do_update(name="EXCLUDED.name")
)
"""
builder_dialect = dialect or self.dialect
dialect_str = str(builder_dialect).lower() if builder_dialect else None
merge_supported = {"postgres", "postgresql", "oracle", "bigquery"}
if dialect_str in merge_supported:
return self.merge(table, dialect=builder_dialect)
return self.insert(table, dialect=builder_dialect)
[docs]
def create_table(self, table_name: str, dialect: DialectType = None) -> "CreateTable":
"""Create a CREATE TABLE builder.
Args:
table_name: Name of the table to create
dialect: Optional SQL dialect
Returns:
CreateTable builder instance
"""
return CreateTable(table_name, dialect=dialect or self.dialect)
[docs]
def create_table_as_select(self, dialect: DialectType = None) -> "CreateTableAsSelect":
"""Create a CREATE TABLE AS SELECT builder.
Args:
dialect: Optional SQL dialect
Returns:
CreateTableAsSelect builder instance
"""
return CreateTableAsSelect(dialect=dialect or self.dialect)
[docs]
def create_view(self, view_name: str, dialect: DialectType = None) -> "CreateView":
"""Create a CREATE VIEW builder.
Args:
view_name: Name of the view to create
dialect: Optional SQL dialect
Returns:
CreateView builder instance
"""
return CreateView(view_name, dialect=dialect or self.dialect)
[docs]
def create_materialized_view(self, view_name: str, dialect: DialectType = None) -> "CreateMaterializedView":
"""Create a CREATE MATERIALIZED VIEW builder.
Args:
view_name: Name of the materialized view to create
dialect: Optional SQL dialect
Returns:
CreateMaterializedView builder instance
"""
return CreateMaterializedView(view_name, dialect=dialect or self.dialect)
[docs]
def create_index(self, index_name: str, dialect: DialectType = None) -> "CreateIndex":
"""Create a CREATE INDEX builder.
Args:
index_name: Name of the index to create
dialect: Optional SQL dialect
Returns:
CreateIndex builder instance
"""
return CreateIndex(index_name, dialect=dialect or self.dialect)
[docs]
def create_schema(self, schema_name: str, dialect: DialectType = None) -> "CreateSchema":
"""Create a CREATE SCHEMA builder.
Args:
schema_name: Name of the schema to create
dialect: Optional SQL dialect
Returns:
CreateSchema builder instance
"""
return CreateSchema(schema_name, dialect=dialect or self.dialect)
[docs]
def drop_table(self, table_name: str, dialect: DialectType = None) -> "DropTable":
"""Create a DROP TABLE builder.
Args:
table_name: Name of the table to drop
dialect: Optional SQL dialect
Returns:
DropTable builder instance
"""
return DropTable(table_name, dialect=dialect or self.dialect)
[docs]
def drop_view(self, view_name: str, dialect: DialectType = None) -> "DropView":
"""Create a DROP VIEW builder.
Args:
view_name: Name of the view to drop
dialect: Optional SQL dialect
Returns:
DropView builder instance
"""
return DropView(view_name, dialect=dialect or self.dialect)
[docs]
def drop_index(self, index_name: str, dialect: DialectType = None) -> "DropIndex":
"""Create a DROP INDEX builder.
Args:
index_name: Name of the index to drop
dialect: Optional SQL dialect
Returns:
DropIndex builder instance
"""
return DropIndex(index_name, dialect=dialect or self.dialect)
[docs]
def drop_schema(self, schema_name: str, dialect: DialectType = None) -> "DropSchema":
"""Create a DROP SCHEMA builder.
Args:
schema_name: Name of the schema to drop
dialect: Optional SQL dialect
Returns:
DropSchema builder instance
"""
return DropSchema(schema_name, dialect=dialect or self.dialect)
[docs]
def alter_table(self, table_name: str, dialect: DialectType = None) -> "AlterTable":
"""Create an ALTER TABLE builder.
Args:
table_name: Name of the table to alter
dialect: Optional SQL dialect
Returns:
AlterTable builder instance
"""
return AlterTable(table_name, dialect=dialect or self.dialect)
[docs]
def rename_table(self, old_name: str, dialect: DialectType = None) -> "RenameTable":
"""Create a RENAME TABLE builder.
Args:
old_name: Current name of the table
dialect: Optional SQL dialect
Returns:
RenameTable builder instance
"""
return RenameTable(old_name, dialect=dialect or self.dialect)
[docs]
def copy_from(
self,
table: str,
source: str,
*,
columns: "Sequence[str] | None" = None,
options: "Mapping[str, Any] | None" = None,
dialect: DialectType | None = None,
) -> SQL:
"""Build a COPY ... FROM statement."""
effective_dialect = dialect or self.dialect
return build_copy_from_statement(table, source, columns=columns, options=options, dialect=effective_dialect)
[docs]
def copy_to(
self,
table: str,
target: str,
*,
columns: "Sequence[str] | None" = None,
options: "Mapping[str, Any] | None" = None,
dialect: DialectType | None = None,
) -> SQL:
"""Build a COPY ... TO statement."""
effective_dialect = dialect or self.dialect
return build_copy_to_statement(table, target, columns=columns, options=options, dialect=effective_dialect)
[docs]
def copy(
self,
table: str,
*,
source: str | None = None,
target: str | None = None,
columns: "Sequence[str] | None" = None,
options: "Mapping[str, Any] | None" = None,
dialect: DialectType | None = None,
) -> SQL:
"""Build a COPY statement, inferring direction from provided arguments."""
if (source is None and target is None) or (source is not None and target is not None):
msg = "Provide either 'source' or 'target' (but not both) to sql.copy()."
raise SQLBuilderError(msg)
if source is not None:
return self.copy_from(table, source, columns=columns, options=options, dialect=dialect)
target_value = cast("str", target)
return self.copy_to(table, target_value, columns=columns, options=options, dialect=dialect)
@staticmethod
def _looks_like_sql(candidate: str, expected_type: str | None = None) -> bool:
"""Determine if a string looks like SQL.
Args:
candidate: String to check
expected_type: Expected SQL statement type (SELECT, INSERT, etc.)
Returns:
True if the string appears to be SQL
"""
if not candidate or len(candidate.strip()) < MIN_SQL_LIKE_STRING_LENGTH:
return False
candidate_upper = candidate.strip().upper()
if expected_type:
return candidate_upper.startswith(expected_type.upper())
if any(candidate_upper.startswith(starter) for starter in SQL_STARTERS):
return " " in candidate
return False
def _populate_insert_from_sql(self, builder: "Insert", sql_string: str) -> "Insert":
"""Parse SQL string and populate INSERT builder using SQLGlot directly."""
try:
parsed_expr: exp.Expression = exp.maybe_parse(sql_string, dialect=self.dialect)
if isinstance(parsed_expr, exp.Insert):
builder.set_expression(parsed_expr)
return builder
if isinstance(parsed_expr, exp.Select):
logger.info("Detected SELECT statement for INSERT - may need target table specification")
return builder
logger.warning("Cannot create INSERT from %s statement", type(parsed_expr).__name__)
except Exception as e:
logger.warning("Failed to parse INSERT SQL, falling back to traditional mode: %s", e)
return builder
def _populate_select_from_sql(self, builder: "Select", sql_string: str) -> "Select":
"""Parse SQL string and populate SELECT builder using SQLGlot directly."""
try:
parsed_expr: exp.Expression = exp.maybe_parse(sql_string, dialect=self.dialect)
if isinstance(parsed_expr, exp.Select):
builder.set_expression(parsed_expr)
return builder
logger.warning("Cannot create SELECT from %s statement", type(parsed_expr).__name__)
except Exception as e:
logger.warning("Failed to parse SELECT SQL, falling back to traditional mode: %s", e)
return builder
def _populate_update_from_sql(self, builder: "Update", sql_string: str) -> "Update":
"""Parse SQL string and populate UPDATE builder using SQLGlot directly."""
try:
parsed_expr: exp.Expression = exp.maybe_parse(sql_string, dialect=self.dialect)
if isinstance(parsed_expr, exp.Update):
builder.set_expression(parsed_expr)
return builder
logger.warning("Cannot create UPDATE from %s statement", type(parsed_expr).__name__)
except Exception as e:
logger.warning("Failed to parse UPDATE SQL, falling back to traditional mode: %s", e)
return builder
def _populate_delete_from_sql(self, builder: "Delete", sql_string: str) -> "Delete":
"""Parse SQL string and populate DELETE builder using SQLGlot directly."""
try:
parsed_expr: exp.Expression = exp.maybe_parse(sql_string, dialect=self.dialect)
if isinstance(parsed_expr, exp.Delete):
builder.set_expression(parsed_expr)
return builder
logger.warning("Cannot create DELETE from %s statement", type(parsed_expr).__name__)
except Exception as e:
logger.warning("Failed to parse DELETE SQL, falling back to traditional mode: %s", e)
return builder
def _populate_merge_from_sql(self, builder: "Merge", sql_string: str) -> "Merge":
"""Parse SQL string and populate MERGE builder using SQLGlot directly."""
try:
parsed_expr: exp.Expression = exp.maybe_parse(sql_string, dialect=self.dialect)
if isinstance(parsed_expr, exp.Merge):
builder.set_expression(parsed_expr)
return builder
logger.warning("Cannot create MERGE from %s statement", type(parsed_expr).__name__)
except Exception as e:
logger.warning("Failed to parse MERGE SQL, falling back to traditional mode: %s", e)
return builder
[docs]
def column(self, name: str, table: str | None = None) -> Column:
"""Create a column reference.
Args:
name: Column name.
table: Optional table name.
Returns:
Column object that supports method chaining and operator overloading.
"""
return Column(name, table)
@property
def case_(self) -> "Case":
"""Create a CASE expression builder.
Returns:
Case builder instance for CASE expression building.
Example:
```python
case_expr = (
sql.case_.when("x = 1", "one")
.when("x = 2", "two")
.else_("other")
.end()
)
aliased_case = (
sql.case_.when("status = 'active'", 1)
.else_(0)
.as_("is_active")
)
```
"""
return Case()
@property
def row_number_(self) -> "WindowFunctionBuilder":
"""Create a ROW_NUMBER() window function builder."""
return WindowFunctionBuilder("row_number")
@property
def rank_(self) -> "WindowFunctionBuilder":
"""Create a RANK() window function builder."""
return WindowFunctionBuilder("rank")
@property
def dense_rank_(self) -> "WindowFunctionBuilder":
"""Create a DENSE_RANK() window function builder."""
return WindowFunctionBuilder("dense_rank")
@property
def lag_(self) -> "WindowFunctionBuilder":
"""Create a LAG() window function builder."""
return WindowFunctionBuilder("lag")
@property
def lead_(self) -> "WindowFunctionBuilder":
"""Create a LEAD() window function builder."""
return WindowFunctionBuilder("lead")
@property
def exists_(self) -> "SubqueryBuilder":
"""Create an EXISTS subquery builder."""
return SubqueryBuilder("exists")
@property
def in_(self) -> "SubqueryBuilder":
"""Create an IN subquery builder."""
return SubqueryBuilder("in")
@property
def any_(self) -> "SubqueryBuilder":
"""Create an ANY subquery builder."""
return SubqueryBuilder("any")
@property
def all_(self) -> "SubqueryBuilder":
"""Create an ALL subquery builder."""
return SubqueryBuilder("all")
@property
def inner_join_(self) -> "JoinBuilder":
"""Create an INNER JOIN builder."""
return JoinBuilder("inner join")
@property
def left_join_(self) -> "JoinBuilder":
"""Create a LEFT JOIN builder."""
return JoinBuilder("left join")
@property
def right_join_(self) -> "JoinBuilder":
"""Create a RIGHT JOIN builder."""
return JoinBuilder("right join")
@property
def full_join_(self) -> "JoinBuilder":
"""Create a FULL OUTER JOIN builder."""
return JoinBuilder("full join")
@property
def cross_join_(self) -> "JoinBuilder":
"""Create a CROSS JOIN builder."""
return JoinBuilder("cross join")
@property
def lateral_join_(self) -> "JoinBuilder":
"""Create a LATERAL JOIN builder.
Returns:
JoinBuilder configured for LATERAL JOIN
Example:
```python
query = (
sql.select("u.name", "arr.value")
.from_("users u")
.join(sql.lateral_join_("UNNEST(u.tags)").on("true"))
)
```
"""
return JoinBuilder("lateral join", lateral=True)
@property
def left_lateral_join_(self) -> "JoinBuilder":
"""Create a LEFT LATERAL JOIN builder.
Returns:
JoinBuilder configured for LEFT LATERAL JOIN
"""
return JoinBuilder("left join", lateral=True)
@property
def cross_lateral_join_(self) -> "JoinBuilder":
"""Create a CROSS LATERAL JOIN builder.
Returns:
JoinBuilder configured for CROSS LATERAL JOIN
"""
return JoinBuilder("cross join", lateral=True)
[docs]
def __getattr__(self, name: str) -> "Column":
"""Dynamically create column references.
Args:
name: Column name.
Returns:
Column object for the given name.
Note:
Special SQL constructs like case_, row_number_, etc. are
handled as properties for type safety.
"""
return Column(name)
[docs]
@staticmethod
def raw(sql_fragment: str, **parameters: Any) -> "exp.Expression | SQL":
"""Create a raw SQL expression from a string fragment with optional parameters.
Args:
sql_fragment: Raw SQL string to parse into an expression.
**parameters: Named parameters for parameter binding.
Returns:
SQLGlot expression from the parsed SQL fragment (if no parameters).
SQL statement object (if parameters provided).
Raises:
SQLBuilderError: If the SQL fragment cannot be parsed.
Example:
```python
expr = sql.raw("COALESCE(name, 'Unknown')")
stmt = sql.raw(
"LOWER(name) LIKE LOWER(:pattern)", pattern=f"%{query}%"
)
expr = sql.raw(
"price BETWEEN :min_price AND :max_price",
min_price=100,
max_price=500,
)
query = sql.select(
"name",
sql.raw(
"ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC)"
),
).from_("employees")
```
"""
if not parameters:
try:
parsed: exp.Expression = exp.maybe_parse(sql_fragment)
except Exception as e:
msg = f"Failed to parse raw SQL fragment '{sql_fragment}': {e}"
raise SQLBuilderError(msg) from e
return parsed
return SQL(sql_fragment, parameters)
[docs]
def count(
self, column: Union[str, exp.Expression, "ExpressionWrapper", "Case", "Column"] = "*", distinct: bool = False
) -> AggregateExpression:
"""Create a COUNT expression.
Args:
column: Column to count (default "*").
distinct: Whether to use COUNT DISTINCT.
Returns:
COUNT expression.
"""
if isinstance(column, str) and column == "*":
expr = exp.Count(this=exp.Star(), distinct=distinct)
else:
col_expr = extract_expression(column)
expr = exp.Count(this=col_expr, distinct=distinct)
return AggregateExpression(expr)
[docs]
def count_distinct(self, column: Union[str, exp.Expression, "ExpressionWrapper", "Case"]) -> AggregateExpression:
"""Create a COUNT(DISTINCT column) expression.
Args:
column: Column to count distinct values.
Returns:
COUNT DISTINCT expression.
"""
return self.count(column, distinct=True)
[docs]
@staticmethod
def sum(
column: Union[str, exp.Expression, "ExpressionWrapper", "Case"], distinct: bool = False
) -> AggregateExpression:
"""Create a SUM expression.
Args:
column: Column to sum.
distinct: Whether to use SUM DISTINCT.
Returns:
SUM expression.
"""
col_expr = extract_expression(column)
return AggregateExpression(exp.Sum(this=col_expr, distinct=distinct))
[docs]
@staticmethod
def avg(column: Union[str, exp.Expression, "ExpressionWrapper", "Case"]) -> AggregateExpression:
"""Create an AVG expression.
Args:
column: Column to average.
Returns:
AVG expression.
"""
col_expr = extract_expression(column)
return AggregateExpression(exp.Avg(this=col_expr))
[docs]
@staticmethod
def max(column: Union[str, exp.Expression, "ExpressionWrapper", "Case"]) -> AggregateExpression:
"""Create a MAX expression.
Args:
column: Column to find maximum.
Returns:
MAX expression.
"""
col_expr = extract_expression(column)
return AggregateExpression(exp.Max(this=col_expr))
[docs]
@staticmethod
def min(column: Union[str, exp.Expression, "ExpressionWrapper", "Case"]) -> AggregateExpression:
"""Create a MIN expression.
Args:
column: Column to find minimum.
Returns:
MIN expression.
"""
col_expr = extract_expression(column)
return AggregateExpression(exp.Min(this=col_expr))
[docs]
@staticmethod
def rollup(*columns: str | exp.Expression) -> FunctionExpression:
"""Create a ROLLUP expression for GROUP BY clauses.
Args:
*columns: Columns to include in the rollup.
Returns:
ROLLUP expression.
Example:
```python
query = (
sql.select("product", "region", sql.sum("sales"))
.from_("sales_data")
.group_by(sql.rollup("product", "region"))
)
```
"""
column_exprs = [exp.column(col) if isinstance(col, str) else col for col in columns]
return FunctionExpression(exp.Rollup(expressions=column_exprs))
[docs]
@staticmethod
def cube(*columns: str | exp.Expression) -> FunctionExpression:
"""Create a CUBE expression for GROUP BY clauses.
Args:
*columns: Columns to include in the cube.
Returns:
CUBE expression.
Example:
```python
query = (
sql.select("product", "region", sql.sum("sales"))
.from_("sales_data")
.group_by(sql.cube("product", "region"))
)
```
"""
column_exprs = [exp.column(col) if isinstance(col, str) else col for col in columns]
return FunctionExpression(exp.Cube(expressions=column_exprs))
[docs]
@staticmethod
def grouping_sets(*column_sets: tuple[str, ...] | list[str]) -> FunctionExpression:
"""Create a GROUPING SETS expression for GROUP BY clauses.
Args:
*column_sets: Sets of columns to group by.
Returns:
GROUPING SETS expression.
Example:
```python
query = (
sql.select("product", "region", sql.sum("sales"))
.from_("sales_data")
.group_by(
sql.grouping_sets(("product",), ("region",), ())
)
)
```
"""
set_expressions = []
for column_set in column_sets:
if isinstance(column_set, (tuple, list)):
if len(column_set) == 0:
set_expressions.append(exp.Tuple(expressions=[]))
else:
columns = [exp.column(col) for col in column_set]
set_expressions.append(exp.Tuple(expressions=columns))
else:
set_expressions.append(exp.column(column_set))
return FunctionExpression(exp.GroupingSets(expressions=set_expressions))
[docs]
@staticmethod
def any(values: list[Any] | exp.Expression | str) -> FunctionExpression:
"""Create an ANY expression for use with comparison operators.
Args:
values: Values, expression, or subquery for the ANY clause.
Returns:
ANY expression.
Example:
```python
subquery = sql.select("user_id").from_("active_users")
query = (
sql.select("*")
.from_("users")
.where(sql.id.eq(sql.any(subquery)))
)
```
"""
if isinstance(values, list):
literals = [SQLFactory.to_literal(v) for v in values]
return FunctionExpression(exp.Any(this=exp.Array(expressions=literals)))
if isinstance(values, str):
parsed: exp.Expression = exp.maybe_parse(values)
return FunctionExpression(exp.Any(this=parsed))
return FunctionExpression(exp.Any(this=values))
[docs]
@staticmethod
def not_any_(values: list[Any] | exp.Expression | str) -> FunctionExpression:
"""Create a NOT ANY expression for use with comparison operators.
Args:
values: Values, expression, or subquery for the NOT ANY clause.
Returns:
NOT ANY expression.
Example:
```python
subquery = sql.select("user_id").from_("blocked_users")
query = (
sql.select("*")
.from_("users")
.where(sql.id.neq(sql.not_any(subquery)))
)
```
"""
return SQLFactory.any(values)
[docs]
@staticmethod
def concat(*expressions: str | exp.Expression) -> StringExpression:
"""Create a CONCAT expression.
Args:
*expressions: Expressions to concatenate.
Returns:
CONCAT expression.
"""
exprs = [exp.column(expr) if isinstance(expr, str) else expr for expr in expressions]
return StringExpression(exp.Concat(expressions=exprs))
[docs]
@staticmethod
def upper(column: str | exp.Expression) -> StringExpression:
"""Create an UPPER expression.
Args:
column: Column to convert to uppercase.
Returns:
UPPER expression.
"""
col_expr = exp.column(column) if isinstance(column, str) else column
return StringExpression(exp.Upper(this=col_expr))
[docs]
@staticmethod
def lower(column: str | exp.Expression) -> StringExpression:
"""Create a LOWER expression.
Args:
column: Column to convert to lowercase.
Returns:
LOWER expression.
"""
col_expr = exp.column(column) if isinstance(column, str) else column
return StringExpression(exp.Lower(this=col_expr))
[docs]
@staticmethod
def length(column: str | exp.Expression) -> StringExpression:
"""Create a LENGTH expression.
Args:
column: Column to get length of.
Returns:
LENGTH expression.
"""
col_expr = exp.column(column) if isinstance(column, str) else column
return StringExpression(exp.Length(this=col_expr))
[docs]
@staticmethod
def round(column: str | exp.Expression, decimals: int = 0) -> MathExpression:
"""Create a ROUND expression.
Args:
column: Column to round.
decimals: Number of decimal places.
Returns:
ROUND expression.
"""
col_expr = exp.column(column) if isinstance(column, str) else column
if decimals == 0:
return MathExpression(exp.Round(this=col_expr))
return MathExpression(exp.Round(this=col_expr, expression=exp.Literal.number(decimals)))
[docs]
@staticmethod
def to_literal(value: Any) -> FunctionExpression:
"""Convert a Python value to a SQLGlot literal expression.
Uses SQLGlot's built-in exp.convert() function for literal creation.
Handles all Python primitive types:
- None -> exp.Null (renders as NULL)
- bool -> exp.Boolean (renders as TRUE/FALSE or 1/0 based on dialect)
- int/float -> exp.Literal with is_number=True
- str -> exp.Literal with is_string=True
- exp.Expression -> returned as-is (passthrough)
Args:
value: Python value or SQLGlot expression to convert.
Returns:
SQLGlot expression representing the literal value.
"""
if isinstance(value, exp.Expression):
return FunctionExpression(value)
return FunctionExpression(exp.convert(value))
[docs]
@staticmethod
def decode(column: str | exp.Expression, *args: str | exp.Expression | Any) -> FunctionExpression:
"""Create a DECODE expression (Oracle-style conditional logic).
DECODE compares column to each search value and returns the corresponding result.
If no match is found, returns the default value (if provided) or NULL.
Args:
column: Column to compare.
*args: Alternating search values and results, with optional default at the end.
Format: search1, result1, search2, result2, ..., [default]
Raises:
ValueError: If fewer than two search/result pairs are provided.
Returns:
CASE expression equivalent to DECODE.
Example:
```python
sql.decode(
"status", "A", "Active", "I", "Inactive", "Unknown"
)
```
"""
col_expr = exp.column(column) if isinstance(column, str) else column
if len(args) < MIN_DECODE_ARGS:
msg = "DECODE requires at least one search/result pair"
raise ValueError(msg)
conditions = []
default = None
for i in range(0, len(args) - 1, 2):
if i + 1 >= len(args):
default = to_expression(args[i])
break
search_val = args[i]
result_val = args[i + 1]
search_expr = to_expression(search_val)
result_expr = to_expression(result_val)
condition = exp.EQ(this=col_expr, expression=search_expr)
conditions.append(exp.If(this=condition, true=result_expr))
return FunctionExpression(exp.Case(ifs=conditions, default=default))
[docs]
@staticmethod
def cast(column: str | exp.Expression, data_type: str) -> ConversionExpression:
"""Create a CAST expression for type conversion.
Args:
column: Column or expression to cast.
data_type: Target data type (e.g., 'INT', 'VARCHAR(100)', 'DECIMAL(10,2)').
Returns:
CAST expression.
"""
col_expr = exp.column(column) if isinstance(column, str) else column
return ConversionExpression(exp.Cast(this=col_expr, to=exp.DataType.build(data_type)))
[docs]
@staticmethod
def coalesce(*expressions: str | exp.Expression) -> ConversionExpression:
"""Create a COALESCE expression.
Args:
*expressions: Expressions to coalesce.
Returns:
COALESCE expression.
"""
exprs = [exp.column(expr) if isinstance(expr, str) else expr for expr in expressions]
return ConversionExpression(exp.Coalesce(expressions=exprs))
[docs]
@staticmethod
def nvl(column: str | exp.Expression, substitute_value: str | exp.Expression | Any) -> ConversionExpression:
"""Create an NVL (Oracle-style) expression using COALESCE.
Args:
column: Column to check for NULL.
substitute_value: Value to use if column is NULL.
Returns:
COALESCE expression equivalent to NVL.
"""
col_expr = exp.column(column) if isinstance(column, str) else column
sub_expr = to_expression(substitute_value)
return ConversionExpression(exp.Coalesce(expressions=[col_expr, sub_expr]))
[docs]
@staticmethod
def nvl2(
column: str | exp.Expression,
value_if_not_null: str | exp.Expression | Any,
value_if_null: str | exp.Expression | Any,
) -> ConversionExpression:
"""Create an NVL2 (Oracle-style) expression using CASE.
NVL2 returns value_if_not_null if column is not NULL,
otherwise returns value_if_null.
Args:
column: Column to check for NULL.
value_if_not_null: Value to use if column is NOT NULL.
value_if_null: Value to use if column is NULL.
Returns:
CASE expression equivalent to NVL2.
Example:
```python
sql.nvl2("salary", "Has Salary", "No Salary")
```
"""
col_expr = exp.column(column) if isinstance(column, str) else column
not_null_expr = to_expression(value_if_not_null)
null_expr = to_expression(value_if_null)
is_null = exp.Is(this=col_expr, expression=exp.Null())
condition = exp.Not(this=is_null)
when_clause = exp.If(this=condition, true=not_null_expr)
return ConversionExpression(exp.Case(ifs=[when_clause], default=null_expr))
[docs]
@staticmethod
def bulk_insert(table_name: str, column_count: int, placeholder_style: str = "?") -> FunctionExpression:
"""Create bulk INSERT expression for executemany operations.
For bulk loading operations like CSV ingestion where
an INSERT expression with placeholders for executemany() is needed.
Args:
table_name: Name of the table to insert into
column_count: Number of columns (for placeholder generation)
placeholder_style: Placeholder style ("?" for SQLite/PostgreSQL, "%s" for MySQL, ":1" for Oracle)
Returns:
INSERT expression with placeholders for bulk operations
Example:
```python
from sqlspec import sql
insert_expr = sql.bulk_insert("my_table", 3)
insert_expr = sql.bulk_insert(
"my_table", 3, placeholder_style="%s"
)
insert_expr = sql.bulk_insert(
"my_table", 3, placeholder_style=":1"
)
```
"""
return FunctionExpression(
exp.Insert(
this=exp.Table(this=exp.to_identifier(table_name)),
expression=exp.Values(
expressions=[
exp.Tuple(expressions=[exp.Placeholder(this=placeholder_style) for _ in range(column_count)])
]
),
)
)
[docs]
def truncate(self, table_name: str) -> "Truncate":
"""Create a TRUNCATE TABLE builder.
Args:
table_name: Name of the table to truncate
Returns:
TruncateTable builder instance
Example:
```python
from sqlspec import sql
truncate_sql = sql.truncate_table("my_table").build().sql
truncate_sql = (
sql.truncate_table("my_table")
.cascade()
.restart_identity()
.build()
.sql
)
```
"""
return Truncate(table_name, dialect=self.dialect)
[docs]
@staticmethod
def case() -> "Case":
"""Create a CASE expression builder.
Returns:
CaseExpressionBuilder for building CASE expressions.
"""
return Case()
[docs]
def row_number(
self,
partition_by: str | list[str] | exp.Expression | None = None,
order_by: str | list[str] | exp.Expression | None = None,
) -> FunctionExpression:
"""Create a ROW_NUMBER() window function.
Args:
partition_by: Columns to partition by.
order_by: Columns to order by.
Returns:
ROW_NUMBER window function expression.
"""
return self._create_window_function("ROW_NUMBER", [], partition_by, order_by)
[docs]
def rank(
self,
partition_by: str | list[str] | exp.Expression | None = None,
order_by: str | list[str] | exp.Expression | None = None,
) -> FunctionExpression:
"""Create a RANK() window function.
Args:
partition_by: Columns to partition by.
order_by: Columns to order by.
Returns:
RANK window function expression.
"""
return self._create_window_function("RANK", [], partition_by, order_by)
[docs]
def dense_rank(
self,
partition_by: str | list[str] | exp.Expression | None = None,
order_by: str | list[str] | exp.Expression | None = None,
) -> FunctionExpression:
"""Create a DENSE_RANK() window function.
Args:
partition_by: Columns to partition by.
order_by: Columns to order by.
Returns:
DENSE_RANK window function expression.
"""
return self._create_window_function("DENSE_RANK", [], partition_by, order_by)
@staticmethod
def _create_window_function(
func_name: str,
func_args: list[exp.Expression],
partition_by: str | list[str] | exp.Expression | None = None,
order_by: str | list[str] | exp.Expression | None = None,
) -> FunctionExpression:
"""Helper to create window function expressions.
Args:
func_name: Name of the window function.
func_args: Arguments to the function.
partition_by: Columns to partition by.
order_by: Columns to order by.
Returns:
Window function expression.
"""
func_expr = exp.Anonymous(this=func_name, expressions=func_args)
over_args: dict[str, Any] = {}
if partition_by:
if isinstance(partition_by, str):
over_args["partition_by"] = [exp.column(partition_by)]
elif isinstance(partition_by, list):
over_args["partition_by"] = [exp.column(col) for col in partition_by]
elif isinstance(partition_by, exp.Expression):
over_args["partition_by"] = [partition_by]
if order_by:
if isinstance(order_by, str):
over_args["order"] = exp.Order(expressions=[exp.column(order_by).asc()])
elif isinstance(order_by, list):
over_args["order"] = exp.Order(expressions=[exp.column(col).asc() for col in order_by])
elif isinstance(order_by, exp.Expression):
over_args["order"] = exp.Order(expressions=[order_by])
return FunctionExpression(exp.Window(this=func_expr, **over_args))
sql = SQLFactory()