aiomysql

Async MySQL driver (PyMySQL-compatible wire protocol, asyncio-native).

Configuration

class sqlspec.adapters.aiomysql.AiomysqlConfig[source]

Bases: AsyncDatabaseConfig[Connection, AiomysqlPool, AiomysqlDriver]

Configuration for aiomysql database connections.

Example:

config = AiomysqlConfig(
    connection_config=AiomysqlPoolParams(
        host="localhost", user="root", db="mydb"
    )
)
driver_type

alias of AiomysqlDriver

__init__(*, connection_config=None, connection_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None, **kwargs)[source]

Initialize aiomysql configuration.

Parameters:
  • connection_config -- Connection and pool configuration parameters

  • connection_instance -- Existing pool instance to use

  • migration_config -- Migration configuration

  • statement_config -- Statement configuration override

  • driver_features -- Driver feature configuration (TypedDict or dict)

  • bind_key -- Optional unique identifier for this configuration

  • extension_config -- Extension-specific configuration (e.g., Litestar plugin settings)

  • observability_config -- Adapter-level observability overrides for lifecycle hooks and observers

  • **kwargs -- Additional keyword arguments

async close_pool()[source]

Close the connection pool.

Return type:

None

async create_connection()[source]

Create a single async connection (not from pool).

Return type:

Connection

Returns:

An aiomysql connection instance.

async provide_pool(*args, **kwargs)[source]

Provide async pool instance.

Return type:

Pool

Returns:

The async connection pool.

get_signature_namespace()[source]

Get the signature namespace for aiomysql types.

Return type:

dict[str, typing.Any]

Returns:

Dictionary mapping type names to types.

get_event_runtime_hints()[source]

Return queue polling defaults for aiomysql adapters.

Return type:

EventRuntimeHints

Driver

class sqlspec.adapters.aiomysql.AiomysqlDriver[source]

Bases: AsyncDriverAdapterBase

MySQL/MariaDB database driver using aiomysql client library.

Implements asynchronous database operations for MySQL and MariaDB servers with support for parameter style conversion, type coercion, error handling, and transaction management.

__init__(connection, statement_config=None, driver_features=None)[source]

Initialize driver adapter with connection and configuration.

Parameters:
  • connection (Connection) -- Database connection instance

  • statement_config (StatementConfig | None) -- Statement configuration for the driver

  • driver_features (dict[str, typing.Any] | None) -- Driver-specific features like extensions, secrets, and connection callbacks

  • observability -- Optional runtime handling lifecycle hooks, observers, and spans

async dispatch_execute(cursor, statement)[source]

Execute single SQL statement.

Parameters:
  • cursor (Any) -- aiomysql cursor object

  • statement (SQL) -- SQL statement to execute

Returns:

Statement execution results with data or row counts

Return type:

ExecutionResult

async dispatch_execute_many(cursor, statement)[source]

Execute SQL statement with multiple parameter sets.

Parameters:
  • cursor (Any) -- aiomysql cursor object

  • statement (SQL) -- SQL statement with multiple parameter sets

Returns:

Batch execution results

Return type:

ExecutionResult

Raises:

ValueError -- If no parameters provided for executemany operation

async dispatch_execute_script(cursor, statement)[source]

Execute SQL script with statement splitting and parameter handling.

Parameters:
  • cursor (Any) -- aiomysql cursor object

  • statement (SQL) -- SQL script to execute

Returns:

Script execution results with statement count

Return type:

ExecutionResult

async begin()[source]

Begin a database transaction.

Raises:

SQLSpecError -- If transaction initialization fails

Return type:

None

async commit()[source]

Commit the current transaction.

Raises:

SQLSpecError -- If transaction commit fails

Return type:

None

async rollback()[source]

Rollback the current transaction.

Raises:

SQLSpecError -- If transaction rollback fails

Return type:

None

with_cursor(connection)[source]

Create cursor context manager for the connection.

Parameters:

connection (Connection) -- aiomysql database connection

Returns:

Context manager for cursor operations

