"""DDL statement builders.
Provides builders for DDL operations including CREATE, DROP, ALTER,
TRUNCATE, and other schema manipulation statements.
"""
from typing import TYPE_CHECKING, Any, Union
from sqlglot import exp
from typing_extensions import Self
from sqlspec.builder._base import BuiltQuery, QueryBuilder
from sqlspec.builder._select import Select
from sqlspec.core import SQL, SQLResult
from sqlspec.utils.type_guards import has_sqlglot_expression, has_with_method
if TYPE_CHECKING:
from sqlglot.dialects.dialect import DialectType
from sqlspec.builder._column import ColumnExpression
from sqlspec.core import StatementConfig
__all__ = (
"AlterOperation",
"AlterTable",
"ColumnDefinition",
"CommentOn",
"ConstraintDefinition",
"CreateIndex",
"CreateMaterializedView",
"CreateSchema",
"CreateTable",
"CreateTableAsSelect",
"CreateView",
"DDLBuilder",
"DropIndex",
"DropMaterializedView",
"DropSchema",
"DropTable",
"DropView",
"RenameTable",
"Truncate",
)
CONSTRAINT_TYPE_PRIMARY_KEY = "PRIMARY KEY"
CONSTRAINT_TYPE_FOREIGN_KEY = "FOREIGN KEY"
CONSTRAINT_TYPE_UNIQUE = "UNIQUE"
CONSTRAINT_TYPE_CHECK = "CHECK"
FOREIGN_KEY_ACTION_CASCADE = "CASCADE"
FOREIGN_KEY_ACTION_SET_NULL = "SET NULL"
FOREIGN_KEY_ACTION_SET_DEFAULT = "SET DEFAULT"
FOREIGN_KEY_ACTION_RESTRICT = "RESTRICT"
FOREIGN_KEY_ACTION_NO_ACTION = "NO ACTION"
VALID_FOREIGN_KEY_ACTIONS = {
FOREIGN_KEY_ACTION_CASCADE,
FOREIGN_KEY_ACTION_SET_NULL,
FOREIGN_KEY_ACTION_SET_DEFAULT,
FOREIGN_KEY_ACTION_RESTRICT,
FOREIGN_KEY_ACTION_NO_ACTION,
None,
}
VALID_CONSTRAINT_TYPES = {
CONSTRAINT_TYPE_PRIMARY_KEY,
CONSTRAINT_TYPE_FOREIGN_KEY,
CONSTRAINT_TYPE_UNIQUE,
CONSTRAINT_TYPE_CHECK,
}
CURRENT_TIMESTAMP_KEYWORD = "CURRENT_TIMESTAMP"
CURRENT_DATE_KEYWORD = "CURRENT_DATE"
CURRENT_TIME_KEYWORD = "CURRENT_TIME"
def build_column_expression(col: "ColumnDefinition") -> "exp.Expression":
"""Build SQLGlot expression for a column definition."""
col_def = exp.ColumnDef(this=exp.to_identifier(col.name), kind=exp.DataType.build(col.dtype))
constraints: list[exp.ColumnConstraint] = []
if col.not_null:
constraints.append(exp.ColumnConstraint(kind=exp.NotNullColumnConstraint()))
if col.primary_key:
constraints.append(exp.ColumnConstraint(kind=exp.PrimaryKeyColumnConstraint()))
if col.unique:
constraints.append(exp.ColumnConstraint(kind=exp.UniqueColumnConstraint()))
if col.default is not None:
default_expr: exp.Expression | None = None
if isinstance(col.default, str):
default_upper = col.default.upper()
if default_upper == CURRENT_TIMESTAMP_KEYWORD:
default_expr = exp.CurrentTimestamp()
elif default_upper == CURRENT_DATE_KEYWORD:
default_expr = exp.CurrentDate()
elif default_upper == CURRENT_TIME_KEYWORD:
default_expr = exp.CurrentTime()
elif "(" in col.default:
default_expr = exp.maybe_parse(col.default)
else:
default_expr = exp.convert(col.default)
else:
default_expr = exp.convert(col.default)
constraints.append(exp.ColumnConstraint(kind=exp.DefaultColumnConstraint(this=default_expr)))
if col.check:
constraints.append(exp.ColumnConstraint(kind=exp.Check(this=exp.maybe_parse(col.check))))
if col.comment:
constraints.append(exp.ColumnConstraint(kind=exp.CommentColumnConstraint(this=exp.convert(col.comment))))
if col.generated:
constraints.append(
exp.ColumnConstraint(kind=exp.GeneratedAsIdentityColumnConstraint(this=exp.maybe_parse(col.generated)))
)
if col.collate:
constraints.append(exp.ColumnConstraint(kind=exp.CollateColumnConstraint(this=exp.to_identifier(col.collate))))
if constraints:
col_def.set("constraints", constraints)
return col_def
def build_constraint_expression(constraint: "ConstraintDefinition") -> "exp.Expression | None":
"""Build SQLGlot expression for a table constraint."""
if constraint.constraint_type == CONSTRAINT_TYPE_PRIMARY_KEY:
pk_constraint = exp.PrimaryKey(expressions=[exp.to_identifier(col) for col in constraint.columns])
if constraint.name:
return exp.Constraint(this=exp.to_identifier(constraint.name), expression=pk_constraint)
return pk_constraint
if constraint.constraint_type == CONSTRAINT_TYPE_FOREIGN_KEY:
fk_constraint = exp.ForeignKey(
expressions=[exp.to_identifier(col) for col in constraint.columns],
reference=exp.Reference(
this=exp.to_table(constraint.references_table) if constraint.references_table else None,
expressions=[exp.to_identifier(col) for col in constraint.references_columns],
on_delete=constraint.on_delete,
on_update=constraint.on_update,
),
)
if constraint.name:
return exp.Constraint(this=exp.to_identifier(constraint.name), expression=fk_constraint)
return fk_constraint
if constraint.constraint_type == CONSTRAINT_TYPE_UNIQUE:
unique_constraint = exp.UniqueKeyProperty(expressions=[exp.to_identifier(col) for col in constraint.columns])
if constraint.name:
return exp.Constraint(this=exp.to_identifier(constraint.name), expression=unique_constraint)
return unique_constraint
if constraint.constraint_type == CONSTRAINT_TYPE_CHECK:
check_expr = exp.Check(this=exp.maybe_parse(constraint.condition) if constraint.condition else None)
if constraint.name:
return exp.Constraint(this=exp.to_identifier(constraint.name), expression=check_expr)
return check_expr
return None
[docs]
class DDLBuilder(QueryBuilder):
"""Base class for DDL builders (CREATE, DROP, ALTER, etc)."""
__slots__ = ()
[docs]
def __init__(self, dialect: "DialectType" = None) -> None:
super().__init__(dialect=dialect)
self._expression: exp.Expression | None = None
def _create_base_expression(self) -> exp.Expression:
msg = "Subclasses must implement _create_base_expression."
raise NotImplementedError(msg)
@property
def _expected_result_type(self) -> "type[SQLResult]":
return SQLResult
[docs]
def build(self, dialect: "DialectType" = None) -> "BuiltQuery":
if self._expression is None:
self._expression = self._create_base_expression()
return super().build(dialect=dialect)
[docs]
def to_statement(self, config: "StatementConfig | None" = None) -> "SQL":
return super().to_statement(config=config)
[docs]
class ColumnDefinition:
"""Column definition for CREATE TABLE."""
__slots__ = (
"auto_increment",
"check",
"collate",
"comment",
"default",
"dtype",
"generated",
"name",
"not_null",
"primary_key",
"unique",
)
[docs]
def __init__(
self,
name: str,
dtype: str,
default: "Any | None" = None,
not_null: bool = False,
primary_key: bool = False,
unique: bool = False,
auto_increment: bool = False,
comment: "str | None" = None,
check: "str | None" = None,
generated: "str | None" = None,
collate: "str | None" = None,
) -> None:
self.name = name
self.dtype = dtype
self.default = default
self.not_null = not_null
self.primary_key = primary_key
self.unique = unique
self.auto_increment = auto_increment
self.comment = comment
self.check = check
self.generated = generated
self.collate = collate
[docs]
class ConstraintDefinition:
"""Constraint definition for CREATE TABLE."""
__slots__ = (
"columns",
"condition",
"constraint_type",
"deferrable",
"initially_deferred",
"name",
"on_delete",
"on_update",
"references_columns",
"references_table",
)
[docs]
def __init__(
self,
constraint_type: str,
name: "str | None" = None,
columns: "list[str] | None" = None,
references_table: "str | None" = None,
references_columns: "list[str] | None" = None,
condition: "str | None" = None,
on_delete: "str | None" = None,
on_update: "str | None" = None,
deferrable: bool = False,
initially_deferred: bool = False,
) -> None:
self.constraint_type = constraint_type
self.name = name
self.columns = columns or []
self.references_table = references_table
self.references_columns = references_columns or []
self.condition = condition
self.on_delete = on_delete
self.on_update = on_update
self.deferrable = deferrable
self.initially_deferred = initially_deferred
[docs]
class CreateTable(DDLBuilder):
"""Builder for CREATE TABLE statements with columns and constraints.
Example:
builder = (
CreateTable("users")
.column("id", "SERIAL", primary_key=True)
.column("email", "VARCHAR(255)", not_null=True, unique=True)
.column("created_at", "TIMESTAMP", default="CURRENT_TIMESTAMP")
.foreign_key_constraint("org_id", "organizations", "id")
)
sql = builder.build().sql
"""
__slots__ = (
"_columns",
"_constraints",
"_if_not_exists",
"_like_table",
"_partition_by",
"_schema",
"_table_name",
"_table_options",
"_tablespace",
"_temporary",
)
[docs]
def __init__(self, table_name: str, dialect: "DialectType" = None) -> None:
super().__init__(dialect=dialect)
self._table_name = table_name
self._if_not_exists = False
self._temporary = False
self._columns: list[ColumnDefinition] = []
self._constraints: list[ConstraintDefinition] = []
self._table_options: dict[str, Any] = {}
self._schema: str | None = None
self._tablespace: str | None = None
self._like_table: str | None = None
self._partition_by: str | None = None
[docs]
def in_schema(self, schema_name: str) -> "Self":
"""Set the schema for the table."""
self._schema = schema_name
return self
[docs]
def if_not_exists(self) -> "Self":
"""Add IF NOT EXISTS clause."""
self._if_not_exists = True
return self
[docs]
def temporary(self) -> "Self":
"""Create a temporary table."""
self._temporary = True
return self
[docs]
def like(self, source_table: str) -> "Self":
"""Create table LIKE another table."""
self._like_table = source_table
return self
[docs]
def tablespace(self, name: str) -> "Self":
"""Set tablespace for the table."""
self._tablespace = name
return self
[docs]
def partition_by(self, partition_spec: str) -> "Self":
"""Set partitioning specification."""
self._partition_by = partition_spec
return self
@property
def columns(self) -> "list[ColumnDefinition]":
"""Get the list of column definitions for this table.
Returns:
List of ColumnDefinition objects.
"""
return self._columns
[docs]
def column(
self,
name: str,
dtype: str,
default: "Any | None" = None,
not_null: bool = False,
primary_key: bool = False,
unique: bool = False,
auto_increment: bool = False,
comment: "str | None" = None,
check: "str | None" = None,
generated: "str | None" = None,
collate: "str | None" = None,
) -> "Self":
"""Add a column definition to the table."""
if not name:
self._raise_sql_builder_error("Column name must be a non-empty string")
if not dtype:
self._raise_sql_builder_error("Column type must be a non-empty string")
if any(col.name == name for col in self._columns):
self._raise_sql_builder_error(f"Column '{name}' already defined")
column_def = ColumnDefinition(
name=name,
dtype=dtype,
default=default,
not_null=not_null,
primary_key=primary_key,
unique=unique,
auto_increment=auto_increment,
comment=comment,
check=check,
generated=generated,
collate=collate,
)
self._columns.append(column_def)
if primary_key and not self._has_primary_key_constraint():
self.primary_key_constraint([name])
return self
[docs]
def primary_key_constraint(self, columns: "str | list[str]", name: "str | None" = None) -> "Self":
"""Add a primary key constraint."""
col_list = [columns] if isinstance(columns, str) else list(columns)
if not col_list:
self._raise_sql_builder_error("Primary key must include at least one column")
existing_pk = self._find_primary_key_constraint()
if existing_pk:
for col in col_list:
if col not in existing_pk.columns:
existing_pk.columns.append(col)
else:
constraint = ConstraintDefinition(constraint_type=CONSTRAINT_TYPE_PRIMARY_KEY, name=name, columns=col_list)
self._constraints.append(constraint)
return self
[docs]
def foreign_key_constraint(
self,
columns: "str | list[str]",
references_table: str,
references_columns: "str | list[str]",
name: "str | None" = None,
on_delete: "str | None" = None,
on_update: "str | None" = None,
deferrable: bool = False,
initially_deferred: bool = False,
) -> "Self":
"""Add a foreign key constraint."""
col_list = [columns] if isinstance(columns, str) else list(columns)
ref_col_list = [references_columns] if isinstance(references_columns, str) else list(references_columns)
if len(col_list) != len(ref_col_list):
self._raise_sql_builder_error("Foreign key columns and referenced columns must have same length")
self._validate_foreign_key_action(on_delete, "ON DELETE")
self._validate_foreign_key_action(on_update, "ON UPDATE")
constraint = ConstraintDefinition(
constraint_type=CONSTRAINT_TYPE_FOREIGN_KEY,
name=name,
columns=col_list,
references_table=references_table,
references_columns=ref_col_list,
on_delete=on_delete.upper() if on_delete else None,
on_update=on_update.upper() if on_update else None,
deferrable=deferrable,
initially_deferred=initially_deferred,
)
self._constraints.append(constraint)
return self
[docs]
def unique_constraint(self, columns: "str | list[str]", name: "str | None" = None) -> "Self":
"""Add a unique constraint."""
col_list = [columns] if isinstance(columns, str) else list(columns)
if not col_list:
self._raise_sql_builder_error("Unique constraint must include at least one column")
constraint = ConstraintDefinition(constraint_type=CONSTRAINT_TYPE_UNIQUE, name=name, columns=col_list)
self._constraints.append(constraint)
return self
[docs]
def check_constraint(self, condition: Union[str, "ColumnExpression"], name: "str | None" = None) -> "Self":
"""Add a check constraint."""
if not condition:
self._raise_sql_builder_error("Check constraint must have a condition")
condition_str: str
if has_sqlglot_expression(condition):
sqlglot_expr = condition.sqlglot_expression
condition_str = sqlglot_expr.sql(dialect=self.dialect) if sqlglot_expr else str(condition)
else:
condition_str = str(condition)
constraint = ConstraintDefinition(constraint_type=CONSTRAINT_TYPE_CHECK, name=name, condition=condition_str)
self._constraints.append(constraint)
return self
[docs]
def engine(self, engine_name: str) -> "Self":
"""Set storage engine (MySQL/MariaDB)."""
self._table_options["engine"] = engine_name
return self
[docs]
def charset(self, charset_name: str) -> "Self":
"""Set character set."""
self._table_options["charset"] = charset_name
return self
[docs]
def collate(self, collation: str) -> "Self":
"""Set table collation."""
self._table_options["collate"] = collation
return self
[docs]
def with_option(self, key: str, value: "Any") -> "Self":
"""Add custom table option."""
self._table_options[key] = value
return self
def _create_base_expression(self) -> "exp.Expression":
"""Create the SQLGlot expression for CREATE TABLE."""
if not self._columns and not self._like_table:
self._raise_sql_builder_error("Table must have at least one column or use LIKE clause")
column_defs: list[exp.Expression] = []
for col in self._columns:
col_expr = build_column_expression(col)
column_defs.append(col_expr)
for constraint in self._constraints:
if self._is_redundant_single_column_primary_key(constraint):
continue
constraint_expr = build_constraint_expression(constraint)
if constraint_expr:
column_defs.append(constraint_expr)
props: list[exp.Property] = []
if self._table_options.get("engine"):
props.append(
exp.Property(
this=exp.to_identifier("ENGINE"), value=exp.to_identifier(self._table_options.get("engine"))
)
)
if self._tablespace:
props.append(exp.Property(this=exp.to_identifier("TABLESPACE"), value=exp.to_identifier(self._tablespace)))
if self._partition_by:
props.append(exp.Property(this=exp.to_identifier("PARTITION BY"), value=exp.convert(self._partition_by)))
for key, value in self._table_options.items():
if key != "engine":
props.append(exp.Property(this=exp.to_identifier(key.upper()), value=exp.convert(value)))
properties_node = exp.Properties(expressions=props) if props else None
if self._schema:
table_identifier = exp.Table(this=exp.to_identifier(self._table_name), db=exp.to_identifier(self._schema))
else:
table_identifier = exp.Table(this=exp.to_identifier(self._table_name))
schema_expr = exp.Schema(this=table_identifier, expressions=column_defs)
like_expr = None
if self._like_table:
like_expr = exp.to_table(self._like_table)
return exp.Create(
kind="TABLE",
this=schema_expr,
exists=self._if_not_exists,
temporary=self._temporary,
properties=properties_node,
like=like_expr,
)
def _has_primary_key_constraint(self) -> bool:
"""Check if table already has a primary key constraint."""
return any(c.constraint_type == CONSTRAINT_TYPE_PRIMARY_KEY for c in self._constraints)
def _find_primary_key_constraint(self) -> "ConstraintDefinition | None":
"""Find existing primary key constraint."""
return next((c for c in self._constraints if c.constraint_type == CONSTRAINT_TYPE_PRIMARY_KEY), None)
def _validate_foreign_key_action(self, action: "str | None", action_type: str) -> None:
"""Validate foreign key action (ON DELETE or ON UPDATE)."""
if action and action.upper() not in VALID_FOREIGN_KEY_ACTIONS:
self._raise_sql_builder_error(f"Invalid {action_type} action: {action}")
def _is_redundant_single_column_primary_key(self, constraint: "ConstraintDefinition") -> bool:
"""Check if constraint is a redundant single-column primary key."""
if constraint.constraint_type != CONSTRAINT_TYPE_PRIMARY_KEY or len(constraint.columns) != 1:
return False
col_name = constraint.columns[0]
return any(c.name == col_name and c.primary_key for c in self._columns)
[docs]
class DropTable(DDLBuilder):
"""Builder for DROP TABLE [IF EXISTS] ... [CASCADE|RESTRICT]."""
__slots__ = ("_cascade", "_if_exists", "_table_name")
[docs]
def __init__(self, table_name: str, dialect: "DialectType" = None) -> None:
"""Initialize DROP TABLE with table name.
Args:
table_name: Name of the table to drop
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._table_name = table_name
self._if_exists = False
self._cascade: bool | None = None
def table(self, name: str) -> Self:
self._table_name = name
return self
def if_exists(self) -> Self:
self._if_exists = True
return self
def cascade(self) -> Self:
self._cascade = True
return self
def restrict(self) -> Self:
self._cascade = False
return self
def _create_base_expression(self) -> exp.Expression:
if not self._table_name:
self._raise_sql_builder_error("Table name must be set for DROP TABLE.")
return exp.Drop(
kind="TABLE", this=exp.to_table(self._table_name), exists=self._if_exists, cascade=self._cascade
)
[docs]
class DropIndex(DDLBuilder):
"""Builder for DROP INDEX [IF EXISTS] ... [ON table] [CASCADE|RESTRICT]."""
__slots__ = ("_cascade", "_if_exists", "_index_name", "_table_name")
[docs]
def __init__(self, index_name: str, dialect: "DialectType" = None) -> None:
"""Initialize DROP INDEX with index name.
Args:
index_name: Name of the index to drop
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._index_name = index_name
self._table_name: str | None = None
self._if_exists = False
self._cascade: bool | None = None
def name(self, index_name: str) -> Self:
self._index_name = index_name
return self
def on_table(self, table_name: str) -> Self:
self._table_name = table_name
return self
def if_exists(self) -> Self:
self._if_exists = True
return self
def cascade(self) -> Self:
self._cascade = True
return self
def restrict(self) -> Self:
self._cascade = False
return self
def _create_base_expression(self) -> exp.Expression:
if not self._index_name:
self._raise_sql_builder_error("Index name must be set for DROP INDEX.")
return exp.Drop(
kind="INDEX",
this=exp.to_identifier(self._index_name),
table=exp.to_table(self._table_name) if self._table_name else None,
exists=self._if_exists,
cascade=self._cascade,
)
[docs]
class DropView(DDLBuilder):
"""Builder for DROP VIEW [IF EXISTS] ... [CASCADE|RESTRICT]."""
__slots__ = ("_cascade", "_if_exists", "_view_name")
[docs]
def __init__(self, view_name: str, dialect: "DialectType" = None) -> None:
"""Initialize DROP VIEW with view name.
Args:
view_name: Name of the view to drop
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._view_name = view_name
self._if_exists = False
self._cascade: bool | None = None
def name(self, view_name: str) -> Self:
self._view_name = view_name
return self
def if_exists(self) -> Self:
self._if_exists = True
return self
def cascade(self) -> Self:
self._cascade = True
return self
def restrict(self) -> Self:
self._cascade = False
return self
def _create_base_expression(self) -> exp.Expression:
if not self._view_name:
self._raise_sql_builder_error("View name must be set for DROP VIEW.")
return exp.Drop(
kind="VIEW", this=exp.to_identifier(self._view_name), exists=self._if_exists, cascade=self._cascade
)
[docs]
class DropSchema(DDLBuilder):
"""Builder for DROP SCHEMA [IF EXISTS] ... [CASCADE|RESTRICT]."""
__slots__ = ("_cascade", "_if_exists", "_schema_name")
[docs]
def __init__(self, schema_name: str, dialect: "DialectType" = None) -> None:
"""Initialize DROP SCHEMA with schema name.
Args:
schema_name: Name of the schema to drop
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._schema_name = schema_name
self._if_exists = False
self._cascade: bool | None = None
def name(self, schema_name: str) -> Self:
self._schema_name = schema_name
return self
def if_exists(self) -> Self:
self._if_exists = True
return self
def cascade(self) -> Self:
self._cascade = True
return self
def restrict(self) -> Self:
self._cascade = False
return self
def _create_base_expression(self) -> exp.Expression:
if not self._schema_name:
self._raise_sql_builder_error("Schema name must be set for DROP SCHEMA.")
return exp.Drop(
kind="SCHEMA", this=exp.to_identifier(self._schema_name), exists=self._if_exists, cascade=self._cascade
)
[docs]
class DropMaterializedView(DDLBuilder):
"""Builder for DROP MATERIALIZED VIEW [IF EXISTS] ... [CASCADE|RESTRICT]."""
__slots__ = ("_cascade", "_if_exists", "_view_name")
[docs]
def __init__(self, view_name: str, dialect: "DialectType" = None) -> None:
"""Initialize DROP MATERIALIZED VIEW with view name.
Args:
view_name: Name of the materialized view to drop
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._view_name = view_name
self._if_exists = False
self._cascade: bool | None = None
[docs]
def name(self, view_name: str) -> Self:
"""Set the materialized view name."""
self._view_name = view_name
return self
[docs]
def if_exists(self) -> Self:
"""Add IF EXISTS clause."""
self._if_exists = True
return self
[docs]
def cascade(self) -> Self:
"""Add CASCADE to drop dependent objects."""
self._cascade = True
return self
[docs]
def restrict(self) -> Self:
"""Add RESTRICT to prevent dropping if dependencies exist."""
self._cascade = False
return self
def _create_base_expression(self) -> exp.Expression:
if not self._view_name:
self._raise_sql_builder_error("View name must be set for DROP MATERIALIZED VIEW.")
return exp.Drop(
kind="MATERIALIZED VIEW",
this=exp.to_identifier(self._view_name),
exists=self._if_exists,
cascade=self._cascade,
)
[docs]
class CreateIndex(DDLBuilder):
"""Builder for CREATE [UNIQUE] INDEX [IF NOT EXISTS] ... ON ... (...)."""
__slots__ = ("_columns", "_if_not_exists", "_index_name", "_table_name", "_unique", "_using", "_where")
[docs]
def __init__(self, index_name: str, dialect: "DialectType" = None) -> None:
"""Initialize CREATE INDEX with index name.
Args:
index_name: Name of the index to create
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._index_name = index_name
self._table_name: str | None = None
self._columns: list[str | exp.Ordered | exp.Expression] = []
self._unique = False
self._if_not_exists = False
self._using: str | None = None
self._where: str | exp.Expression | None = None
def name(self, index_name: str) -> Self:
self._index_name = index_name
return self
def on_table(self, table_name: str) -> Self:
self._table_name = table_name
return self
def columns(self, *cols: str | exp.Ordered | exp.Expression) -> Self:
self._columns.extend(cols)
return self
def expressions(self, *exprs: str | exp.Expression) -> Self:
self._columns.extend(exprs)
return self
def unique(self) -> Self:
self._unique = True
return self
def if_not_exists(self) -> Self:
self._if_not_exists = True
return self
def using(self, method: str) -> Self:
self._using = method
return self
def where(self, condition: str | exp.Expression) -> Self:
self._where = condition
return self
def _create_base_expression(self) -> exp.Expression:
"""Build the CREATE INDEX expression used by this builder.
Columns are turned into raw expressions (not ``Ordered``) to preserve natural NULL ordering,
string ``where`` clauses become expressions, and the final ``exp.Index`` is wrapped in an ``exp.Create`` with the configured flags.
"""
if not self._index_name or not self._table_name:
self._raise_sql_builder_error("Index name and table name must be set for CREATE INDEX.")
cols: list[exp.Expression] = []
for col in self._columns:
if isinstance(col, str):
cols.append(exp.column(col))
else:
cols.append(col)
where_expr = None
if self._where:
where_expr = exp.condition(self._where) if isinstance(self._where, str) else self._where
index_params = exp.IndexParameters(columns=cols) if cols else None
index_expr = exp.Index(
this=exp.to_identifier(self._index_name), table=exp.to_table(self._table_name), params=index_params
)
if where_expr:
index_expr.set("where", where_expr)
return exp.Create(kind="INDEX", this=index_expr, unique=self._unique, exists=self._if_not_exists)
[docs]
class Truncate(DDLBuilder):
"""Builder for TRUNCATE TABLE ... [CASCADE|RESTRICT] [RESTART IDENTITY|CONTINUE IDENTITY]."""
__slots__ = ("_cascade", "_identity", "_table_name")
[docs]
def __init__(self, table_name: str, dialect: "DialectType" = None) -> None:
"""Initialize TRUNCATE with table name.
Args:
table_name: Name of the table to truncate
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._table_name = table_name
self._cascade: bool | None = None
self._identity: str | None = None
def table(self, name: str) -> Self:
self._table_name = name
return self
def cascade(self) -> Self:
self._cascade = True
return self
def restrict(self) -> Self:
self._cascade = False
return self
def restart_identity(self) -> Self:
self._identity = "RESTART"
return self
def continue_identity(self) -> Self:
self._identity = "CONTINUE"
return self
def _create_base_expression(self) -> exp.Expression:
if not self._table_name:
self._raise_sql_builder_error("Table name must be set for TRUNCATE TABLE.")
identity_expr = exp.Var(this=self._identity) if self._identity else None
return exp.TruncateTable(this=exp.to_table(self._table_name), cascade=self._cascade, identity=identity_expr)
[docs]
class AlterOperation:
"""Represents a single ALTER TABLE operation."""
__slots__ = (
"after_column",
"column_definition",
"column_name",
"constraint_definition",
"constraint_name",
"first",
"new_name",
"new_type",
"operation_type",
"using_expression",
)
[docs]
def __init__(
self,
operation_type: str,
column_name: "str | None" = None,
column_definition: "ColumnDefinition | None" = None,
constraint_name: "str | None" = None,
constraint_definition: "ConstraintDefinition | None" = None,
new_type: "str | None" = None,
new_name: "str | None" = None,
after_column: "str | None" = None,
first: bool = False,
using_expression: "str | None" = None,
) -> None:
self.operation_type = operation_type
self.column_name = column_name
self.column_definition = column_definition
self.constraint_name = constraint_name
self.constraint_definition = constraint_definition
self.new_type = new_type
self.new_name = new_name
self.after_column = after_column
self.first = first
self.using_expression = using_expression
[docs]
class CreateSchema(DDLBuilder):
"""Builder for CREATE SCHEMA [IF NOT EXISTS] schema_name [AUTHORIZATION user_name]."""
__slots__ = ("_authorization", "_if_not_exists", "_schema_name")
[docs]
def __init__(self, schema_name: str, dialect: "DialectType" = None) -> None:
"""Initialize CREATE SCHEMA with schema name.
Args:
schema_name: Name of the schema to create
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._schema_name = schema_name
self._if_not_exists = False
self._authorization: str | None = None
def name(self, schema_name: str) -> Self:
self._schema_name = schema_name
return self
def if_not_exists(self) -> Self:
self._if_not_exists = True
return self
def authorization(self, user_name: str) -> Self:
self._authorization = user_name
return self
def _create_base_expression(self) -> exp.Expression:
if not self._schema_name:
self._raise_sql_builder_error("Schema name must be set for CREATE SCHEMA.")
props: list[exp.Property] = []
if self._authorization:
props.append(
exp.Property(this=exp.to_identifier("AUTHORIZATION"), value=exp.to_identifier(self._authorization))
)
properties_node = exp.Properties(expressions=props) if props else None
return exp.Create(
kind="SCHEMA",
this=exp.to_identifier(self._schema_name),
exists=self._if_not_exists,
properties=properties_node,
)
[docs]
class CreateTableAsSelect(DDLBuilder):
"""Builder for CREATE TABLE [IF NOT EXISTS] ... AS SELECT ... (CTAS).
Example:
builder = (
CreateTableAsSelectBuilder()
.name("my_table")
.if_not_exists()
.columns("id", "name")
.as_select(select_builder)
)
sql = builder.build().sql
Methods:
- name(table_name: str): Set the table name.
- if_not_exists(): Add IF NOT EXISTS.
- columns(*cols: str): Set explicit column list (optional).
- as_select(select_query): Set the SELECT source (SQL, SelectBuilder, or str).
"""
__slots__ = ("_columns", "_if_not_exists", "_select_query", "_table_name")
[docs]
def __init__(self, dialect: "DialectType" = None) -> None:
super().__init__(dialect=dialect)
self._table_name: str | None = None
self._if_not_exists = False
self._columns: list[str] = []
self._select_query: object | None = None
def name(self, table_name: str) -> Self:
self._table_name = table_name
return self
def if_not_exists(self) -> Self:
self._if_not_exists = True
return self
def columns(self, *cols: str) -> Self:
self._columns = list(cols)
return self
def as_select(self, select_query: "str | exp.Expression") -> Self:
self._select_query = select_query
return self
def _create_base_expression(self) -> exp.Expression:
if not self._table_name:
self._raise_sql_builder_error("Table name must be set for CREATE TABLE AS SELECT.")
if self._select_query is None:
self._raise_sql_builder_error("SELECT query must be set for CREATE TABLE AS SELECT.")
select_expr = None
select_parameters = None
if isinstance(self._select_query, SQL):
select_expr = self._select_query.expression
select_parameters = self._select_query.parameters
elif isinstance(self._select_query, Select):
select_expr = self._select_query.get_expression()
select_parameters = self._select_query.parameters
with_ctes = self._select_query.with_ctes
if with_ctes and select_expr and isinstance(select_expr, exp.Select):
for alias, cte in with_ctes.items():
if has_with_method(select_expr):
select_expr = select_expr.with_(cte.this, as_=alias, copy=False)
elif isinstance(self._select_query, str):
select_expr = exp.maybe_parse(self._select_query)
select_parameters = None
else:
self._raise_sql_builder_error("Unsupported type for SELECT query in CTAS.")
if select_expr is None:
self._raise_sql_builder_error("SELECT query must be a valid SELECT expression.")
if select_parameters:
for p_name, p_value in select_parameters.items():
self._parameters[p_name] = p_value
schema_expr = None
if self._columns:
schema_expr = exp.Schema(expressions=[exp.column(c) for c in self._columns])
return exp.Create(
kind="TABLE",
this=exp.to_table(self._table_name),
exists=self._if_not_exists,
expression=select_expr,
schema=schema_expr,
)
[docs]
class CreateMaterializedView(DDLBuilder):
"""Builder for CREATE MATERIALIZED VIEW [IF NOT EXISTS] ... AS SELECT ..."""
__slots__ = (
"_columns",
"_hints",
"_if_not_exists",
"_refresh_mode",
"_select_query",
"_storage_parameters",
"_tablespace",
"_using_index",
"_view_name",
"_with_data",
)
[docs]
def __init__(self, view_name: str, dialect: "DialectType" = None) -> None:
"""Initialize CREATE MATERIALIZED VIEW with view name.
Args:
view_name: Name of the materialized view to create
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._view_name = view_name
self._if_not_exists = False
self._columns: list[str] = []
self._select_query: str | exp.Expression | None = None
self._with_data: bool | None = None
self._refresh_mode: str | None = None
self._storage_parameters: dict[str, Any] = {}
self._tablespace: str | None = None
self._using_index: str | None = None
self._hints: list[str] = []
def name(self, view_name: str) -> Self:
self._view_name = view_name
return self
def if_not_exists(self) -> Self:
self._if_not_exists = True
return self
def columns(self, *cols: str) -> Self:
self._columns = list(cols)
return self
def as_select(self, select_query: "str | exp.Expression") -> Self:
self._select_query = select_query
return self
def with_data(self) -> Self:
self._with_data = True
return self
def no_data(self) -> Self:
self._with_data = False
return self
def refresh_mode(self, mode: str) -> Self:
self._refresh_mode = mode
return self
def storage_parameter(self, key: str, value: Any) -> Self:
self._storage_parameters[key] = value
return self
def tablespace(self, name: str) -> Self:
self._tablespace = name
return self
def using_index(self, index_name: str) -> Self:
self._using_index = index_name
return self
def with_hint(self, hint: str) -> Self:
self._hints.append(hint)
return self
def _create_base_expression(self) -> exp.Expression:
if not self._view_name:
self._raise_sql_builder_error("View name must be set for CREATE MATERIALIZED VIEW.")
if self._select_query is None:
self._raise_sql_builder_error("SELECT query must be set for CREATE MATERIALIZED VIEW.")
select_expr: exp.Expression | None = None
select_parameters: dict[str, Any] | None = None
if isinstance(self._select_query, SQL):
select_expr = self._select_query.expression
select_parameters = self._select_query.parameters
elif isinstance(self._select_query, Select):
select_expr = self._select_query.get_expression()
select_parameters = self._select_query.parameters
elif isinstance(self._select_query, str):
select_expr = exp.maybe_parse(self._select_query)
select_parameters = None
else:
self._raise_sql_builder_error("Unsupported type for SELECT query in materialized view.")
if select_expr is None or not isinstance(select_expr, exp.Select):
self._raise_sql_builder_error("SELECT query must be a valid SELECT expression.")
if select_parameters:
for p_name, p_value in select_parameters.items():
self._parameters[p_name] = p_value
schema_expr = None
if self._columns:
schema_expr = exp.Schema(expressions=[exp.column(c) for c in self._columns])
props: list[exp.Property] = []
if self._refresh_mode:
props.append(exp.Property(this=exp.to_identifier("REFRESH_MODE"), value=exp.convert(self._refresh_mode)))
if self._tablespace:
props.append(exp.Property(this=exp.to_identifier("TABLESPACE"), value=exp.to_identifier(self._tablespace)))
if self._using_index:
props.append(
exp.Property(this=exp.to_identifier("USING_INDEX"), value=exp.to_identifier(self._using_index))
)
for k, v in self._storage_parameters.items():
props.append(exp.Property(this=exp.to_identifier(k), value=exp.convert(str(v))))
if self._with_data is not None:
props.append(exp.Property(this=exp.to_identifier("WITH_DATA" if self._with_data else "NO_DATA")))
props.extend(exp.Property(this=exp.to_identifier("HINT"), value=exp.convert(hint)) for hint in self._hints)
properties_node = exp.Properties(expressions=props) if props else None
return exp.Create(
kind="MATERIALIZED_VIEW",
this=exp.to_identifier(self._view_name),
exists=self._if_not_exists,
expression=select_expr,
schema=schema_expr,
properties=properties_node,
)
[docs]
class CreateView(DDLBuilder):
"""Builder for CREATE VIEW [IF NOT EXISTS] ... AS SELECT ..."""
__slots__ = ("_columns", "_hints", "_if_not_exists", "_select_query", "_view_name")
[docs]
def __init__(self, view_name: str, dialect: "DialectType" = None) -> None:
"""Initialize CREATE VIEW with view name.
Args:
view_name: Name of the view to create
dialect: SQL dialect to use
"""
super().__init__(dialect=dialect)
self._view_name = view_name
self._if_not_exists = False
self._columns: list[str] = []
self._select_query: str | exp.Expression | None = None
self._hints: list[str] = []
def name(self, view_name: str) -> Self:
self._view_name = view_name
return self
def if_not_exists(self) -> Self:
self._if_not_exists = True
return self
def columns(self, *cols: str) -> Self:
self._columns = list(cols)
return self
def as_select(self, select_query: "str | exp.Expression") -> Self:
self._select_query = select_query
return self
def with_hint(self, hint: str) -> Self:
self._hints.append(hint)
return self
def _create_base_expression(self) -> exp.Expression:
if not self._view_name:
self._raise_sql_builder_error("View name must be set for CREATE VIEW.")
if self._select_query is None:
self._raise_sql_builder_error("SELECT query must be set for CREATE VIEW.")
select_expr: exp.Expression | None = None
select_parameters: dict[str, Any] | None = None
if isinstance(self._select_query, SQL):
select_expr = self._select_query.expression
select_parameters = self._select_query.parameters
elif isinstance(self._select_query, Select):
select_expr = self._select_query.get_expression()
select_parameters = self._select_query.parameters
elif isinstance(self._select_query, str):
select_expr = exp.maybe_parse(self._select_query)
select_parameters = None
else:
self._raise_sql_builder_error("Unsupported type for SELECT query in view.")
if select_expr is None or not isinstance(select_expr, exp.Select):
self._raise_sql_builder_error("SELECT query must be a valid SELECT expression.")
if select_parameters:
for p_name, p_value in select_parameters.items():
self._parameters[p_name] = p_value
schema_expr = None
if self._columns:
schema_expr = exp.Schema(expressions=[exp.column(c) for c in self._columns])
props: list[exp.Property] = [
exp.Property(this=exp.to_identifier("HINT"), value=exp.convert(h)) for h in self._hints
]
properties_node = exp.Properties(expressions=props) if props else None
return exp.Create(
kind="VIEW",
this=exp.to_identifier(self._view_name),
exists=self._if_not_exists,
expression=select_expr,
schema=schema_expr,
properties=properties_node,
)
[docs]
class AlterTable(DDLBuilder):
"""Builder for ALTER TABLE operations.
Example:
builder = (
AlterTable("users")
.add_column("email", "VARCHAR(255)", not_null=True)
.drop_column("old_field")
.add_constraint("check_age", "CHECK (age >= 18)")
)
"""
__slots__ = ("_if_exists", "_operations", "_schema", "_table_name")
[docs]
def __init__(self, table_name: str, dialect: "DialectType" = None) -> None:
super().__init__(dialect=dialect)
self._table_name = table_name
self._operations: list[AlterOperation] = []
self._schema: str | None = None
self._if_exists = False
[docs]
def if_exists(self) -> "Self":
"""Add IF EXISTS clause."""
self._if_exists = True
return self
[docs]
def add_column(
self,
name: str,
dtype: str,
default: "Any | None" = None,
not_null: bool = False,
unique: bool = False,
comment: "str | None" = None,
after: "str | None" = None,
first: bool = False,
) -> "Self":
"""Add a new column to the table."""
if not name:
self._raise_sql_builder_error("Column name must be a non-empty string")
if not dtype:
self._raise_sql_builder_error("Column type must be a non-empty string")
column_def = ColumnDefinition(
name=name, dtype=dtype, default=default, not_null=not_null, unique=unique, comment=comment
)
operation = AlterOperation(
operation_type="ADD COLUMN", column_definition=column_def, after_column=after, first=first
)
self._operations.append(operation)
return self
[docs]
def drop_column(self, name: str, cascade: bool = False) -> "Self":
"""Drop a column from the table."""
if not name:
self._raise_sql_builder_error("Column name must be a non-empty string")
operation = AlterOperation(operation_type="DROP COLUMN CASCADE" if cascade else "DROP COLUMN", column_name=name)
self._operations.append(operation)
return self
[docs]
def alter_column_type(self, name: str, new_type: str, using: "str | None" = None) -> "Self":
"""Change the type of an existing column."""
if not name:
self._raise_sql_builder_error("Column name must be a non-empty string")
if not new_type:
self._raise_sql_builder_error("New type must be a non-empty string")
operation = AlterOperation(
operation_type="ALTER COLUMN TYPE", column_name=name, new_type=new_type, using_expression=using
)
self._operations.append(operation)
return self
[docs]
def rename_column(self, old_name: str, new_name: str) -> "Self":
"""Rename a column."""
if not old_name:
self._raise_sql_builder_error("Old column name must be a non-empty string")
if not new_name:
self._raise_sql_builder_error("New column name must be a non-empty string")
operation = AlterOperation(operation_type="RENAME COLUMN", column_name=old_name, new_name=new_name)
self._operations.append(operation)
return self
[docs]
def add_constraint(
self,
constraint_type: str,
columns: "str | list[str] | None" = None,
name: "str | None" = None,
references_table: "str | None" = None,
references_columns: "str | list[str] | None" = None,
condition: "str | ColumnExpression | None" = None,
on_delete: "str | None" = None,
on_update: "str | None" = None,
) -> "Self":
"""Add a constraint to the table.
Args:
constraint_type: Type of constraint ('PRIMARY KEY', 'FOREIGN KEY', 'UNIQUE', 'CHECK')
columns: Column(s) for the constraint (not needed for CHECK)
name: Optional constraint name
references_table: Table referenced by foreign key
references_columns: Columns referenced by foreign key
condition: CHECK constraint condition
on_delete: Foreign key ON DELETE action
on_update: Foreign key ON UPDATE action
"""
if constraint_type.upper() not in VALID_CONSTRAINT_TYPES:
self._raise_sql_builder_error(f"Invalid constraint type: {constraint_type}")
col_list = None
if columns is not None:
col_list = [columns] if isinstance(columns, str) else list(columns)
ref_col_list = None
if references_columns is not None:
ref_col_list = [references_columns] if isinstance(references_columns, str) else list(references_columns)
condition_str: str | None = None
if condition is not None:
if has_sqlglot_expression(condition):
sqlglot_expr = condition.sqlglot_expression
condition_str = sqlglot_expr.sql(dialect=self.dialect) if sqlglot_expr else str(condition)
else:
condition_str = str(condition)
constraint_def = ConstraintDefinition(
constraint_type=constraint_type.upper(),
name=name,
columns=col_list or [],
references_table=references_table,
references_columns=ref_col_list or [],
condition=condition_str,
on_delete=on_delete,
on_update=on_update,
)
operation = AlterOperation(operation_type="ADD CONSTRAINT", constraint_definition=constraint_def)
self._operations.append(operation)
return self
[docs]
def drop_constraint(self, name: str, cascade: bool = False) -> "Self":
"""Drop a constraint from the table."""
if not name:
self._raise_sql_builder_error("Constraint name must be a non-empty string")
operation = AlterOperation(
operation_type="DROP CONSTRAINT CASCADE" if cascade else "DROP CONSTRAINT", constraint_name=name
)
self._operations.append(operation)
return self
[docs]
def set_not_null(self, column: str) -> "Self":
"""Set a column to NOT NULL."""
operation = AlterOperation(operation_type="ALTER COLUMN SET NOT NULL", column_name=column)
self._operations.append(operation)
return self
[docs]
def drop_not_null(self, column: str) -> "Self":
"""Remove NOT NULL constraint from a column."""
operation = AlterOperation(operation_type="ALTER COLUMN DROP NOT NULL", column_name=column)
self._operations.append(operation)
return self
def _create_base_expression(self) -> "exp.Expression":
"""Create the SQLGlot expression for ALTER TABLE."""
if not self._operations:
self._raise_sql_builder_error("At least one operation must be specified for ALTER TABLE")
if self._schema:
table = exp.Table(this=exp.to_identifier(self._table_name), db=exp.to_identifier(self._schema))
else:
table = exp.to_table(self._table_name)
actions: list[exp.Expression] = [self._build_operation_expression(op) for op in self._operations]
return exp.Alter(this=table, kind="TABLE", actions=actions, exists=self._if_exists)
def _build_operation_expression(self, op: "AlterOperation") -> exp.Expression:
"""Build a structured SQLGlot expression for a single alter operation."""
op_type = op.operation_type.upper()
if op_type == "ADD COLUMN":
if not op.column_definition:
self._raise_sql_builder_error("Column definition required for ADD COLUMN")
return build_column_expression(op.column_definition)
if op_type == "DROP COLUMN":
return exp.Drop(this=exp.to_identifier(op.column_name), kind="COLUMN", exists=True)
if op_type == "DROP COLUMN CASCADE":
return exp.Drop(this=exp.to_identifier(op.column_name), kind="COLUMN", cascade=True, exists=True)
if op_type == "ALTER COLUMN TYPE":
if not op.new_type:
self._raise_sql_builder_error("New type required for ALTER COLUMN TYPE")
return exp.AlterColumn(
this=exp.to_identifier(op.column_name),
dtype=exp.DataType.build(op.new_type),
using=exp.maybe_parse(op.using_expression) if op.using_expression else None,
)
if op_type == "RENAME COLUMN":
return exp.RenameColumn(this=exp.to_identifier(op.column_name), to=exp.to_identifier(op.new_name))
if op_type == "ADD CONSTRAINT":
if not op.constraint_definition:
self._raise_sql_builder_error("Constraint definition required for ADD CONSTRAINT")
constraint_expr = build_constraint_expression(op.constraint_definition)
return exp.AddConstraint(this=constraint_expr)
if op_type == "DROP CONSTRAINT":
return exp.Drop(this=exp.to_identifier(op.constraint_name), kind="CONSTRAINT", exists=True)
if op_type == "DROP CONSTRAINT CASCADE":
return exp.Drop(this=exp.to_identifier(op.constraint_name), kind="CONSTRAINT", cascade=True, exists=True)
if op_type == "ALTER COLUMN SET NOT NULL":
return exp.AlterColumn(this=exp.to_identifier(op.column_name), allow_null=False)
if op_type == "ALTER COLUMN DROP NOT NULL":
return exp.AlterColumn(this=exp.to_identifier(op.column_name), drop=True, allow_null=True)
if op_type == "ALTER COLUMN SET DEFAULT":
if not op.column_definition or op.column_definition.default is None:
self._raise_sql_builder_error("Default value required for SET DEFAULT")
default_val = op.column_definition.default
default_expr: exp.Expression | None
if isinstance(default_val, str):
if self._is_sql_function_default(default_val):
default_expr = exp.maybe_parse(default_val)
else:
default_expr = exp.convert(default_val)
elif isinstance(default_val, (int, float)):
default_expr = exp.convert(default_val)
elif default_val is True:
default_expr = exp.true()
elif default_val is False:
default_expr = exp.false()
else:
default_expr = exp.convert(str(default_val))
return exp.AlterColumn(this=exp.to_identifier(op.column_name), default=default_expr)
if op_type == "ALTER COLUMN DROP DEFAULT":
return exp.AlterColumn(this=exp.to_identifier(op.column_name), kind="DROP DEFAULT")
self._raise_sql_builder_error(f"Unknown operation type: {op.operation_type}")
raise AssertionError
def _is_sql_function_default(self, default_val: str) -> bool:
"""Check if default value is a SQL function or expression."""
default_upper = default_val.upper()
return (
default_upper in {CURRENT_TIMESTAMP_KEYWORD, CURRENT_DATE_KEYWORD, CURRENT_TIME_KEYWORD}
or "(" in default_val
)