Arrow: Basic Usage

Demonstrates the select_to_arrow() helper across multiple adapters and conversion targets (native Arrow, pandas, Polars, and Parquet export).

uv run python docs/examples/arrow/arrow_basic_usage.py

Source

  1"""Basic Apache Arrow Integration Examples.
  2
  3This example demonstrates the fundamental usage patterns for select_to_arrow()
  4across different database adapters and use cases.
  5
  6Requirements:
  7    pip install sqlspec[arrow,pandas,polars]
  8"""
  9
 10import asyncio
 11from pathlib import Path
 12
 13__all__ = (
 14    "example_adbc_native",
 15    "example_native_only_mode",
 16    "example_pandas_integration",
 17    "example_parquet_export",
 18    "example_polars_integration",
 19    "example_postgres_conversion",
 20    "example_return_formats",
 21    "main",
 22)
 23
 24
 25# Example 1: Basic Arrow Query (ADBC - Native Path)
 26async def example_adbc_native() -> None:
 27    """Demonstrate ADBC native Arrow support with zero-copy performance."""
 28    from sqlspec import SQLSpec
 29    from sqlspec.adapters.adbc import AdbcConfig
 30
 31    db_manager = SQLSpec()
 32    adbc_db = db_manager.add_config(
 33        AdbcConfig(connection_config={"driver": "adbc_driver_sqlite", "uri": "file::memory:?cache=shared"})
 34    )
 35
 36    with db_manager.provide_session(adbc_db) as session:
 37        # Create test table
 38        session.execute(
 39            """
 40            CREATE TABLE users (
 41                id INTEGER PRIMARY KEY,
 42                name TEXT NOT NULL,
 43                age INTEGER,
 44                email TEXT
 45            )
 46            """
 47        )
 48
 49        # Insert test data
 50        session.execute_many(
 51            "INSERT INTO users (id, name, age, email) VALUES (?, ?, ?, ?)",
 52            [
 53                (1, "Alice", 30, "[email protected]"),
 54                (2, "Bob", 25, "[email protected]"),
 55                (3, "Charlie", 35, "[email protected]"),
 56            ],
 57        )
 58
 59        # Native Arrow fetch - zero-copy!
 60        result = session.select_to_arrow("SELECT * FROM users WHERE age > :min_age", min_age=25)
 61
 62        print("ADBC Native Arrow Results:")
 63        print(f"  Rows: {len(result)}")
 64        print(f"  Columns: {result.data.column_names}")
 65        print(f"  Schema: {result.data.schema}")
 66        print()
 67
 68        # Iterate over results
 69        for row in result:
 70            print(f"  {row['name']}: {row['age']} years old")
 71
 72    print()
 73
 74
 75# Example 2: PostgreSQL with Conversion Path
 76async def example_postgres_conversion() -> None:
 77    """Demonstrate PostgreSQL adapter with dict → Arrow conversion."""
 78    from sqlspec import SQLSpec
 79    from sqlspec.adapters.asyncpg import AsyncpgConfig
 80
 81    db_manager = SQLSpec()
 82    asyncpg_db = db_manager.add_config(AsyncpgConfig(pool_config={"dsn": "postgresql://localhost/test"}))
 83
 84    async with db_manager.provide_session(asyncpg_db) as session:
 85        # Create test table with PostgreSQL-specific types
 86        await session.execute(
 87            """
 88            CREATE TABLE IF NOT EXISTS products (
 89                id SERIAL PRIMARY KEY,
 90                name TEXT,
 91                price NUMERIC(10, 2),
 92                tags TEXT[]
 93            )
 94            """
 95        )
 96
 97        # Insert test data
 98        await session.execute_many(
 99            "INSERT INTO products (name, price, tags) VALUES (:name, :price, :tags)",
100            [
101                {"name": "Widget", "price": 19.99, "tags": ["gadget", "tool"]},
102                {"name": "Gadget", "price": 29.99, "tags": ["electronics", "new"]},
103            ],
104        )
105
106        # Conversion path: dict → Arrow
107        result = await session.select_to_arrow("SELECT * FROM products WHERE price < :price_limit", price_limit=25.00)
108
109        print("PostgreSQL Conversion Path Results:")
110        print(f"  Rows: {len(result)}")
111        print(f"  Data: {result.to_dict()}")
112        print()
113
114
115# Example 3: pandas Integration
async def example_pandas_integration() -> None:
    """Load query results into a pandas DataFrame by way of Arrow.

    Builds a four-row ``sales`` table in an in-memory SQLite database, fetches
    it with ``select_to_arrow()``, converts the result with ``to_pandas()``,
    and prints the frame followed by its ``describe()`` summary.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.sqlite import SqliteConfig

    insert_sql = "INSERT INTO sales (id, region, amount, sale_date) VALUES (:id, :region, :amount, :sale_date)"
    sales_rows = [
        {"id": 1, "region": "North", "amount": 1000.00, "sale_date": "2024-01-15"},
        {"id": 2, "region": "South", "amount": 1500.00, "sale_date": "2024-01-20"},
        {"id": 3, "region": "North", "amount": 2000.00, "sale_date": "2024-02-10"},
        {"id": 4, "region": "East", "amount": 1200.00, "sale_date": "2024-02-15"},
    ]

    spec = SQLSpec()
    sqlite_config = spec.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

    with spec.provide_session(sqlite_config) as db:
        # Schema first, then the sample rows.
        db.execute(
            """
            CREATE TABLE sales (
                id INTEGER PRIMARY KEY,
                region TEXT,
                amount REAL,
                sale_date DATE
            )
            """
        )
        db.execute_many(insert_sql, sales_rows)

        # Fetch as Arrow, then hand the result to pandas.
        sales_frame = db.select_to_arrow("SELECT * FROM sales").to_pandas()

        print("pandas Integration:")
        print(sales_frame)
        print()
        print("Summary Statistics:")
        print(sales_frame.describe())
        print()
159
160
161# Example 4: Polars Integration
async def example_polars_integration() -> None:
    """Load aggregated query results into a Polars DataFrame by way of Arrow.

    Builds a four-row ``events`` table in an in-memory DuckDB database, counts
    events per ``event_type`` with ``select_to_arrow()``, converts the result
    with ``to_polars()``, and prints it.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    insert_sql = "INSERT INTO events (id, event_type, user_id, timestamp) VALUES (:id, :event_type, :user_id, :ts)"
    event_rows = [
        {"id": 1, "event_type": "login", "user_id": 100, "ts": "2024-01-01 10:00:00"},
        {"id": 2, "event_type": "click", "user_id": 100, "ts": "2024-01-01 10:05:00"},
        {"id": 3, "event_type": "login", "user_id": 101, "ts": "2024-01-01 10:10:00"},
        {"id": 4, "event_type": "purchase", "user_id": 100, "ts": "2024-01-01 10:15:00"},
    ]

    spec = SQLSpec()
    duck_config = spec.add_config(DuckDBConfig(pool_config={"database": ":memory:"}))

    with spec.provide_session(duck_config) as db:
        # Schema first, then the sample rows.
        db.execute(
            """
            CREATE TABLE events (
                id INTEGER PRIMARY KEY,
                event_type VARCHAR,
                user_id INTEGER,
                timestamp TIMESTAMP
            )
            """
        )
        db.execute_many(insert_sql, event_rows)

        # Query to Arrow (native DuckDB path)
        counts = db.select_to_arrow("SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type")

        # Hand the Arrow result to Polars.
        counts_frame = counts.to_polars()

        print("Polars Integration:")
        print(counts_frame)
        print()
