Arrow: Basic Usage

Demonstrate the select_to_arrow() helper across multiple adapters and conversion targets (native Arrow, pandas, Polars, and Parquet exports).

uv run python docs/examples/arrow/arrow_basic_usage.py

Source

  1"""Basic Apache Arrow Integration Examples.
  2
  3This example demonstrates the fundamental usage patterns for select_to_arrow()
  4across different database adapters and use cases.
  5
  6Requirements:
  7    pip install sqlspec[arrow,pandas,polars]
  8"""
  9
 10import asyncio
 11from pathlib import Path
 12
 13__all__ = (
 14    "example_adbc_native",
 15    "example_native_only_mode",
 16    "example_pandas_integration",
 17    "example_parquet_export",
 18    "example_polars_integration",
 19    "example_postgres_conversion",
 20    "example_return_formats",
 21    "main",
 22)
 23
 24
 25# Example 1: Basic Arrow Query (ADBC - Native Path)
 26async def example_adbc_native() -> None:
 27    """Demonstrate ADBC native Arrow support with zero-copy performance."""
 28    from sqlspec import SQLSpec
 29    from sqlspec.adapters.adbc import AdbcConfig
 30
 31    db_manager = SQLSpec()
 32    adbc_db = db_manager.add_config(
 33        AdbcConfig(connection_config={"driver": "adbc_driver_sqlite", "uri": "file::memory:?cache=shared"})
 34    )
 35
 36    with db_manager.provide_session(adbc_db) as session:
 37        # Create test table
 38        session.execute(
 39            """
 40            CREATE TABLE users (
 41                id INTEGER PRIMARY KEY,
 42                name TEXT NOT NULL,
 43                age INTEGER,
 44                email TEXT
 45            )
 46            """
 47        )
 48
 49        # Insert test data
 50        session.execute_many(
 51            "INSERT INTO users (id, name, age, email) VALUES (?, ?, ?, ?)",
 52            [
 53                (1, "Alice", 30, "[email protected]"),
 54                (2, "Bob", 25, "[email protected]"),
 55                (3, "Charlie", 35, "[email protected]"),
 56            ],
 57        )
 58
 59        # Native Arrow fetch - zero-copy!
 60        result = session.select_to_arrow("SELECT * FROM users WHERE age > :min_age", min_age=25)
 61
 62        print("ADBC Native Arrow Results:")
 63        print(f"  Rows: {len(result)}")
 64        print(f"  Columns: {result.data.column_names}")
 65        print(f"  Schema: {result.data.schema}")
 66        print()
 67
 68        # Iterate over results
 69        for row in result:
 70            print(f"  {row['name']}: {row['age']} years old")
 71
 72    print()
 73
 74
 75# Example 2: PostgreSQL with Conversion Path
 76async def example_postgres_conversion() -> None:
 77    """Demonstrate PostgreSQL adapter with dict → Arrow conversion."""
 78    import os
 79
 80    from sqlspec import SQLSpec
 81    from sqlspec.adapters.asyncpg import AsyncpgConfig
 82
 83    dsn = os.getenv("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/db")
 84    db_manager = SQLSpec()
 85    asyncpg_db = db_manager.add_config(AsyncpgConfig(connection_config={"dsn": dsn}))
 86
 87    async with db_manager.provide_session(asyncpg_db) as session:
 88        # Create test table with PostgreSQL-specific types
 89        await session.execute(
 90            """
 91            CREATE TABLE IF NOT EXISTS products (
 92                id SERIAL PRIMARY KEY,
 93                name TEXT,
 94                price NUMERIC(10, 2),
 95                tags TEXT[]
 96            )
 97            """
 98        )
 99
