"""Parameter processing pipeline orchestrator."""
from collections import OrderedDict
from collections.abc import Callable, Mapping, Sequence
from typing import Any, cast
from mypy_extensions import mypyc_attr
from sqlspec.core.parameters._alignment import looks_like_execute_many
from sqlspec.core.parameters._converter import ParameterConverter
from sqlspec.core.parameters._types import (
ConvertedParameters,
ParameterInfo,
ParameterPayload,
ParameterProcessingResult,
ParameterProfile,
ParameterStyle,
ParameterStyleConfig,
TypedParameter,
wrap_with_type,
)
from sqlspec.core.parameters._validator import ParameterValidator
from sqlspec.utils.dispatch import TypeDispatcher
__all__ = ("ParameterProcessor", "structural_fingerprint", "value_fingerprint")
# Threshold for sampling execute_many parameters instead of full iteration
_EXECUTE_MANY_SAMPLE_THRESHOLD = 10
# Number of records to sample for type signatures
_EXECUTE_MANY_SAMPLE_SIZE = 3
TypeCoercionFallback = tuple[type, Callable[[Any], Any]]
_TYPE_COERCION_DISPATCHERS: "dict[tuple[TypeCoercionFallback, ...], TypeDispatcher[Callable[[Any], Any]]]" = {}
def _make_cache_key_tuple(
sql: str,
param_fingerprint: Any,
input_style: str,
exec_style: str,
dialect: str | None,
is_many: bool,
wrap_types: bool | None = None,
normalize_for_parsing: bool | None = None,
) -> tuple[Any, ...]:
"""Build the shared SQL and parameter processor cache key tuple."""
if wrap_types is None and normalize_for_parsing is None:
return (sql, param_fingerprint, input_style, exec_style, dialect, is_many)
return (sql, param_fingerprint, input_style, exec_style, dialect, is_many, wrap_types, normalize_for_parsing)
def structural_fingerprint(parameters: "ParameterPayload", is_many: bool = False) -> Any:
"""Return a structural fingerprint for caching parameter payloads.
Returns a hashable tuple representing the structure (keys, types, count).
Avoids string formatting for performance.
Note: Uses Python 3.7+ dict insertion order instead of sorted() for determinism.
This means fingerprints depend on the order keys were inserted, which is typically
consistent within a single codebase.
"""
if parameters is None:
return None
# Fast type dispatch: check concrete types first (2-4x faster than ABC isinstance)
param_type = type(parameters)
# Handle dict (most common Mapping type) - fast path
if param_type is dict:
dict_params = cast("dict[str, Any]", parameters)
if not dict_params:
return ("dict",)
# Use dict insertion order (Python 3.7+ guaranteed) instead of sorted()
# This is O(n) vs O(n log n) and produces consistent fingerprints for
# parameters constructed in the same order (typical usage pattern)
keys = tuple(dict_params.keys())
type_sig = tuple(type(v) for v in dict_params.values())
return ("dict", keys, type_sig)
# Handle list and tuple (most common Sequence types) - fast path
if param_type is list or param_type is tuple:
seq_params = cast("Sequence[Any]", parameters)
if not seq_params:
return ("seq",)
# Optimization: Fast path for single-item sequence (extremely common)
if len(seq_params) == 1:
return ("seq", (type(seq_params[0]),))
if is_many:
return _fingerprint_execute_many(seq_params)
# Single execution with sequence parameters
type_sig = tuple(type(v) for v in seq_params)
return ("seq", type_sig)
# Fallback to ABC checks for custom types (Mapping, Sequence subclasses)
if isinstance(parameters, Mapping):
if not parameters:
return ("dict",)
keys = tuple(parameters.keys())
type_sig = tuple(type(v) for v in parameters.values())
return ("dict", keys, type_sig)
if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes, bytearray)):
if not parameters:
return ("seq",)
if len(parameters) == 1:
return ("seq", (type(parameters[0]),))
if is_many:
return _fingerprint_execute_many(parameters)
type_sig = tuple(type(v) for v in parameters)
return ("seq", type_sig)
# Scalar parameter
return ("scalar", param_type)
def _fingerprint_execute_many(parameters: "Sequence[Any]") -> Any:
"""Generate fingerprint for execute_many parameters.
Extracted to reduce code duplication and allow inlining of the common single-execution path.
"""
param_count = len(parameters)
sample_size = (
min(_EXECUTE_MANY_SAMPLE_SIZE, param_count) if param_count > _EXECUTE_MANY_SAMPLE_THRESHOLD else param_count
)
first = parameters[0]
first_type = type(first)
# Fast type dispatch for first element
if first_type is dict:
keys = tuple(first.keys())
type_sig = tuple(type(v) for v in first.values())
return ("many_dict", keys, type_sig, param_count)
if first_type is list or first_type is tuple:
type_sigs: list[tuple[type, ...]] = []
for i in range(sample_size):
param_item: Any = parameters[i]
type_sigs.append(tuple(type(v) for v in param_item))
return ("many_seq", tuple(type_sigs), param_count)
# Fallback to ABC checks
if isinstance(first, Mapping):
keys = tuple(first.keys())
type_sig = tuple(type(v) for v in first.values())
return ("many_dict", keys, type_sig, param_count)
if isinstance(first, Sequence) and not isinstance(first, (str, bytes)):
type_sigs = []
for i in range(sample_size):
param_item = parameters[i]
type_sigs.append(tuple(type(v) for v in param_item))
return ("many_seq", tuple(type_sigs), param_count)
# Scalar values in sequence for execute_many
type_sig = tuple(type(parameters[i]) for i in range(sample_size))
return ("many_scalar", type_sig, param_count)
def value_fingerprint(parameters: "ParameterPayload") -> Any:
"""Return a value-based fingerprint for parameter payloads.
Unlike structural_fingerprint, this includes actual parameter VALUES in the hash.
Used for static script compilation where SQL has values embedded directly.
Args:
parameters: Original parameter payload supplied by the caller.
Returns:
Hashable representation including parameter values.
"""
if parameters is None:
return None
# Use repr for value-based hashing - includes both structure and values
# Return as tuple to match structural_fingerprint return type (hashable)
return ("values", repr(parameters))
def _type_coercion_fallbacks(
type_coercion_map: "dict[type, Callable[[Any], Any]]",
) -> "tuple[TypeCoercionFallback, ...]":
return tuple(type_coercion_map.items())
def _get_type_coercion_dispatcher(
fallback_items: "tuple[TypeCoercionFallback, ...]",
) -> "TypeDispatcher[Callable[[Any], Any]]":
dispatcher = _TYPE_COERCION_DISPATCHERS.get(fallback_items)
if dispatcher is not None:
return dispatcher
dispatcher = TypeDispatcher["Callable[[Any], Any]"]()
dispatcher.register_all(fallback_items)
_TYPE_COERCION_DISPATCHERS[fallback_items] = dispatcher
return dispatcher
def _resolve_type_coercion(
value: object,
type_coercion_map: "dict[type, Callable[[Any], Any]]",
fallback_items: "tuple[TypeCoercionFallback, ...]",
) -> object:
value_type = type(value)
exact_converter = type_coercion_map.get(value_type)
if exact_converter is not None:
return exact_converter(value)
fallback_converter = _get_type_coercion_dispatcher(fallback_items).get(value)
if fallback_converter is not None:
return fallback_converter(value)
return value
def _coerce_nested_value(
value: object,
type_coercion_map: "dict[type, Callable[[Any], Any]]",
fallback_items: "tuple[TypeCoercionFallback, ...]",
) -> object:
# Fast type dispatch for common types
value_type = type(value)
if value_type is list or value_type is tuple:
seq_value = cast("Sequence[Any]", value)
return [_coerce_parameter_value(item, type_coercion_map, fallback_items) for item in seq_value]
if value_type is dict:
dict_value = cast("dict[Any, Any]", value)
return {key: _coerce_parameter_value(val, type_coercion_map, fallback_items) for key, val in dict_value.items()}
return value
def _coerce_parameter_value(
value: object,
type_coercion_map: "dict[type, Callable[[Any], Any]]",
fallback_items: "tuple[TypeCoercionFallback, ...]",
) -> object:
if value is None:
return value
value_type = type(value)
# Fast path: check TypedParameter by type identity (2-4x faster than isinstance)
if value_type is TypedParameter:
typed_param = cast("TypedParameter", value)
wrapped_value: object = typed_param.value
if wrapped_value is None:
return wrapped_value
coerced = _resolve_type_coercion(wrapped_value, type_coercion_map, fallback_items)
if coerced is wrapped_value:
return wrapped_value
return _coerce_nested_value(coerced, type_coercion_map, fallback_items)
coerced = _resolve_type_coercion(value, type_coercion_map, fallback_items)
if coerced is value:
return value
return _coerce_nested_value(coerced, type_coercion_map, fallback_items)
def _coerce_sequence_preserving_identity(
seq_value: "Sequence[Any]",
type_coercion_map: "dict[type, Callable[[Any], Any]]",
fallback_items: "tuple[TypeCoercionFallback, ...]",
) -> "Sequence[Any] | list[Any]":
updated_seq: list[Any] | None = None
for idx, item in enumerate(seq_value):
coerced_value = _coerce_parameter_value(item, type_coercion_map, fallback_items)
if updated_seq is None:
if coerced_value is item:
continue
updated_seq = list(seq_value[:idx])
updated_seq.append(coerced_value)
if updated_seq is None:
return seq_value
return updated_seq
def _coerce_mapping_preserving_identity(
mapping: "Mapping[Any, Any]",
type_coercion_map: "dict[type, Callable[[Any], Any]]",
fallback_items: "tuple[TypeCoercionFallback, ...]",
) -> "Mapping[Any, Any] | dict[Any, Any]":
updated_mapping: dict[Any, Any] | None = None
for key, val in mapping.items():
coerced_value = _coerce_parameter_value(val, type_coercion_map, fallback_items)
if updated_mapping is None:
if coerced_value is val:
continue
updated_mapping = dict(mapping)
updated_mapping[key] = coerced_value
if updated_mapping is None:
return mapping
return updated_mapping
def _coerce_parameter_set(
param_set: object,
type_coercion_map: "dict[type, Callable[[Any], Any]]",
fallback_items: "tuple[TypeCoercionFallback, ...]",
) -> object:
# Fast type dispatch for common types
param_type = type(param_set)
if param_type is list:
return _coerce_sequence_preserving_identity(cast("list[Any]", param_set), type_coercion_map, fallback_items)
if param_type is tuple:
seq_value = cast("tuple[Any, ...]", param_set)
coerced_seq = _coerce_sequence_preserving_identity(seq_value, type_coercion_map, fallback_items)
if coerced_seq is seq_value:
return seq_value
return tuple(cast("list[Any]", coerced_seq))
if param_type is dict:
return _coerce_mapping_preserving_identity(cast("dict[Any, Any]", param_set), type_coercion_map, fallback_items)
# Fallback to ABC checks for custom types
if isinstance(param_set, Sequence) and not isinstance(param_set, (str, bytes)):
seq_fallback = param_set
coerced_seq = _coerce_sequence_preserving_identity(seq_fallback, type_coercion_map, fallback_items)
if coerced_seq is seq_fallback:
return param_set
return coerced_seq
if isinstance(param_set, Mapping):
coerced_mapping = _coerce_mapping_preserving_identity(param_set, type_coercion_map, fallback_items)
if coerced_mapping is param_set:
return param_set
return coerced_mapping
return _coerce_parameter_value(param_set, type_coercion_map, fallback_items)
def _coerce_parameters_payload(
parameters: "ParameterPayload",
type_coercion_map: "dict[type, Callable[[Any], Any]]",
fallback_items: "tuple[TypeCoercionFallback, ...]",
is_many: bool,
) -> object:
# Fast type dispatch for common types
param_type = type(parameters)
if param_type is list:
seq_params = cast("list[Any]", parameters)
if is_many:
updated_many: list[Any] | None = None
for idx, param_set in enumerate(seq_params):
coerced_set = _coerce_parameter_set(param_set, type_coercion_map, fallback_items)
if updated_many is None:
if coerced_set is param_set:
continue
updated_many = seq_params[:idx]
updated_many.append(coerced_set)
if updated_many is None:
return seq_params
return updated_many
updated_seq: list[Any] | None = None
for idx, item in enumerate(seq_params):
coerced_item = _coerce_parameter_value(item, type_coercion_map, fallback_items)
if updated_seq is None:
if coerced_item is item:
continue
updated_seq = seq_params[:idx]
updated_seq.append(coerced_item)
if updated_seq is None:
return seq_params
return updated_seq
if param_type is tuple:
tuple_params = cast("tuple[Any, ...]", parameters)
if is_many:
return [_coerce_parameter_set(param_set, type_coercion_map, fallback_items) for param_set in tuple_params]
return [_coerce_parameter_value(item, type_coercion_map, fallback_items) for item in tuple_params]
if param_type is dict:
dict_params = cast("dict[Any, Any]", parameters)
updated_mapping: dict[Any, Any] | None = None
for key, val in dict_params.items():
coerced_value = _coerce_parameter_value(val, type_coercion_map, fallback_items)
if updated_mapping is None:
if coerced_value is val:
continue
updated_mapping = dict(dict_params)
updated_mapping[key] = coerced_value
if updated_mapping is None:
return dict_params
return updated_mapping
# Fallback to ABC checks for custom types
if is_many and isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)):
return [_coerce_parameter_set(param_set, type_coercion_map, fallback_items) for param_set in parameters]
if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)):
return [_coerce_parameter_value(item, type_coercion_map, fallback_items) for item in parameters]
if isinstance(parameters, Mapping):
return {key: _coerce_parameter_value(val, type_coercion_map, fallback_items) for key, val in parameters.items()}
return _coerce_parameter_value(parameters, type_coercion_map, fallback_items)
[docs]
@mypyc_attr(allow_interpreted_subclasses=False)
class ParameterProcessor:
"""Parameter processing engine coordinating conversion phases."""
__slots__ = ("_cache", "_cache_hits", "_cache_max_size", "_cache_misses", "_converter", "_validator")
DEFAULT_CACHE_SIZE = 1000
[docs]
def __init__(
self,
*,
converter: "ParameterConverter | None" = None,
validator: "ParameterValidator | None" = None,
cache_max_size: int | None = None,
validator_cache_max_size: int | None = None,
) -> None:
self._cache: OrderedDict[Any, ParameterProcessingResult] = OrderedDict()
if cache_max_size is None:
cache_max_size = self.DEFAULT_CACHE_SIZE
self._cache_max_size = max(cache_max_size, 0)
self._cache_hits = 0
self._cache_misses = 0
if converter is None:
if validator is None:
validator_cache = validator_cache_max_size
if validator_cache is None:
validator_cache = self._cache_max_size
validator = ParameterValidator(cache_max_size=validator_cache)
self._validator = validator
self._converter = ParameterConverter(self._validator)
else:
self._converter = converter
if validator is None:
self._validator = converter.validator
else:
self._validator = validator
self._converter.validator = validator
if validator_cache_max_size is not None and isinstance(self._validator, ParameterValidator):
self._validator.set_cache_max_size(validator_cache_max_size)
[docs]
def clear_cache(self) -> None:
"""Clear cached processing results and reset stats."""
self._cache.clear()
self._cache_hits = 0
self._cache_misses = 0
if isinstance(self._validator, ParameterValidator):
self._validator.clear_cache()
[docs]
def cache_stats(self) -> "dict[str, int]":
"""Return cache statistics for parameter processing."""
stats = {
"hits": self._cache_hits,
"misses": self._cache_misses,
"size": len(self._cache),
"max_size": self._cache_max_size,
}
if isinstance(self._validator, ParameterValidator):
validator_stats = self._validator.cache_stats()
stats["validator_hits"] = validator_stats["hits"]
stats["validator_misses"] = validator_stats["misses"]
stats["validator_size"] = validator_stats["size"]
stats["validator_max_size"] = validator_stats["max_size"]
else:
stats["validator_hits"] = 0
stats["validator_misses"] = 0
stats["validator_size"] = 0
stats["validator_max_size"] = 0
return stats
def _compile_static_script(
self,
sql: str,
parameters: "ParameterPayload",
config: "ParameterStyleConfig",
is_many: bool,
cache_key: Any | None,
input_named_parameters: "tuple[str, ...]",
) -> "ParameterProcessingResult":
coerced_params = parameters
if config.type_coercion_map and parameters:
coerced_params = self._coerce_parameter_types(parameters, config.type_coercion_map, is_many)
static_sql, static_params = self._converter.convert_placeholder_style(
sql, coerced_params, ParameterStyle.STATIC, is_many, strict_named_parameters=config.strict_named_parameters
)
result = ParameterProcessingResult(
static_sql,
static_params,
ParameterProfile.empty(),
sqlglot_sql=static_sql,
input_named_parameters=input_named_parameters,
applied_wrap_types=False,
)
return self._store_cached_result(cache_key, result)
def _select_execution_style(
self, original_styles: "set[ParameterStyle]", config: "ParameterStyleConfig"
) -> "ParameterStyle":
if len(original_styles) == 1 and config.supported_execution_parameter_styles is not None:
original_style = next(iter(original_styles))
if original_style in config.supported_execution_parameter_styles:
return original_style
return config.default_execution_parameter_style or config.default_parameter_style
def _wrap_parameter_types(self, parameters: "ParameterPayload") -> "ConvertedParameters":
# Fast type dispatch for common types
param_type = type(parameters)
if param_type is list:
source = cast("list[Any]", parameters)
wrapped_values: list[Any] | None = None
for idx, value in enumerate(source):
wrapped = wrap_with_type(value)
if wrapped_values is None:
if wrapped is value:
continue
wrapped_values = source[:idx]
wrapped_values.append(wrapped)
if wrapped_values is None:
return cast("ConvertedParameters", parameters)
return wrapped_values
if param_type is tuple:
tuple_source = cast("tuple[Any, ...]", parameters)
wrapped_values = [wrap_with_type(value) for value in tuple_source]
if all(wrapped is value for wrapped, value in zip(wrapped_values, tuple_source, strict=False)):
return cast("ConvertedParameters", parameters)
return wrapped_values
if param_type is dict:
source_mapping = cast("dict[str, Any]", parameters)
wrapped_mapping: dict[str, Any] | None = None
for key, value in source_mapping.items():
wrapped = wrap_with_type(value)
if wrapped_mapping is None:
if wrapped is value:
continue
wrapped_mapping = dict(source_mapping)
wrapped_mapping[key] = wrapped
if wrapped_mapping is None:
return cast("ConvertedParameters", parameters)
return wrapped_mapping
# Fallback to ABC checks for custom types
if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)):
wrapped_values = [wrap_with_type(value) for value in parameters]
if all(wrapped is value for wrapped, value in zip(wrapped_values, parameters, strict=False)):
return cast("ConvertedParameters", parameters)
return wrapped_values
if isinstance(parameters, Mapping):
fallback_mapping: dict[str, Any] | None = None
for key, value in parameters.items():
wrapped = wrap_with_type(value)
if fallback_mapping is None:
if wrapped is value:
continue
fallback_mapping = dict(parameters)
fallback_mapping[key] = wrapped
if fallback_mapping is None:
return cast("ConvertedParameters", parameters)
return fallback_mapping
return None
def _coerce_parameter_types(
self,
parameters: "ParameterPayload",
type_coercion_map: "dict[type, Callable[[Any], Any]]",
is_many: bool = False,
) -> "ConvertedParameters":
fallback_items = _type_coercion_fallbacks(type_coercion_map)
result = _coerce_parameters_payload(parameters, type_coercion_map, fallback_items, is_many)
# Fast type narrowing - _coerce_parameters_payload returns object but produces concrete types
if result is None:
return None
result_type = type(result)
if result_type is dict:
return cast("ConvertedParameters", result)
if result_type is list:
return cast("ConvertedParameters", result)
if result_type is tuple:
return cast("ConvertedParameters", result)
return None
def _store_cached_result(
self, cache_key: Any | None, result: "ParameterProcessingResult"
) -> "ParameterProcessingResult":
if self._cache_max_size <= 0 or cache_key is None:
return result
self._cache[cache_key] = result
self._cache.move_to_end(cache_key)
if len(self._cache) > self._cache_max_size:
self._cache.popitem(last=False)
return result
def _transform_cached_parameters(
self,
parameters: "ParameterPayload",
cached_profile: "ParameterProfile",
config: "ParameterStyleConfig",
*,
input_named_parameters: "tuple[str, ...]",
is_many: bool,
apply_wrap_types: bool,
) -> "ConvertedParameters":
"""Apply parameter transformations for a cache hit.
Uses cached metadata to efficiently transform parameters without re-parsing SQL.
This ensures new parameter values undergo the same transformations as the original
cached request (type wrapping, coercion, named-to-positional mapping).
Args:
parameters: New parameter payload to transform.
cached_profile: Cached ParameterProfile with execution parameter metadata.
config: Parameter style configuration.
input_named_parameters: Cached input named parameter order.
is_many: Whether this is execute_many.
apply_wrap_types: Whether to wrap parameters with type metadata.
Returns:
Transformed parameters matching the cached SQL's placeholder format.
"""
if parameters is None:
return None
processed = cast("ConvertedParameters", parameters)
if apply_wrap_types and processed:
processed = self._wrap_parameter_types(processed)
if config.type_coercion_map and processed:
processed = self._coerce_parameter_types(processed, config.type_coercion_map, is_many)
if processed:
positional_styles = {
ParameterStyle.QMARK.value,
ParameterStyle.NUMERIC.value,
ParameterStyle.POSITIONAL_COLON.value,
ParameterStyle.POSITIONAL_PYFORMAT.value,
}
named_styles = {
ParameterStyle.NAMED_COLON.value,
ParameterStyle.NAMED_AT.value,
ParameterStyle.NAMED_DOLLAR.value,
ParameterStyle.NAMED_PYFORMAT.value,
}
cached_styles = cached_profile.styles
if input_named_parameters and any(style in positional_styles for style in cached_styles):
processed = self._map_named_to_positional(
processed, input_named_parameters, is_many, strict=config.strict_named_parameters
)
elif any(style in named_styles for style in cached_styles):
processed = self._map_positional_to_named(processed, cached_profile, is_many)
return processed
def _map_positional_to_named(
self, parameters: "ConvertedParameters", cached_profile: "ParameterProfile", is_many: bool
) -> "ConvertedParameters":
"""Map a positional sequence to a dict keyed by cached placeholder names."""
cached_param_info = cached_profile.parameters
if not cached_param_info:
return parameters
if is_many and isinstance(parameters, (list, tuple)):
rows = cast("Sequence[Any]", parameters)
mapped_rows: list[Any] = []
for row in rows:
if isinstance(row, Mapping):
mapped_rows.append(row)
continue
if isinstance(row, (list, tuple)) and not isinstance(row, (str, bytes, bytearray)):
mapped_rows.append({
(param.name or f"param_{param.ordinal}"): row[idx]
for idx, param in enumerate(cached_param_info)
if idx < len(row)
})
continue
mapped_rows.append(row)
return mapped_rows
if isinstance(parameters, Mapping):
return parameters
if isinstance(parameters, (list, tuple)) and not isinstance(parameters, (str, bytes, bytearray)):
seq = cast("Sequence[Any]", parameters)
return {
(param.name or f"param_{param.ordinal}"): seq[idx]
for idx, param in enumerate(cached_param_info)
if idx < len(seq)
}
return parameters
def _map_named_to_positional(
self, parameters: "ConvertedParameters", named_order: "tuple[str, ...]", is_many: bool, strict: bool = False
) -> "ConvertedParameters":
"""Map named parameters (dict) to positional (tuple) using cached order.
Args:
parameters: Current parameters (dict or sequence).
named_order: Tuple of parameter names in placeholder order.
is_many: Whether this is execute_many.
strict: Whether to raise an error if required parameters are missing.
Returns:
Parameters converted to positional tuple if input was dict, else unchanged.
Raises:
SQLSpecError: If strict is True and required parameters are missing.
"""
if not named_order:
return parameters
param_type = type(parameters)
if is_many and (param_type is list or param_type is tuple):
parameter_rows = cast("Sequence[Any]", parameters)
updated_rows: list[Any] | None = None
for idx, row in enumerate(parameter_rows):
row_type = type(row)
if row_type is dict:
row_dict: dict[str, Any] = row
if strict:
missing = [name for name in named_order if name not in row_dict]
if missing:
from sqlspec.exceptions import SQLSpecError
msg = f"Missing required parameters: {missing}"
raise SQLSpecError(msg)
mapped_row: Any = tuple(row_dict.get(name) for name in named_order)
elif isinstance(row, Mapping):
# Fallback for custom Mapping types
if strict:
missing = [name for name in named_order if name not in row]
if missing:
from sqlspec.exceptions import SQLSpecError
msg = f"Missing required parameters: {missing}"
raise SQLSpecError(msg)
mapped_row = tuple(row.get(name) for name in named_order)
else:
mapped_row = row
if updated_rows is None:
if mapped_row is row:
continue
updated_rows = list(parameter_rows[:idx])
updated_rows.append(mapped_row)
if updated_rows is None:
return parameters
if param_type is tuple:
return tuple(updated_rows)
return updated_rows
if param_type is dict:
dict_parameters = cast("dict[str, Any]", parameters)
if strict:
missing = [name for name in named_order if name not in dict_parameters]
if missing:
from sqlspec.exceptions import SQLSpecError
msg = f"Missing required parameters: {missing}"
raise SQLSpecError(msg)
return tuple(dict_parameters.get(name) for name in named_order)
# Fallback for custom Mapping types
if isinstance(parameters, Mapping):
if strict:
missing = [name for name in named_order if name not in parameters]
if missing:
from sqlspec.exceptions import SQLSpecError
msg = f"Missing required parameters: {missing}"
raise SQLSpecError(msg)
return tuple(parameters.get(name) for name in named_order)
return parameters
def _needs_mapping_normalization(
self, payload: "ParameterPayload", param_info: "list[ParameterInfo]", is_many: bool
) -> bool:
if not payload or not param_info:
return False
has_named_placeholders = any(
param.style
in {
ParameterStyle.NAMED_COLON,
ParameterStyle.NAMED_AT,
ParameterStyle.NAMED_DOLLAR,
ParameterStyle.NAMED_PYFORMAT,
}
for param in param_info
)
if has_named_placeholders:
return False
looks_many = is_many or looks_like_execute_many(payload)
if not looks_many:
return False
# Fast type dispatch for common types
payload_type = type(payload)
if payload_type is dict:
return True
if payload_type is list or payload_type is tuple:
# Check if any item is a dict (fast path) or Mapping (fallback)
seq_payload = cast("Sequence[Any]", payload)
for item in seq_payload:
item_type = type(item)
if item_type is dict:
return True
if isinstance(item, Mapping):
return True
return False
# Fallback for custom types
if isinstance(payload, Mapping):
return True
if isinstance(payload, Sequence) and not isinstance(payload, (str, bytes, bytearray)):
return any(isinstance(item, Mapping) for item in payload)
return False
def _normalize_sql_for_parsing(
self, sql: str, param_info: "list[ParameterInfo]", config: "ParameterStyleConfig"
) -> str:
"""Normalize SQL for sqlglot parsing by converting unsupported parameter styles.
When a parameter style is not in config.supported_parameter_styles (what sqlglot
can parse for this dialect), convert it to config.default_parameter_style.
Args:
sql: SQL string with parameters.
param_info: List of detected parameter placeholders.
config: Parameter style configuration.
Returns:
SQL string with parameters converted to a sqlglot-compatible style.
"""
if not self._needs_parse_normalization(param_info, config):
return sql
# Convert to the default style that sqlglot can parse for this dialect
target_style = config.default_parameter_style
normalized_sql, _ = self._converter.convert_placeholder_style(
sql, None, target_style, is_many=False, param_info=param_info
)
return normalized_sql
def _make_processor_cache_key(
self,
sql: str,
parameters: "ParameterPayload",
config: "ParameterStyleConfig",
is_many: bool,
dialect: str | None,
wrap_types: bool,
normalize_for_parsing: bool,
*,
param_fingerprint: Any | None = None,
) -> tuple[Any, ...]:
if param_fingerprint is None:
# For static script compilation, we must include actual values in the fingerprint
# because the SQL will have values embedded directly (e.g., VALUES (1, 'foo'))
if config.needs_static_script_compilation:
param_fingerprint = value_fingerprint(parameters)
else:
# Use structural fingerprint (keys + types, not values) for better cache hit rates
param_fingerprint = structural_fingerprint(parameters, is_many)
dialect_marker = dialect or "default"
# Include both input and execution parameter styles to avoid cache collisions
# (e.g., MySQL asyncmy uses ? for input but %s for execution)
input_style = config.default_parameter_style.value if config.default_parameter_style else "unknown"
exec_style = (
config.default_execution_parameter_style.value if config.default_execution_parameter_style else input_style
)
return _make_cache_key_tuple(
sql, param_fingerprint, input_style, exec_style, dialect_marker, is_many, wrap_types, normalize_for_parsing
)
def process(
self,
sql: str,
parameters: "ParameterPayload",
config: "ParameterStyleConfig",
dialect: str | None = None,
is_many: bool = False,
wrap_types: bool = True,
param_fingerprint: Any | None = None,
) -> "ParameterProcessingResult":
return self._process_internal(
sql,
parameters,
config,
dialect=dialect,
is_many=is_many,
wrap_types=wrap_types,
normalize_for_parsing=True,
param_fingerprint=param_fingerprint,
)
[docs]
def process_for_execution(
self,
sql: str,
parameters: "ParameterPayload",
config: "ParameterStyleConfig",
dialect: str | None = None,
is_many: bool = False,
wrap_types: bool = True,
parsed_expression: Any = None,
param_fingerprint: Any | None = None,
) -> "ParameterProcessingResult":
"""Process parameters for execution without parse normalization.
Args:
sql: SQL string to process.
parameters: Parameter payload.
config: Parameter style configuration.
dialect: Optional SQL dialect.
is_many: Whether this is execute_many.
wrap_types: Whether to wrap parameters with type metadata.
parsed_expression: Pre-parsed SQLGlot expression to preserve through pipeline.
param_fingerprint: Pre-computed parameter fingerprint for cache key.
Returns:
ParameterProcessingResult with execution SQL and parameters.
"""
return self._process_internal(
sql,
parameters,
config,
dialect=dialect,
is_many=is_many,
wrap_types=wrap_types,
normalize_for_parsing=False,
parsed_expression=parsed_expression,
param_fingerprint=param_fingerprint,
)
def _process_internal(
self,
sql: str,
parameters: "ParameterPayload",
config: "ParameterStyleConfig",
*,
dialect: str | None,
is_many: bool,
wrap_types: bool,
normalize_for_parsing: bool,
parsed_expression: Any = None,
param_fingerprint: Any | None = None,
) -> "ParameterProcessingResult":
cache_key = None
if self._cache_max_size > 0:
cache_key = self._make_processor_cache_key(
sql,
parameters,
config,
is_many,
dialect,
wrap_types,
normalize_for_parsing,
param_fingerprint=param_fingerprint,
)
cached_result = self._cache.get(cache_key)
if cached_result is not None:
self._cache.move_to_end(cache_key)
self._cache_hits += 1
# For static script compilation, parameters are embedded directly in SQL.
# Cache key includes parameter values, so a hit means same SQL with same values.
# Return None for parameters since the driver shouldn't receive any.
if config.needs_static_script_compilation:
return ParameterProcessingResult(
cached_result.sql,
None,
cached_result.parameter_profile,
sqlglot_sql=cached_result.sqlglot_sql,
parsed_expression=cached_result.parsed_expression,
input_named_parameters=cached_result.input_named_parameters,
applied_wrap_types=cached_result.applied_wrap_types,
)
# Return cached SQL transformation with NEW parameters transformed
# to match the cached SQL's placeholder format
transformed_params = self._transform_cached_parameters(
parameters,
cached_result.parameter_profile,
config,
input_named_parameters=cached_result.input_named_parameters,
is_many=is_many,
apply_wrap_types=cached_result.applied_wrap_types,
)
# Apply output transformer if present (it may further transform params)
final_sql = cached_result.sql
if config.output_transformer:
final_sql, transformed_params = config.output_transformer(final_sql, transformed_params)
return ParameterProcessingResult(
final_sql,
transformed_params,
cached_result.parameter_profile,
sqlglot_sql=cached_result.sqlglot_sql,
parsed_expression=cached_result.parsed_expression,
input_named_parameters=cached_result.input_named_parameters,
applied_wrap_types=cached_result.applied_wrap_types,
)
self._cache_misses += 1
param_info = self._validator.extract_parameters(sql)
original_styles = {p.style for p in param_info} if param_info else set()
needs_execution_conversion = self._needs_execution_placeholder_conversion(param_info, config)
input_named_parameters = tuple(dict.fromkeys(p.name for p in param_info if p.name is not None))
if config.needs_static_script_compilation and param_info and parameters and not is_many:
return self._compile_static_script(
sql, parameters, config, is_many, cache_key, input_named_parameters=input_named_parameters
)
requires_mapping = self._needs_mapping_normalization(parameters, param_info, is_many)
if (
not needs_execution_conversion
and not config.type_coercion_map
and not config.output_transformer
and not requires_mapping
):
normalized_sql = self._normalize_sql_for_parsing(sql, param_info, config) if normalize_for_parsing else sql
result = ParameterProcessingResult(
sql,
parameters,
ParameterProfile(param_info),
sqlglot_sql=normalized_sql,
parsed_expression=parsed_expression,
input_named_parameters=input_named_parameters,
applied_wrap_types=False,
)
return self._store_cached_result(cache_key, result)
processed_sql, processed_parameters = sql, parameters
if requires_mapping:
target_style = self._select_execution_style(original_styles, config)
processed_sql, processed_parameters = self._converter.convert_placeholder_style(
processed_sql,
processed_parameters,
target_style,
is_many,
strict_named_parameters=config.strict_named_parameters,
param_info=param_info,
)
param_info = self._converter.convert_parameter_info_style(param_info, target_style)
original_styles = {target_style}
needs_execution_conversion = False
applied_wrap_types = False
if processed_parameters and wrap_types:
wrapped_parameters = self._wrap_parameter_types(processed_parameters)
if wrapped_parameters is not processed_parameters:
processed_parameters = wrapped_parameters
applied_wrap_types = True
if config.type_coercion_map and processed_parameters:
processed_parameters = self._coerce_parameter_types(processed_parameters, config.type_coercion_map, is_many)
processed_sql, processed_parameters, converted_param_info = self._convert_placeholders_for_execution(
processed_sql,
processed_parameters,
config,
param_info,
original_styles,
needs_execution_conversion,
is_many,
)
if config.output_transformer:
processed_sql, processed_parameters = config.output_transformer(processed_sql, processed_parameters)
final_param_info = converted_param_info if converted_param_info is not None else param_info
final_profile = ParameterProfile(final_param_info)
sqlglot_sql = (
self._normalize_sql_for_parsing(processed_sql, final_param_info, config)
if normalize_for_parsing
else processed_sql
)
result = ParameterProcessingResult(
processed_sql,
processed_parameters,
final_profile,
sqlglot_sql=sqlglot_sql,
parsed_expression=parsed_expression,
input_named_parameters=input_named_parameters,
applied_wrap_types=applied_wrap_types,
)
return self._store_cached_result(cache_key, result)
def _needs_execution_placeholder_conversion(
self, param_info: "list[ParameterInfo]", config: "ParameterStyleConfig"
) -> bool:
"""Determine whether execution placeholder conversion is required."""
if config.needs_static_script_compilation:
return True
if not param_info:
return False
current_styles = {param.style for param in param_info}
if (
config.allow_mixed_parameter_styles
and len(current_styles) > 1
and config.supported_execution_parameter_styles is not None
and len(config.supported_execution_parameter_styles) > 1
and all(style in config.supported_execution_parameter_styles for style in current_styles)
):
return False
if len(current_styles) > 1:
return True
if len(current_styles) == 1:
current_style = next(iter(current_styles))
supported_styles = config.supported_execution_parameter_styles
if supported_styles is None:
return True
return current_style not in supported_styles
return True
def _needs_parse_normalization(self, param_info: "list[ParameterInfo]", config: "ParameterStyleConfig") -> bool:
"""Check if SQL needs normalization before sqlglot parsing.
A style needs normalization if it's NOT in config.supported_parameter_styles,
which represents what sqlglot can parse for this driver's dialect.
Args:
param_info: List of detected parameter placeholders.
config: Parameter style configuration with supported_parameter_styles.
Returns:
True if any parameter style is not supported by sqlglot for this dialect.
"""
supported = config.supported_parameter_styles
return any(p.style not in supported for p in param_info)
def _convert_placeholders_for_execution(
self,
sql: str,
parameters: "ParameterPayload",
config: "ParameterStyleConfig",
param_info: "list[ParameterInfo]",
original_styles: "set[ParameterStyle]",
needs_execution_conversion: bool,
is_many: bool,
) -> "tuple[str, ConvertedParameters, list[ParameterInfo] | None]":
if not needs_execution_conversion:
# Convert parameters to concrete type for return
if parameters is None:
return sql, None, None
if isinstance(parameters, dict):
return sql, parameters, None
if isinstance(parameters, list):
return sql, parameters, None
if isinstance(parameters, tuple):
return sql, parameters, None
if isinstance(parameters, Mapping):
return sql, dict(parameters), None
if isinstance(parameters, Sequence) and not isinstance(parameters, (str, bytes)):
return sql, list(parameters), None
return sql, None, None
target_style = self._select_execution_style(original_styles, config)
converted_param_info = self._converter.convert_parameter_info_style(param_info, target_style)
if is_many and config.preserve_original_params_for_many and isinstance(parameters, (list, tuple)):
processed_sql, _ = self._converter.convert_placeholder_style(
sql,
parameters,
target_style,
is_many,
strict_named_parameters=config.strict_named_parameters,
param_info=param_info,
)
return processed_sql, parameters, converted_param_info
processed_sql, processed_parameters = self._converter.convert_placeholder_style(
sql,
parameters,
target_style,
is_many,
strict_named_parameters=config.strict_named_parameters,
param_info=param_info,
)
return processed_sql, processed_parameters, converted_param_info