202
203
204# Example 5: Return Format Options
async def example_return_formats() -> None:
    """Show the ``return_format`` options of ``select_to_arrow()``.

    Runs the same query against an in-memory DuckDB ``items`` table twice,
    once with ``return_format="table"`` and once with ``return_format="batch"``,
    and prints the type of ``.data`` and the row count for each.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    query = "SELECT * FROM items"
    item_rows = [
        {"id": 1, "name": "Apple", "qty": 10},
        {"id": 2, "name": "Banana", "qty": 20},
        {"id": 3, "name": "Orange", "qty": 15},
    ]

    spec = SQLSpec()
    duck_config = spec.add_config(DuckDBConfig(pool_config={"database": ":memory:"}))

    with spec.provide_session(duck_config) as db:
        # Sample data shared by both queries.
        db.execute("CREATE TABLE items (id INTEGER, name VARCHAR, quantity INTEGER)")
        db.execute_many("INSERT INTO items (id, name, quantity) VALUES (:id, :name, :qty)", item_rows)

        # Table format (default)
        as_table = db.select_to_arrow(query, return_format="table")

        print("Table Format:")
        print(f"  Type: {type(as_table.data)}")
        print(f"  Rows: {len(as_table)}")
        print(f"  Columns: {as_table.data.column_names}")
        print()

        # Batch format
        as_batch = db.select_to_arrow(query, return_format="batch")

        print("Batch Format:")
        print(f"  Type: {type(as_batch.data)}")
        print(f"  Rows: {len(as_batch)}")
        print()
241
242
243# Example 6: Export to Parquet
async def example_parquet_export() -> None:
    """Demonstrate exporting Arrow results to Parquet.

    Builds a three-row ``logs`` table in an in-memory DuckDB database, fetches
    it with ``select_to_arrow()``, and writes the result to
    ``arrow_basic_usage_logs.parquet`` in the platform's temporary directory
    via ``write_to_storage_sync()``. The file is left in place so it can be
    inspected after the example finishes.
    """
    import tempfile

    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    db_manager = SQLSpec()
    duckdb = db_manager.add_config(DuckDBConfig(pool_config={"database": ":memory:"}))

    with db_manager.provide_session(duckdb) as session:
        # Create and populate table
        session.execute(
            """
            CREATE TABLE logs (
                id INTEGER,
                timestamp TIMESTAMP,
                level VARCHAR,
                message VARCHAR
            )
            """
        )

        session.execute_many(
            "INSERT INTO logs (id, timestamp, level, message) VALUES (:id, :ts, :level, :message)",
            [
                {"id": 1, "ts": "2024-01-01 10:00:00", "level": "INFO", "message": "Application started"},
                {"id": 2, "ts": "2024-01-01 10:05:00", "level": "WARN", "message": "High memory usage"},
                {"id": 3, "ts": "2024-01-01 10:10:00", "level": "ERROR", "message": "Database connection failed"},
            ],
        )

        # Query to Arrow
        result = session.select_to_arrow("SELECT * FROM logs")

        # Export to Parquet using the storage bridge. Resolve the temp
        # directory at runtime instead of hard-coding "/tmp", which does not
        # exist on Windows and ignores TMPDIR overrides.
        output_path = Path(tempfile.gettempdir()) / "arrow_basic_usage_logs.parquet"
        telemetry = result.write_to_storage_sync(str(output_path), format_hint="parquet")

        print("Parquet Export:")
        print(f"  Exported to: {output_path}")
        print(f"  Rows: {telemetry['rows_processed']}")
        print(f"  Bytes processed: {telemetry['bytes_processed']}")
        print(f"  File size: {output_path.stat().st_size} bytes")
        print()
287
288
289# Example 7: Native-Only Mode
async def example_native_only_mode() -> None:
    """Demonstrate native-only mode enforcement.

    Runs the same ``select_to_arrow(..., native_only=True)`` query against two
    adapters:

    * ADBC, which this example treats as having native Arrow support, so the
      query is expected to succeed.
    * SQLite, which this example treats as lacking native Arrow support, so
      the query is expected to be rejected. The rejection is caught and
      reported so the demonstration does not abort the remaining output.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.adbc import AdbcConfig
    from sqlspec.adapters.sqlite import SqliteConfig

    # ADBC has native Arrow support
    db_manager = SQLSpec()
    adbc_sqlite = db_manager.add_config(AdbcConfig(connection_config={"uri": "sqlite://:memory:"}))

    with db_manager.provide_session(adbc_sqlite) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # This works - ADBC has native support
        result = session.select_to_arrow("SELECT * FROM test", native_only=True)
        print("Native-only mode (ADBC): Success")
        print(f"  Rows: {len(result)}")
        print()

    # SQLite does not have native Arrow support
    sqlite_db = db_manager.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

    with db_manager.provide_session(sqlite_db) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # Only the native-only query is guarded: setup failures above should
        # still surface. The concrete exception class is defined by sqlspec
        # and is not visible in this file, so catch broadly and report the
        # type. NOTE(review): narrow this to sqlspec's specific error class.
        try:
            session.select_to_arrow("SELECT * FROM test", native_only=True)
        except Exception as exc:  # noqa: BLE001
            print("Native-only mode (SQLite): Rejected as expected")
            print(f"  {type(exc).__name__}: {exc}")
        else:
            print("Native-only mode (SQLite): Unexpectedly succeeded")
        print()
317
318
319# Run all examples
async def main() -> None:
    """Run every enabled example in order, each under a printed heading."""
    banner = "=" * 60
    rule = "-" * 60

    # (heading, coroutine function) pairs, in execution order.
    examples = (
        ("Example 1: ADBC Native Arrow", example_adbc_native),
        # Example 2 (PostgreSQL Conversion Path, example_postgres_conversion)
        # is skipped here because it requires a running PostgreSQL server.
        ("Example 3: pandas Integration", example_pandas_integration),
        ("Example 4: Polars Integration", example_polars_integration),
        ("Example 5: Return Format Options", example_return_formats),
        ("Example 6: Parquet Export", example_parquet_export),
        ("Example 7: Native-Only Mode", example_native_only_mode),
    )

    print(banner)
    print("Apache Arrow Integration Examples")
    print(banner)
    print()

    for heading, run_example in examples:
        print(heading)
        print(rule)
        await run_example()

    print(banner)
    print("All examples completed successfully!")
    print(banner)


if __name__ == "__main__":
    asyncio.run(main())