Return type:

AiomysqlCursor

handle_database_exceptions()[source]

Provide exception handling context manager.

Returns:

Context manager for aiomysql exception handling

Return type:

AiomysqlExceptionHandler

async select_to_storage(statement, destination, /, *parameters, statement_config=None, partitioner=None, format_hint=None, telemetry=None, **kwargs)[source]

Execute a query and stream Arrow-formatted results into storage.

Return type:

StorageBridgeJob

async load_from_arrow(table, source, *, partitioner=None, overwrite=False, telemetry=None)[source]

Load Arrow data into MySQL using batched inserts.

Return type:

StorageBridgeJob

async load_from_storage(table, source, *, file_format, partitioner=None, overwrite=False)[source]

Load staged artifacts from storage into MySQL.

Return type:

StorageBridgeJob

property data_dictionary: AiomysqlDataDictionary

Get the data dictionary for this driver.

Returns:

Data dictionary instance for metadata queries

collect_rows(cursor, fetched)[source]

Collect aiomysql rows for the direct execution path.

Return type:

tuple[list[typing.Any], list[str], int]

resolve_rowcount(cursor)[source]

Resolve rowcount from aiomysql cursor for the direct execution path.

Return type:

int

Connection Parameters

class sqlspec.adapters.aiomysql.AiomysqlConnectionParams[source]

Bases: TypedDict

aiomysql connection parameters.

Pool Parameters

class sqlspec.adapters.aiomysql.AiomysqlPoolParams[source]

Bases: AiomysqlConnectionParams

aiomysql pool parameters.

Driver Features

class sqlspec.adapters.aiomysql.AiomysqlDriverFeatures[source]

Bases: TypedDict

aiomysql driver feature flags.

MySQL/MariaDB handle JSON natively, but custom serializers can be provided for specialized use cases (e.g., orjson for performance, msgspec for type safety).

json_serializer: Custom JSON serializer function.

Defaults to sqlspec.utils.serializers.to_json. Use for performance (orjson) or custom encoding.

json_deserializer: Custom JSON deserializer function.

Defaults to sqlspec.utils.serializers.from_json. Use for performance (orjson) or custom decoding.

on_connection_create: Async callback executed when a connection is acquired from pool.

Receives the raw aiomysql connection for low-level driver configuration. Called exactly once per physical connection using WeakSet tracking.

enable_events: Enable database event channel support.

Defaults to True when extension_config["events"] is configured. Provides pub/sub capabilities via table-backed queue (MySQL/MariaDB have no native pub/sub). Requires extension_config["events"] for migration setup.

events_backend: Event channel backend selection.

Only option: "table_queue" (durable table-backed queue with retries and exactly-once delivery). MySQL/MariaDB do not have native pub/sub, so table_queue is the only backend. Defaults to "table_queue".

Data Dictionary

class sqlspec.adapters.aiomysql.data_dictionary.AiomysqlDataDictionary[source]

Bases: AsyncDataDictionaryBase

MySQL-specific async data dictionary.

dialect: ClassVar[str] = 'mysql'

Dialect identifier. Must be defined by subclasses as a class attribute.

__init__()[source]
async get_version(driver)[source]

Get MySQL database version information.

Return type:

VersionInfo | None

async get_feature_flag(driver, feature)[source]

Check if MySQL database supports a specific feature.

Return type:

bool

async get_optimal_type(driver, type_category)[source]

Get optimal MySQL type for a category.

Return type:

str

async get_tables(driver, schema=None)[source]

Get tables sorted by topological dependency order using the MySQL catalog.

Return type:

list[TableMetadata]

async get_columns(driver, table=None, schema=None)[source]

Get column information for a table or schema.

Return type:

list[ColumnMetadata]

async get_indexes(driver, table=None, schema=None)[source]

Get index metadata for a table or schema.

Return type:

list[IndexMetadata]

async get_foreign_keys(driver, table=None, schema=None)[source]

Get foreign key metadata.

Return type:

list[ForeignKeyMetadata]