ETL & Data Pipelines

SQLSpec works well for ETL (Extract-Transform-Load) workflows. Use multiple database configs to move data between systems, and leverage Arrow-based methods for high-performance bulk transfers.

Multi-Database ETL

Register source and target databases on a single SQLSpec instance. Extract from one, transform in Python, and load into the other.

Example: a multi-database ETL pipeline
from pathlib import Path
from tempfile import mkdtemp

from sqlspec import SQLSpec
from sqlspec.adapters.sqlite import SqliteConfig

# Simulate an ETL pipeline with two SQLite databases in a temporary directory
tmp_path = Path(mkdtemp())
spec = SQLSpec()
source_config = spec.add_config(SqliteConfig(connection_config={"database": str(tmp_path / "source.db")}))
target_config = spec.add_config(SqliteConfig(connection_config={"database": str(tmp_path / "target.db")}))

# Step 1: Seed source data
with spec.provide_session(source_config) as session:
    session.execute("create table orders (id integer primary key, amount real, status text)")
    session.execute_many(
        "insert into orders (amount, status) values (?, ?)",
        [(100.0, "complete"), (50.0, "pending"), (200.0, "complete")],
    )

# Step 2: Extract from source
with spec.provide_session(source_config) as session:
    completed = session.select("select id, amount from orders where status = ?", "complete")

# Step 3: Load into target
with spec.provide_session(target_config) as session:
    session.execute("create table revenue (order_id integer, amount real)")
    session.execute_many(
        "insert into revenue (order_id, amount) values (?, ?)", [(row["id"], row["amount"]) for row in completed]
    )
    total = session.select_value("select sum(amount) from revenue")
    print(f"Total revenue: {total}")  # Total revenue: 300.0
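
The pipeline above loads the extracted rows unchanged; in practice the transform step is ordinary Python over the row dicts that session.select() returns. A minimal sketch, using sample rows and a hypothetical processing-fee calculation (the fee logic is illustrative, not part of SQLSpec):

```python
# Row dicts shaped like session.select() results (sample data, not a real query)
completed = [{"id": 1, "amount": 100.0}, {"id": 3, "amount": 200.0}]

def transform(rows, fee_rate=0.029):
    # Compute net revenue after a hypothetical processing fee, returning
    # (order_id, amount) tuples ready to pass to execute_many()
    return [(row["id"], round(row["amount"] * (1 - fee_rate), 2)) for row in rows]

params = transform(completed)
print(params)  # [(1, 97.1), (3, 194.2)]
```

The resulting tuples drop straight into the execute_many() load step shown above.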

Arrow-Based Bulk Transfer

For large datasets, use select_to_arrow() to get results as Apache Arrow tables. This avoids per-row Python object overhead and enables zero-copy transfers between databases that support native Arrow (ADBC, DuckDB, BigQuery).

# Extract as Arrow table
arrow_result = await source_session.select_to_arrow(
    "SELECT * FROM large_table WHERE updated > :since",
    {"since": last_sync},
)

# Arrow table can be converted to pandas, polars, or written to Parquet
df = arrow_result.to_pandas()

# Or use native Arrow paths for zero-copy with DuckDB/ADBC
arrow_result = await session.select_to_arrow(
    "SELECT * FROM events",
    return_format="table",    # "table", "batch", "batches", or "reader"
    batch_size=10000,         # rows per batch
)

Supported return_format values:

  • "table" – single pyarrow.Table (default)

  • "batch" – single RecordBatch

  • "batches" – iterator of RecordBatch objects

  • "reader" – RecordBatchReader for streaming
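
The batch-oriented formats exist so a large result never has to materialize in memory at once: each chunk is processed and released before the next arrives. The idea in plain Python (no SQLSpec or pyarrow required; names here are illustrative):

```python
from itertools import islice

def batched(rows, batch_size):
    # Yield fixed-size chunks, mirroring the "batches" / "reader" formats
    it = iter(rows)
    while chunk := list(islice(it, batch_size)):
        yield chunk

processed = 0
for batch in batched(range(25_000), batch_size=10_000):
    processed += len(batch)  # handle one batch, then let it go
print(processed)  # 25000
```

With batch_size=10_000, a 25,000-row result arrives as chunks of 10,000, 10,000, and 5,000 rows, so peak memory is bounded by the batch size rather than the result size.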

DuckDB as Staging Layer

DuckDB excels as an ETL staging layer because it can read Parquet, CSV, and JSON files natively and attach to external PostgreSQL databases.

from sqlspec import SQLSpec
from sqlspec.adapters.duckdb import DuckDBConfig

spec = SQLSpec()
config = spec.add_config(
    DuckDBConfig(connection_config={"database": "/tmp/staging.db"})
)

with spec.provide_session(config) as session:
    # Read directly from Parquet files
    session.execute(
        "CREATE TABLE staging AS SELECT * FROM read_parquet('data/*.parquet')"
    )

    # Transform and aggregate
    session.execute(
        "CREATE TABLE summary AS "
        "SELECT date, COUNT(*) as events "
        "FROM staging GROUP BY date"
    )

    # Read the aggregated summary back into Python
    result = session.select("SELECT * FROM summary ORDER BY date")
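
The PostgreSQL attachment mentioned above goes through DuckDB's postgres extension. The statements below, passed to session.execute() one at a time, are a sketch; the connection string and table names are placeholders to adapt to your environment:

```sql
-- One-time setup of DuckDB's postgres extension
INSTALL postgres;
LOAD postgres;

-- Placeholder connection string; adjust dbname/host/user for your setup
ATTACH 'dbname=analytics host=localhost user=etl' AS pg (TYPE postgres);

-- Pull a remote table into the local staging database
CREATE TABLE staging AS SELECT * FROM pg.public.orders;
```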