100        # Insert test data
101        await session.execute_many(
102            "INSERT INTO products (name, price, tags) VALUES (:name, :price, :tags)",
103            [
104                {"name": "Widget", "price": 19.99, "tags": ["gadget", "tool"]},
105                {"name": "Gadget", "price": 29.99, "tags": ["electronics", "new"]},
106            ],
107        )
108
109        # Conversion path: dict → Arrow
110        result = await session.select_to_arrow("SELECT * FROM products WHERE price < :price_limit", price_limit=25.00)
111
112        print("PostgreSQL Conversion Path Results:")
113        print(f"  Rows: {len(result)}")
114        print(f"  Data: {result.to_dict()}")
115        print()
116
117
118# Example 3: pandas Integration
async def example_pandas_integration() -> None:
    """Load query results into a pandas DataFrame by way of Arrow.

    Builds a small ``sales`` table in an in-memory SQLite database, fetches it
    with ``select_to_arrow()``, converts the result with ``to_pandas()``, and
    prints the frame followed by its ``describe()`` summary.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.sqlite import SqliteConfig

    # Whitespace inside the literal is kept exactly as it is sent to the database.
    create_sales_sql = """
            CREATE TABLE sales (
                id INTEGER PRIMARY KEY,
                region TEXT,
                amount REAL,
                sale_date DATE
            )
            """
    insert_sale_sql = "INSERT INTO sales (id, region, amount, sale_date) VALUES (:id, :region, :amount, :sale_date)"
    sale_rows = [
        {"id": 1, "region": "North", "amount": 1000.00, "sale_date": "2024-01-15"},
        {"id": 2, "region": "South", "amount": 1500.00, "sale_date": "2024-01-20"},
        {"id": 3, "region": "North", "amount": 2000.00, "sale_date": "2024-02-10"},
        {"id": 4, "region": "East", "amount": 1200.00, "sale_date": "2024-02-15"},
    ]

    spec = SQLSpec()
    sqlite_config = spec.add_config(SqliteConfig(connection_config={"database": ":memory:"}))

    with spec.provide_session(sqlite_config) as session:
        # Set up the table and its rows.
        session.execute(create_sales_sql)
        session.execute_many(insert_sale_sql, sale_rows)

        # Query to Arrow, then hand the result over to pandas.
        sales_frame = session.select_to_arrow("SELECT * FROM sales").to_pandas()

        print("pandas Integration:")
        print(sales_frame)
        print()
        print("Summary Statistics:")
        print(sales_frame.describe())
        print()
162
163
164# Example 4: Polars Integration
async def example_polars_integration() -> None:
    """Load an aggregated query result into a Polars DataFrame by way of Arrow.

    Builds an ``events`` table in an in-memory DuckDB database, counts events
    per ``event_type`` with ``select_to_arrow()``, converts the result with
    ``to_polars()``, and prints the frame.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    # Whitespace inside the literal is kept exactly as it is sent to the database.
    create_events_sql = """
            CREATE TABLE events (
                id INTEGER PRIMARY KEY,
                event_type VARCHAR,
                user_id INTEGER,
                timestamp TIMESTAMP
            )
            """
    insert_event_sql = (
        "INSERT INTO events (id, event_type, user_id, timestamp) VALUES (:id, :event_type, :user_id, :ts)"
    )
    event_rows = [
        {"id": 1, "event_type": "login", "user_id": 100, "ts": "2024-01-01 10:00:00"},
        {"id": 2, "event_type": "click", "user_id": 100, "ts": "2024-01-01 10:05:00"},
        {"id": 3, "event_type": "login", "user_id": 101, "ts": "2024-01-01 10:10:00"},
        {"id": 4, "event_type": "purchase", "user_id": 100, "ts": "2024-01-01 10:15:00"},
    ]

    spec = SQLSpec()
    duckdb_config = spec.add_config(DuckDBConfig(connection_config={"database": ":memory:"}))

    with spec.provide_session(duckdb_config) as session:
        # Set up the table and its rows.
        session.execute(create_events_sql)
        session.execute_many(insert_event_sql, event_rows)

        # Query to Arrow (native DuckDB path)
        counts_by_type = session.select_to_arrow(
            "SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type"
        )

        # Convert to Polars DataFrame
        events_frame = counts_by_type.to_polars()

        print("Polars Integration:")
        print(events_frame)
        print()
