"""SQL result classes for query execution results.
This module provides result classes for handling SQL query execution results
including regular results and Apache Arrow format results.
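Classes:
    StatementResult: Abstract base class for SQL results.
    SQLResult: Standard implementation for regular results.
    ArrowResult: Apache Arrow format results for data interchange.
    EmptyResult: Sentinel result used when a stack operation has no driver result.
    StackResult: Wrapper for per-operation stack results.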
"""
from abc import ABC, abstractmethod
from collections.abc import Iterable, Iterator
from typing import TYPE_CHECKING, Any, Optional, cast, overload
from mypy_extensions import mypyc_attr
from typing_extensions import TypeVar
from sqlspec.core.compiler import OperationType
from sqlspec.core.statement import SQL
from sqlspec.storage import (
AsyncStoragePipeline,
StorageDestination,
StorageFormat,
StorageTelemetry,
SyncStoragePipeline,
)
from sqlspec.utils.module_loader import ensure_pandas, ensure_polars, ensure_pyarrow
from sqlspec.utils.schema import to_schema
if TYPE_CHECKING:
from sqlspec.typing import ArrowTable, PandasDataFrame, PolarsDataFrame, SchemaT
__all__ = ("ArrowResult", "EmptyResult", "SQLResult", "StackResult", "StatementResult")
T = TypeVar("T")
@mypyc_attr(allow_interpreted_subclasses=False)
class StatementResult(ABC, Iterable[Any]):
    """Common interface shared by every SQL execution result.

    Concrete subclasses decide how rows are iterated, what counts as a
    successful operation and how the payload is exposed via ``get_data``.

    Attributes:
        statement: The SQL statement that produced this result.
        data: Payload returned by the operation.
        rows_affected: Number of rows affected by the operation.
        last_inserted_id: Last inserted ID from INSERT operations.
        execution_time: Time taken to execute the statement in seconds.
        metadata: Additional metadata about the operation.
    """

    __slots__ = ("data", "execution_time", "last_inserted_id", "metadata", "rows_affected", "statement")

    def __init__(
        self,
        statement: "SQL",
        data: Any = None,
        rows_affected: int = 0,
        last_inserted_id: int | str | None = None,
        execution_time: float | None = None,
        metadata: Optional["dict[str, Any]"] = None,
    ) -> None:
        """Store the statement together with its execution outcome.

        Args:
            statement: The SQL statement that was executed.
            data: Payload returned by the operation.
            rows_affected: Number of rows affected by the operation.
            last_inserted_id: Last inserted ID from the operation.
            execution_time: Time taken to execute the statement in seconds.
            metadata: Additional metadata; a fresh empty dict is used when
                ``None`` is given.
        """
        # Only ``None`` triggers the fallback; an empty dict supplied by the
        # caller is stored as-is.
        if metadata is None:
            metadata = {}
        self.metadata = metadata
        self.execution_time = execution_time
        self.last_inserted_id = last_inserted_id
        self.rows_affected = rows_affected
        self.data = data
        self.statement = statement

    @abstractmethod
    def __iter__(self) -> Iterator[Any]:
        """Iterate over result rows."""

    @abstractmethod
    def is_success(self) -> bool:
        """Report whether the operation succeeded.

        Returns:
            True if the operation completed successfully, False otherwise.
        """

    @abstractmethod
    def get_data(self) -> "Any":
        """Return the processed payload of the result.

        Returns:
            The result data in the format chosen by the subclass.
        """

    def get_metadata(self, key: str, default: Any = None) -> Any:
        """Look up a metadata entry.

        Args:
            key: The metadata key to retrieve.
            default: Value returned when ``key`` is absent.

        Returns:
            The stored value, or ``default`` if the key is not present.
        """
        store = self.metadata
        return store.get(key, default)

    def set_metadata(self, key: str, value: Any) -> None:
        """Store a metadata entry, overwriting any existing value.

        Args:
            key: The metadata key to set.
            value: The value to store under ``key``.
        """
        store = self.metadata
        store[key] = value

    @property
    def operation_type(self) -> OperationType:
        """Operation type reported by the underlying statement.

        Returns:
            The ``operation_type`` attribute of ``self.statement``.
        """
        executed = self.statement
        return executed.operation_type
@mypyc_attr(allow_interpreted_subclasses=False)
class SQLResult(StatementResult):
    """Result of a SQL operation that returns rows or affects rows.

    Covers SELECT, INSERT, UPDATE and DELETE operations. For DML statements
    with a RETURNING clause the returned rows are stored in ``data``. The
    ``operation_type`` property reports the kind of operation. For script
    execution the result additionally tracks per-statement results and
    error messages.
    """

    __slots__ = (
        "_operation_type",
        "column_names",
        "error",
        "errors",
        "has_more",
        "inserted_ids",
        "operation_index",
        "parameters",
        "statement_results",
        "successful_statements",
        "total_count",
        "total_statements",
    )

    _operation_type: OperationType

    def __init__(
        self,
        statement: "SQL",
        data: list[dict[str, Any]] | None = None,
        rows_affected: int = 0,
        last_inserted_id: int | str | None = None,
        execution_time: float | None = None,
        metadata: Optional["dict[str, Any]"] = None,
        error: Exception | None = None,
        operation_type: OperationType = "SELECT",
        operation_index: int | None = None,
        parameters: Any | None = None,
        column_names: Optional["list[str]"] = None,
        total_count: int | None = None,
        has_more: bool = False,
        inserted_ids: Optional["list[int | str]"] = None,
        statement_results: Optional["list[SQLResult]"] = None,
        errors: Optional["list[str]"] = None,
        total_statements: int = 0,
        successful_statements: int = 0,
    ) -> None:
        """Initialize SQL result.

        Args:
            statement: The original SQL statement that was executed.
            data: The result data from the operation.
            rows_affected: Number of rows affected by the operation.
            last_inserted_id: Last inserted ID from the operation.
            execution_time: Time taken to execute the statement in seconds.
            metadata: Additional metadata about the operation.
            error: Exception that occurred during execution.
            operation_type: Type of SQL operation performed.
            operation_index: Index of operation in a script.
            parameters: Parameters used for the query.
            column_names: Names of columns in the result set. When empty,
                the keys of the first row of ``data`` are used.
            total_count: Total number of rows in the complete result set.
                Defaults to ``len(data)`` (0 when ``data`` is ``None``).
            has_more: Whether there are additional result pages available.
            inserted_ids: List of IDs from INSERT operations.
            statement_results: Results from individual statements in a script.
            errors: List of error messages for script execution.
            total_statements: Total number of statements in a script.
            successful_statements: Count of successful statements in a script.
        """
        super().__init__(
            statement=statement,
            data=data,
            rows_affected=rows_affected,
            last_inserted_id=last_inserted_id,
            execution_time=execution_time,
            metadata=metadata,
        )
        self.error = error
        self._operation_type = operation_type
        self.operation_index = operation_index
        self.parameters = parameters
        # Falsy arguments (None or empty) are replaced by fresh lists so the
        # instance never shares a default container.
        self.column_names = column_names or []
        self.total_count = total_count
        self.has_more = has_more
        self.inserted_ids = inserted_ids or []
        self.statement_results = statement_results or []
        self.errors = errors or []
        self.total_statements = total_statements
        self.successful_statements = successful_statements

        # Derive column names from the first row when the caller gave none.
        if not self.column_names and data:
            self.column_names = list(data[0].keys())
        if self.total_count is None:
            self.total_count = len(data) if data is not None else 0

    @property
    def operation_type(self) -> OperationType:
        """Get operation type for this result.

        Returns:
            The operation type passed to the constructor (not the one
            derived from the statement).
        """
        return self._operation_type

    def is_success(self) -> bool:
        """Check if the operation was successful.

        Script results (operation type ``SCRIPT`` or any recorded statement
        results) succeed when there are no error messages and every statement
        succeeded. SELECT succeeds when ``data`` is not ``None`` and
        ``rows_affected`` is non-negative. INSERT/UPDATE/DELETE/EXECUTE
        succeed when ``rows_affected`` is non-negative. Every other operation
        type is reported as unsuccessful.

        Returns:
            True if operation was successful, False otherwise.
        """
        # NOTE(review): ``self.error`` is not consulted here - confirm whether
        # a recorded exception should force a False result.
        op_type = self.operation_type.upper()

        if op_type == "SCRIPT" or self.statement_results:
            return not self.errors and self.total_statements == self.successful_statements

        if op_type == "SELECT":
            return self.data is not None and self.rows_affected >= 0

        if op_type in {"INSERT", "UPDATE", "DELETE", "EXECUTE"}:
            return self.rows_affected >= 0

        return False

    @overload
    def get_data(self, *, schema_type: "type[SchemaT]") -> "list[SchemaT]": ...

    @overload
    def get_data(self, *, schema_type: None = None) -> "list[dict[str, Any]]": ...

    def get_data(self, *, schema_type: "type[SchemaT] | None" = None) -> "list[SchemaT] | list[dict[str, Any]]":
        """Get the data from the result.

        For regular operations, returns the list of rows. For script
        operations, returns a single-element list holding a summary
        dictionary with execution statistics and per-statement results;
        ``schema_type`` is ignored in that case.

        Args:
            schema_type: Optional schema type to transform the data into.
                Supports Pydantic models, dataclasses, msgspec structs, attrs classes, and TypedDict.

        Returns:
            List of result rows (optionally transformed to schema_type) or script summary.
        """
        if self.operation_type.upper() == "SCRIPT":
            failed_statements = self.total_statements - self.successful_statements
            return [
                {
                    "total_statements": self.total_statements,
                    "successful_statements": self.successful_statements,
                    "failed_statements": failed_statements,
                    "errors": self.errors,
                    "statement_results": self.statement_results,
                    "total_rows_affected": self.get_total_rows_affected(),
                }
            ]

        data = cast("list[dict[str, Any]]", self.data or [])
        if schema_type:
            return cast("list[SchemaT]", to_schema(data, schema_type=schema_type))
        return data

    def add_statement_result(self, result: "SQLResult") -> None:
        """Add a statement result to the script execution results.

        Increments ``total_statements`` and, when ``result.is_success()`` is
        true, ``successful_statements``.

        Args:
            result: Statement result to add.
        """
        self.statement_results.append(result)
        self.total_statements += 1
        if result.is_success():
            self.successful_statements += 1

    def get_total_rows_affected(self) -> int:
        """Get the total number of rows affected across all statements.

        When statement results are present, only their positive
        ``rows_affected`` values are summed; otherwise this result's own
        ``rows_affected`` is returned, with missing or negative values
        reported as 0.

        Returns:
            Total rows affected.
        """
        if self.statement_results:
            return sum(
                stmt.rows_affected
                for stmt in self.statement_results
                if stmt.rows_affected and stmt.rows_affected > 0
            )
        return max(self.rows_affected or 0, 0)

    @property
    def num_rows(self) -> int:
        """Get the number of rows affected (alias for get_total_rows_affected).

        Returns:
            Total rows affected.
        """
        return self.get_total_rows_affected()

    @property
    def num_columns(self) -> int:
        """Get the number of columns in the result data.

        Returns:
            Number of columns.
        """
        return len(self.column_names) if self.column_names else 0

    @overload
    def get_first(self, *, schema_type: "type[SchemaT]") -> "SchemaT | None": ...

    @overload
    def get_first(self, *, schema_type: None = None) -> "dict[str, Any] | None": ...

    def get_first(self, *, schema_type: "type[SchemaT] | None" = None) -> "SchemaT | dict[str, Any] | None":
        """Get the first row from the result, if any.

        Args:
            schema_type: Optional schema type to transform the data into.
                Supports Pydantic models, dataclasses, msgspec structs, attrs classes, and TypedDict.

        Returns:
            First row (optionally transformed to schema_type) or None if no data.
        """
        if not self.data:
            return None
        row = cast("dict[str, Any]", self.data[0])
        if schema_type:
            return to_schema(row, schema_type=schema_type)
        return row

    def get_count(self) -> int:
        """Get the number of rows in the current result set (e.g., a page of data).

        Returns:
            Number of rows in current result set.
        """
        return len(self.data) if self.data is not None else 0

    def is_empty(self) -> bool:
        """Check if the result set (self.data) is empty.

        Returns:
            True if ``data`` is ``None`` or contains no rows.
        """
        # ``not None`` and ``not []`` are both True, so one truthiness test
        # covers the missing and the empty case.
        return not self.data

    def get_affected_count(self) -> int:
        """Get the number of rows affected by a DML operation.

        Returns:
            Number of affected rows.
        """
        return self.rows_affected or 0

    def was_inserted(self) -> bool:
        """Check if this was an INSERT operation.

        Returns:
            True if INSERT operation.
        """
        return self.operation_type.upper() == "INSERT"

    def was_updated(self) -> bool:
        """Check if this was an UPDATE operation.

        Returns:
            True if UPDATE operation.
        """
        return self.operation_type.upper() == "UPDATE"

    def was_deleted(self) -> bool:
        """Check if this was a DELETE operation.

        Returns:
            True if DELETE operation.
        """
        return self.operation_type.upper() == "DELETE"

    def __len__(self) -> int:
        """Get the number of rows in the result set.

        Returns:
            Number of rows in the data.
        """
        return self.get_count()

    def __getitem__(self, index: int) -> "dict[str, Any]":
        """Get a row by index.

        Args:
            index: Row index

        Returns:
            The row at the specified index

        Raises:
            IndexError: If ``data`` is ``None`` or ``index`` is out of range.
        """
        if self.data is None:
            msg = "No data available"
            raise IndexError(msg)
        return cast("dict[str, Any]", self.data[index])

    def __iter__(self) -> "Iterator[dict[str, Any]]":
        """Iterate over the rows in the result.

        Returns:
            Iterator that yields each row as a dictionary
        """
        return iter(self.data or [])

    @overload
    def all(self, *, schema_type: "type[SchemaT]") -> "list[SchemaT]": ...

    @overload
    def all(self, *, schema_type: None = None) -> list[dict[str, Any]]: ...

    def all(self, *, schema_type: "type[SchemaT] | None" = None) -> "list[SchemaT] | list[dict[str, Any]]":
        """Return all rows as a list.

        Unlike ``get_data`` this never returns the script summary.

        Args:
            schema_type: Optional schema type to transform the data into.
                Supports Pydantic models, dataclasses, msgspec structs, attrs classes, and TypedDict.

        Returns:
            List of all rows (optionally transformed to schema_type)
        """
        data = cast("list[dict[str, Any]]", self.data or [])
        if schema_type:
            return cast("list[SchemaT]", to_schema(data, schema_type=schema_type))
        return data

    @overload
    def one(self, *, schema_type: "type[SchemaT]") -> "SchemaT": ...

    @overload
    def one(self, *, schema_type: None = None) -> "dict[str, Any]": ...

    def one(self, *, schema_type: "type[SchemaT] | None" = None) -> "SchemaT | dict[str, Any]":
        """Return exactly one row.

        Args:
            schema_type: Optional schema type to transform the data into.
                Supports Pydantic models, dataclasses, msgspec structs, attrs classes, and TypedDict.

        Returns:
            The single row (optionally transformed to schema_type)

        Raises:
            ValueError: If no results or more than one result
        """
        # The truthiness test covers both ``None`` and an empty list, so no
        # separate zero-length check is needed afterwards.
        if not self.data:
            msg = "No result found, exactly one row expected"
            raise ValueError(msg)

        data_len = len(self.data)
        if data_len > 1:
            msg = f"Multiple results found ({data_len}), exactly one row expected"
            raise ValueError(msg)

        row = cast("dict[str, Any]", self.data[0])
        if schema_type:
            return to_schema(row, schema_type=schema_type)
        return row

    @overload
    def one_or_none(self, *, schema_type: "type[SchemaT]") -> "SchemaT | None": ...

    @overload
    def one_or_none(self, *, schema_type: None = None) -> "dict[str, Any] | None": ...

    def one_or_none(self, *, schema_type: "type[SchemaT] | None" = None) -> "SchemaT | dict[str, Any] | None":
        """Return at most one row.

        Args:
            schema_type: Optional schema type to transform the data into.
                Supports Pydantic models, dataclasses, msgspec structs, attrs classes, and TypedDict.

        Returns:
            The single row (optionally transformed to schema_type) or None if no results

        Raises:
            ValueError: If more than one result
        """
        # ``None`` and an empty list are both handled by this single guard.
        if not self.data:
            return None

        data_len = len(self.data)
        if data_len > 1:
            msg = f"Multiple results found ({data_len}), at most one row expected"
            raise ValueError(msg)

        row = cast("dict[str, Any]", self.data[0])
        if schema_type:
            return to_schema(row, schema_type=schema_type)
        return row

    def scalar(self) -> Any:
        """Return the first column of the first row.

        Returns:
            The scalar value from first column of first row

        Raises:
            ValueError: Propagated from ``one()`` when there is not exactly
                one row.
        """
        row = self.one()
        return next(iter(row.values()))

    def scalar_or_none(self) -> Any:
        """Return the first column of the first row, or None if no results.

        Returns:
            The scalar value from first column of first row, or None

        Raises:
            ValueError: Propagated from ``one_or_none()`` when there is more
                than one row.
        """
        row = self.one_or_none()
        if row is None:
            return None
        return next(iter(row.values()))

    def write_to_storage_sync(
        self,
        destination: "StorageDestination",
        *,
        format_hint: "StorageFormat | None" = None,
        storage_options: "dict[str, Any] | None" = None,
        pipeline: "SyncStoragePipeline | None" = None,
    ) -> "StorageTelemetry":
        """Write the rows from ``get_data()`` through a synchronous storage pipeline.

        Args:
            destination: Destination forwarded to the pipeline's ``write_rows``.
            format_hint: Format hint forwarded to ``write_rows``.
            storage_options: Storage options forwarded to ``write_rows``.
            pipeline: Pipeline to use; a new ``SyncStoragePipeline`` is
                created when none is supplied.

        Returns:
            The value returned by the pipeline's ``write_rows`` call.
        """
        active_pipeline = pipeline or SyncStoragePipeline()
        rows = self.get_data()
        return active_pipeline.write_rows(rows, destination, format_hint=format_hint, storage_options=storage_options)

    async def write_to_storage_async(
        self,
        destination: "StorageDestination",
        *,
        format_hint: "StorageFormat | None" = None,
        storage_options: "dict[str, Any] | None" = None,
        pipeline: "AsyncStoragePipeline | None" = None,
    ) -> "StorageTelemetry":
        """Write the rows from ``get_data()`` through an asynchronous storage pipeline.

        Args:
            destination: Destination forwarded to the pipeline's ``write_rows``.
            format_hint: Format hint forwarded to ``write_rows``.
            storage_options: Storage options forwarded to ``write_rows``.
            pipeline: Pipeline to use; a new ``AsyncStoragePipeline`` is
                created when none is supplied.

        Returns:
            The awaited result of the pipeline's ``write_rows`` call.
        """
        active_pipeline = pipeline or AsyncStoragePipeline()
        rows = self.get_data()
        return await active_pipeline.write_rows(
            rows, destination, format_hint=format_hint, storage_options=storage_options
        )
@mypyc_attr(allow_interpreted_subclasses=False)
class ArrowResult(StatementResult):
    """SQL result whose payload is an Apache Arrow table.

    Used when a database driver can hand back results in Apache Arrow
    format for data interchange, e.g. for analytics workloads and data
    science applications.

    Attributes:
        schema: Arrow schema information for the result data.
    """

    __slots__ = ("schema",)

    def __init__(
        self,
        statement: "SQL",
        data: Any,
        rows_affected: int = 0,
        last_inserted_id: int | str | None = None,
        execution_time: float | None = None,
        metadata: Optional["dict[str, Any]"] = None,
        schema: Optional["dict[str, Any]"] = None,
    ) -> None:
        """Create an Arrow-backed result.

        Args:
            statement: The original SQL statement that was executed.
            data: The Apache Arrow Table containing the result data.
            rows_affected: Number of rows affected by the operation.
            last_inserted_id: Last inserted ID (if applicable).
            execution_time: Time taken to execute the statement in seconds.
            metadata: Additional metadata about the operation.
            schema: Optional Arrow schema information.
        """
        super().__init__(
            statement=statement,
            data=data,
            rows_affected=rows_affected,
            last_inserted_id=last_inserted_id,
            execution_time=execution_time,
            metadata=metadata,
        )
        self.schema = schema

    def _require_table(self) -> Any:
        """Return ``self.data``, raising ``ValueError`` when it is ``None``."""
        table = self.data
        if table is None:
            msg = "No Arrow table available"
            raise ValueError(msg)
        return table

    def is_success(self) -> bool:
        """Report whether a payload is present.

        Returns:
            True if Arrow table data is available, False otherwise.
        """
        return self.data is not None

    def get_data(self) -> "ArrowTable":
        """Return the payload after verifying it is a ``pyarrow.Table``.

        Returns:
            The Arrow table containing the result data.

        Raises:
            ValueError: If no Arrow table is available.
            TypeError: If data is not an Arrow Table.
        """
        table = self.data
        if table is None:
            msg = "No Arrow table available for this result"
            raise ValueError(msg)

        # pyarrow is imported lazily, after the availability check.
        ensure_pyarrow()
        import pyarrow as pa

        if isinstance(table, pa.Table):
            return table
        msg = f"Expected an Arrow Table, but got {type(self.data).__name__}"
        raise TypeError(msg)

    @property
    def column_names(self) -> "list[str]":
        """Column names reported by the Arrow table.

        Returns:
            List of column names.

        Raises:
            ValueError: If no Arrow table is available.
        """
        table = self._require_table()
        return cast("list[str]", table.column_names)

    @property
    def num_rows(self) -> int:
        """Row count reported by the Arrow table.

        Returns:
            Number of rows.

        Raises:
            ValueError: If no Arrow table is available.
        """
        table = self._require_table()
        return cast("int", table.num_rows)

    @property
    def num_columns(self) -> int:
        """Column count reported by the Arrow table.

        Returns:
            Number of columns.

        Raises:
            ValueError: If no Arrow table is available.
        """
        table = self._require_table()
        return cast("int", table.num_columns)

    def to_pandas(self) -> "PandasDataFrame":
        """Convert the Arrow data to a pandas DataFrame.

        Returns:
            pandas DataFrame containing the result data.

        Raises:
            ValueError: If no Arrow table is available.
            TypeError: If the conversion does not yield a pandas DataFrame.

        Examples:
            >>> result = session.select_to_arrow("SELECT * FROM users")
            >>> df = result.to_pandas()
            >>> print(df.head())
        """
        table = self._require_table()

        ensure_pandas()
        import pandas as pd

        frame = table.to_pandas()
        if isinstance(frame, pd.DataFrame):
            return frame
        msg = f"Expected a pandas DataFrame, but got {type(frame).__name__}"
        raise TypeError(msg)

    def to_polars(self) -> "PolarsDataFrame":
        """Convert the Arrow data to a Polars DataFrame.

        Returns:
            Polars DataFrame containing the result data.

        Raises:
            ValueError: If no Arrow table is available.
            TypeError: If the conversion does not yield a Polars DataFrame.

        Examples:
            >>> result = session.select_to_arrow("SELECT * FROM users")
            >>> df = result.to_polars()
            >>> print(df.head())
        """
        table = self._require_table()

        ensure_polars()
        import polars as pl

        frame = pl.from_arrow(table)
        if isinstance(frame, pl.DataFrame):
            return frame
        msg = f"Expected a Polars DataFrame, but got {type(frame).__name__}"
        raise TypeError(msg)

    def to_dict(self) -> "list[dict[str, Any]]":
        """Convert the Arrow data to a list of dictionaries.

        Returns:
            List of dictionaries, one per row.

        Raises:
            ValueError: If no Arrow table is available.

        Examples:
            >>> result = session.select_to_arrow(
            ...     "SELECT id, name FROM users"
            ... )
            >>> rows = result.to_dict()
            >>> print(rows[0])
            {'id': 1, 'name': 'Alice'}
        """
        table = self._require_table()
        return cast("list[dict[str, Any]]", table.to_pylist())

    def write_to_storage_sync(
        self,
        destination: "StorageDestination",
        *,
        format_hint: "StorageFormat | None" = None,
        storage_options: "dict[str, Any] | None" = None,
        compression: str | None = None,
        pipeline: "SyncStoragePipeline | None" = None,
    ) -> "StorageTelemetry":
        """Write the Arrow table through a synchronous storage pipeline.

        The table is obtained from ``get_data()`` before the pipeline is
        resolved; a new ``SyncStoragePipeline`` is created when none is
        supplied. All remaining arguments are forwarded to ``write_arrow``.

        Returns:
            The value returned by the pipeline's ``write_arrow`` call.
        """
        arrow_table = self.get_data()
        writer = pipeline or SyncStoragePipeline()
        telemetry = writer.write_arrow(
            arrow_table,
            destination,
            format_hint=format_hint,
            storage_options=storage_options,
            compression=compression,
        )
        return telemetry

    async def write_to_storage_async(
        self,
        destination: "StorageDestination",
        *,
        format_hint: "StorageFormat | None" = None,
        storage_options: "dict[str, Any] | None" = None,
        compression: str | None = None,
        pipeline: "AsyncStoragePipeline | None" = None,
    ) -> "StorageTelemetry":
        """Write the Arrow table through an asynchronous storage pipeline.

        The table is obtained from ``get_data()`` before the pipeline is
        resolved; a new ``AsyncStoragePipeline`` is created when none is
        supplied. All remaining arguments are forwarded to ``write_arrow``.

        Returns:
            The awaited result of the pipeline's ``write_arrow`` call.
        """
        arrow_table = self.get_data()
        writer = pipeline or AsyncStoragePipeline()
        telemetry = await writer.write_arrow(
            arrow_table,
            destination,
            format_hint=format_hint,
            storage_options=storage_options,
            compression=compression,
        )
        return telemetry

    def __len__(self) -> int:
        """Return the number of rows in the Arrow table.

        Returns:
            Number of rows.

        Raises:
            ValueError: If no Arrow table is available.

        Examples:
            >>> result = session.select_to_arrow("SELECT * FROM users")
            >>> print(len(result))
            100
        """
        table = self._require_table()
        return cast("int", table.num_rows)

    def __iter__(self) -> "Iterator[dict[str, Any]]":
        """Iterate over rows as dictionaries.

        This is a generator, so the availability check runs when iteration
        starts rather than when ``iter()`` is called.

        Yields:
            Dictionary for each row.

        Raises:
            ValueError: If no Arrow table is available.

        Examples:
            >>> result = session.select_to_arrow(
            ...     "SELECT id, name FROM users"
            ... )
            >>> for row in result:
            ...     print(row["name"])
        """
        table = self._require_table()
        yield from table.to_pylist()
class EmptyResult(StatementResult):
    """Sentinel result used when a stack operation has no driver result."""

    __slots__ = ()

    # Single placeholder statement shared by every instance.
    _EMPTY_STATEMENT = SQL("-- empty stack result --")

    def __init__(self) -> None:
        """Initialise with the placeholder statement, an empty data list and zero rows affected."""
        super().__init__(self._EMPTY_STATEMENT, [], 0)

    def __iter__(self) -> Iterator[Any]:
        """Return an iterator that yields nothing."""
        nothing: tuple[Any, ...] = ()
        return iter(nothing)

    def is_success(self) -> bool:
        """Always report success."""
        return True

    def get_data(self) -> list[Any]:
        """Return a new empty list."""
        return []
class StackResult:
    """Wrapper for per-operation stack results that surfaces driver results directly.

    Attributes:
        result: The wrapped driver result; an ``EmptyResult`` when none was given.
        rows_affected: Explicit row count, or the count inferred from ``result``.
        error: Exception captured for this operation, if any.
        warning: Warning payload attached to this operation, if any.
        metadata: Shallow copy of the supplied metadata, or ``None`` when the
            supplied metadata was missing or empty.
    """

    __slots__ = ("error", "metadata", "result", "rows_affected", "warning")

    def __init__(
        self,
        result: "StatementResult | ArrowResult | None" = None,
        *,
        rows_affected: int | None = None,
        error: Exception | None = None,
        warning: Any | None = None,
        metadata: "dict[str, Any] | None" = None,
    ) -> None:
        """Initialize the stack result.

        Args:
            result: Driver result to wrap; replaced by ``EmptyResult()`` when ``None``.
            rows_affected: Row count to report; inferred from ``result`` when ``None``.
            error: Exception captured for this operation.
            warning: Warning payload for this operation.
            metadata: Metadata mapping; copied so later changes to the
                caller's dict are not reflected here.
        """
        self.result: StatementResult | ArrowResult = result if result is not None else EmptyResult()
        self.rows_affected = rows_affected if rows_affected is not None else _infer_rows_affected(self.result)
        self.error = error
        self.warning = warning
        # Empty metadata is normalised to None rather than stored as {}.
        self.metadata = dict(metadata) if metadata else None

    def get_result(self) -> "StatementResult | ArrowResult":
        """Return the underlying driver result."""
        return self.result

    @property
    def result_type(self) -> str:
        """Describe the underlying result type (SQL operation, Arrow, or custom).

        Returns:
            ``"ARROW"`` for an ``ArrowResult``, the upper-cased operation type
            for an ``SQLResult``, otherwise the upper-cased class name of the
            wrapped result.
        """
        if isinstance(self.result, ArrowResult):
            return "ARROW"
        if isinstance(self.result, SQLResult):
            return self.result.operation_type.upper()
        return type(self.result).__name__.upper()

    def is_sql_result(self) -> bool:
        """Return True when the underlying result is a non-Arrow StatementResult.

        This is broader than an ``SQLResult`` check: any ``StatementResult``
        that is not an ``ArrowResult`` qualifies, which includes
        ``EmptyResult``.
        """
        return isinstance(self.result, StatementResult) and not isinstance(self.result, ArrowResult)

    def is_arrow_result(self) -> bool:
        """Return True when the underlying result is an ArrowResult."""
        return isinstance(self.result, ArrowResult)

    def is_error(self) -> bool:
        """Return True when the stack operation captured an error."""
        return self.error is not None

    def with_error(self, error: Exception) -> "StackResult":
        """Return a copy of the result that records the provided error.

        The copy shares the wrapped ``result`` and ``warning`` objects; the
        original instance is left unchanged.
        """
        return StackResult(
            result=self.result,
            rows_affected=self.rows_affected,
            warning=self.warning,
            metadata=self.metadata,
            error=error,
        )

    @classmethod
    def from_sql_result(cls, result: "SQLResult") -> "StackResult":
        """Convert a standard SQLResult into a stack-friendly representation.

        The ``"warning"`` entry of the result's metadata, when present, is
        surfaced as the stack result's ``warning``.
        """
        metadata = dict(result.metadata) if result.metadata else None
        warning = metadata.get("warning") if metadata else None
        return cls(result=result, rows_affected=result.rows_affected, warning=warning, metadata=metadata)

    @classmethod
    def from_arrow_result(cls, result: "ArrowResult") -> "StackResult":
        """Create a stack result from an ArrowResult instance."""
        metadata = dict(result.metadata) if result.metadata else None
        return cls(result=result, rows_affected=result.rows_affected, metadata=metadata)

    @classmethod
    def from_error(cls, error: Exception) -> "StackResult":
        """Create an error-only stack result wrapping an ``EmptyResult`` with zero rows affected."""
        return cls(result=EmptyResult(), rows_affected=0, error=error)
def _infer_rows_affected(result: "StatementResult | ArrowResult") -> int:
    """Return ``result.rows_affected`` as an ``int``, or 0 when it is missing or not an int."""
    candidate = getattr(result, "rows_affected", None)
    if not isinstance(candidate, int):
        return 0
    # int() normalises int subclasses (e.g. bool) to a plain int.
    return int(candidate)
def create_sql_result(
    statement: "SQL",
    data: list[dict[str, Any]] | None = None,
    rows_affected: int = 0,
    last_inserted_id: int | str | None = None,
    execution_time: float | None = None,
    metadata: Optional["dict[str, Any]"] = None,
    **kwargs: Any,
) -> SQLResult:
    """Build an ``SQLResult`` from the given execution outcome.

    Args:
        statement: The SQL statement that produced this result.
        data: Result data from query execution.
        rows_affected: Number of rows affected by the operation.
        last_inserted_id: Last inserted ID (for INSERT operations).
        execution_time: Execution time in seconds.
        metadata: Additional metadata about the result.
        **kwargs: Extra keyword arguments passed through to ``SQLResult``.

    Returns:
        SQLResult instance.
    """
    # The six leading arguments follow SQLResult.__init__'s parameter order.
    result = SQLResult(
        statement,
        data,
        rows_affected,
        last_inserted_id,
        execution_time,
        metadata,
        **kwargs,
    )
    return result
def create_arrow_result(
    statement: "SQL",
    data: Any,
    rows_affected: int = 0,
    last_inserted_id: int | str | None = None,
    execution_time: float | None = None,
    metadata: Optional["dict[str, Any]"] = None,
    schema: Optional["dict[str, Any]"] = None,
) -> ArrowResult:
    """Build an ``ArrowResult`` from the given execution outcome.

    Args:
        statement: The SQL statement that produced this result.
        data: Arrow-based result data.
        rows_affected: Number of rows affected by the operation.
        last_inserted_id: Last inserted ID (for INSERT operations).
        execution_time: Execution time in seconds.
        metadata: Additional metadata about the result.
        schema: Optional Arrow schema information.

    Returns:
        ArrowResult instance.
    """
    result = ArrowResult(
        statement,
        data,
        rows_affected=rows_affected,
        last_inserted_id=last_inserted_id,
        execution_time=execution_time,
        metadata=metadata,
        schema=schema,
    )
    return result