205
206
207# Example 5: Return Format Options
async def example_return_formats() -> None:
    """Compare the ``"table"`` and ``"batch"`` return formats of ``select_to_arrow()``.

    Runs the same ``SELECT`` against an in-memory DuckDB ``items`` table once
    per format and prints the type of ``result.data`` and the row count for
    each (plus the column names for the table format).
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    select_items_sql = "SELECT * FROM items"
    item_rows = [
        {"id": 1, "name": "Apple", "qty": 10},
        {"id": 2, "name": "Banana", "qty": 20},
        {"id": 3, "name": "Orange", "qty": 15},
    ]

    spec = SQLSpec()
    duckdb_config = spec.add_config(DuckDBConfig(connection_config={"database": ":memory:"}))

    with spec.provide_session(duckdb_config) as session:
        # Create test data
        session.execute("CREATE TABLE items (id INTEGER, name VARCHAR, quantity INTEGER)")
        session.execute_many("INSERT INTO items (id, name, quantity) VALUES (:id, :name, :qty)", item_rows)

        # Table format (default)
        as_table = session.select_to_arrow(select_items_sql, return_format="table")

        print("Table Format:")
        print(f"  Type: {type(as_table.data)}")
        print(f"  Rows: {len(as_table)}")
        print(f"  Columns: {as_table.data.column_names}")
        print()

        # Batch format
        as_batch = session.select_to_arrow(select_items_sql, return_format="batch")

        print("Batch Format:")
        print(f"  Type: {type(as_batch.data)}")
        print(f"  Rows: {len(as_batch)}")
        print()
244
245
246# Example 6: Export to Parquet
async def example_parquet_export() -> None:
    """Export an Arrow query result to a Parquet file and print the outcome.

    Builds a ``logs`` table in an in-memory DuckDB database, fetches it with
    ``select_to_arrow()``, and writes it with ``write_to_storage_sync()`` to
    ``arrow_basic_usage_logs.parquet`` inside the platform's temporary
    directory. The returned telemetry (rows and bytes processed) and the
    resulting file size are printed to stdout.
    """
    import tempfile

    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    db_manager = SQLSpec()
    duckdb = db_manager.add_config(DuckDBConfig(connection_config={"database": ":memory:"}))

    with db_manager.provide_session(duckdb) as session:
        # Create and populate table
        session.execute(
            """
            CREATE TABLE logs (
                id INTEGER,
                timestamp TIMESTAMP,
                level VARCHAR,
                message VARCHAR
            )
            """
        )

        session.execute_many(
            "INSERT INTO logs (id, timestamp, level, message) VALUES (:id, :ts, :level, :message)",
            [
                {"id": 1, "ts": "2024-01-01 10:00:00", "level": "INFO", "message": "Application started"},
                {"id": 2, "ts": "2024-01-01 10:05:00", "level": "WARN", "message": "High memory usage"},
                {"id": 3, "ts": "2024-01-01 10:10:00", "level": "ERROR", "message": "Database connection failed"},
            ],
        )

        # Query to Arrow
        result = session.select_to_arrow("SELECT * FROM logs")

        # Export to Parquet using the storage bridge. The target lives in the
        # platform's temp directory rather than a hard-coded "/tmp", so the
        # example also runs where "/tmp" does not exist (e.g. Windows) and
        # honours TMPDIR overrides.
        output_path = Path(tempfile.gettempdir()) / "arrow_basic_usage_logs.parquet"
        telemetry = result.write_to_storage_sync(str(output_path), format_hint="parquet")

        print("Parquet Export:")
        print(f"  Exported to: {output_path}")
        print(f"  Rows: {telemetry['rows_processed']}")
        print(f"  Bytes processed: {telemetry['bytes_processed']}")
        print(f"  File size: {output_path.stat().st_size} bytes")
        print()
290
291
292# Example 7: Native-Only Mode
async def example_native_only_mode() -> None:
    """Contrast ``native_only=True`` on an adapter with and without native Arrow.

    First runs ``select_to_arrow(..., native_only=True)`` through the ADBC
    adapter and prints the row count. Then repeats the call through the plain
    SQLite adapter, which this example describes as lacking native Arrow
    support; the resulting error is caught and reported so the remaining
    examples (and the final success banner in ``main``) still run.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.adbc import AdbcConfig
    from sqlspec.adapters.sqlite import SqliteConfig

    # ADBC has native Arrow support
    db_manager = SQLSpec()
    adbc_sqlite = db_manager.add_config(AdbcConfig(connection_config={"uri": "sqlite://:memory:"}))

    with db_manager.provide_session(adbc_sqlite) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # This works - ADBC has native support
        result = session.select_to_arrow("SELECT * FROM test", native_only=True)
        print("Native-only mode (ADBC): Success")
        print(f"  Rows: {len(result)}")
        print()

    # SQLite does not have native Arrow support
    sqlite_db = db_manager.add_config(SqliteConfig(connection_config={"database": ":memory:"}))

    with db_manager.provide_session(sqlite_db) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # The failure is the point of this half of the demo, so report it
        # instead of letting it abort the whole script.
        # NOTE(review): the exact exception class raised by sqlspec is not
        # visible from this file - narrow the handler once confirmed.
        try:
            session.select_to_arrow("SELECT * FROM test", native_only=True)
        except Exception as exc:  # noqa: BLE001 - demo boundary, error is printed
            print("Native-only mode (SQLite): Failed as expected")
            print(f"  Error: {exc}")
        else:
            print("Native-only mode (SQLite): Unexpectedly succeeded")
        print()
320
321
322# Run all examples
async def main() -> None:
    """Run the examples one after another, printing a heading before each.

    The PostgreSQL example is deliberately not part of the sequence.
    """
    banner = "=" * 60
    divider = "-" * 60

    # Example 2 ("PostgreSQL Conversion Path", example_postgres_conversion)
    # is left out on purpose: it requires PostgreSQL.
    examples = (
        ("Example 1: ADBC Native Arrow", example_adbc_native),
        ("Example 3: pandas Integration", example_pandas_integration),
        ("Example 4: Polars Integration", example_polars_integration),
        ("Example 5: Return Format Options", example_return_formats),
        ("Example 6: Parquet Export", example_parquet_export),
        ("Example 7: Native-Only Mode", example_native_only_mode),
    )

    print(banner)
    print("Apache Arrow Integration Examples")
    print(banner)
    print()

    for title, run_example in examples:
        print(title)
        print(divider)
        await run_example()

    print(banner)
    print("All examples completed successfully!")
    print(banner)
361
362
if __name__ == "__main__":
    # Script entry point: drive the async runner with asyncio.run().
    asyncio.run(main())