Arrow: Basic Usage
Demonstrate the select_to_arrow() helper across multiple adapters and
conversion targets (native Arrow, pandas, polars, and Parquet exports).
Run the example with:
uv run python docs/examples/arrow/arrow_basic_usage.py
Source
1"""Basic Apache Arrow Integration Examples.
2
3This example demonstrates the fundamental usage patterns for select_to_arrow()
4across different database adapters and use cases.
5
6Requirements:
7 pip install sqlspec[arrow,pandas,polars]
8"""
9
10import asyncio
11from pathlib import Path
12
# Public names of this example module: one coroutine per example plus the
# ``main`` driver that runs them.
__all__ = (
    "example_adbc_native",
    "example_native_only_mode",
    "example_pandas_integration",
    "example_parquet_export",
    "example_polars_integration",
    "example_postgres_conversion",
    "example_return_formats",
    "main",
)
23
24
25# Example 1: Basic Arrow Query (ADBC - Native Path)
async def example_adbc_native() -> None:
    """Query an ADBC-backed SQLite database and print the Arrow result.

    Creates a ``users`` table in a shared in-memory SQLite database through
    the ADBC adapter, inserts three rows, selects the users older than 25 via
    ``select_to_arrow()``, and prints the row count, column names, schema and
    every returned row.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.adbc import AdbcConfig

    create_users_sql = """
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            age INTEGER,
            email TEXT
        )
        """
    insert_user_sql = "INSERT INTO users (id, name, age, email) VALUES (?, ?, ?, ?)"
    seed_users = [
        (1, "Alice", 30, "[email protected]"),
        (2, "Bob", 25, "[email protected]"),
        (3, "Charlie", 35, "[email protected]"),
    ]

    registry = SQLSpec()
    adbc_settings = {"driver": "adbc_driver_sqlite", "uri": "file::memory:?cache=shared"}
    adbc_handle = registry.add_config(AdbcConfig(connection_config=adbc_settings))

    with registry.provide_session(adbc_handle) as session:
        # Schema first, then the seed rows.
        session.execute(create_users_sql)
        session.execute_many(insert_user_sql, seed_users)

        # Fetch the filtered rows as an Arrow result.
        arrow_result = session.select_to_arrow("SELECT * FROM users WHERE age > :min_age", min_age=25)
        arrow_data = arrow_result.data

        print("ADBC Native Arrow Results:")
        print(f" Rows: {len(arrow_result)}")
        print(f" Columns: {arrow_data.column_names}")
        print(f" Schema: {arrow_data.schema}")
        print()

        # Each iterated row is indexed by column name.
        for user in arrow_result:
            print(f" {user['name']}: {user['age']} years old")

        print()
73
74
75# Example 2: PostgreSQL with Conversion Path
async def example_postgres_conversion() -> None:
    """Query PostgreSQL through asyncpg and print the Arrow result as a dict.

    The DSN is read from the ``SQLSPEC_USAGE_PG_DSN`` environment variable
    and falls back to ``postgresql://localhost/db``. The function creates
    the ``products`` table if it is missing, inserts two rows, selects the
    products cheaper than 25.00 via ``select_to_arrow()``, and prints the
    row count and ``to_dict()`` output.
    """
    import os

    from sqlspec import SQLSpec
    from sqlspec.adapters.asyncpg import AsyncpgConfig

    create_products_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name TEXT,
            price NUMERIC(10, 2),
            tags TEXT[]
        )
        """
    insert_product_sql = "INSERT INTO products (name, price, tags) VALUES (:name, :price, :tags)"
    seed_products = [
        {"name": "Widget", "price": 19.99, "tags": ["gadget", "tool"]},
        {"name": "Gadget", "price": 29.99, "tags": ["electronics", "new"]},
    ]

    pg_dsn = os.environ.get("SQLSPEC_USAGE_PG_DSN", "postgresql://localhost/db")
    registry = SQLSpec()
    pg_handle = registry.add_config(AsyncpgConfig(connection_config={"dsn": pg_dsn}))

    async with registry.provide_session(pg_handle) as session:
        # Table uses PostgreSQL-specific column types (SERIAL, NUMERIC, TEXT[]).
        await session.execute(create_products_sql)
        await session.execute_many(insert_product_sql, seed_products)

        # Fetch the filtered rows as an Arrow result.
        arrow_result = await session.select_to_arrow(
            "SELECT * FROM products WHERE price < :price_limit",
            price_limit=25.00,
        )

        print("PostgreSQL Conversion Path Results:")
        print(f" Rows: {len(arrow_result)}")
        print(f" Data: {arrow_result.to_dict()}")
        print()
116
117
118# Example 3: pandas Integration
async def example_pandas_integration() -> None:
    """Load a SQLite query result into pandas through Arrow.

    Creates a ``sales`` table in an in-memory SQLite database, inserts four
    rows, fetches the whole table with ``select_to_arrow()``, converts it
    with ``to_pandas()``, and prints the DataFrame followed by its
    ``describe()`` summary.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.sqlite import SqliteConfig

    create_sales_sql = """
        CREATE TABLE sales (
            id INTEGER PRIMARY KEY,
            region TEXT,
            amount REAL,
            sale_date DATE
        )
        """
    insert_sale_sql = "INSERT INTO sales (id, region, amount, sale_date) VALUES (:id, :region, :amount, :sale_date)"
    seed_sales = [
        {"id": 1, "region": "North", "amount": 1000.00, "sale_date": "2024-01-15"},
        {"id": 2, "region": "South", "amount": 1500.00, "sale_date": "2024-01-20"},
        {"id": 3, "region": "North", "amount": 2000.00, "sale_date": "2024-02-10"},
        {"id": 4, "region": "East", "amount": 1200.00, "sale_date": "2024-02-15"},
    ]

    registry = SQLSpec()
    sqlite_handle = registry.add_config(SqliteConfig(connection_config={"database": ":memory:"}))

    with registry.provide_session(sqlite_handle) as session:
        session.execute(create_sales_sql)
        session.execute_many(insert_sale_sql, seed_sales)

        # Arrow result first, then hand it to pandas.
        sales_frame = session.select_to_arrow("SELECT * FROM sales").to_pandas()

        print("pandas Integration:")
        print(sales_frame)
        print()
        print("Summary Statistics:")
        print(sales_frame.describe())
        print()
162
163
164# Example 4: Polars Integration
async def example_polars_integration() -> None:
    """Load a DuckDB aggregate query into Polars through Arrow.

    Creates an ``events`` table in an in-memory DuckDB database, inserts
    four rows, counts events per ``event_type`` with ``select_to_arrow()``,
    converts the result with ``to_polars()``, and prints the DataFrame.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    create_events_sql = """
        CREATE TABLE events (
            id INTEGER PRIMARY KEY,
            event_type VARCHAR,
            user_id INTEGER,
            timestamp TIMESTAMP
        )
        """
    insert_event_sql = (
        "INSERT INTO events (id, event_type, user_id, timestamp) VALUES (:id, :event_type, :user_id, :ts)"
    )
    seed_events = [
        {"id": 1, "event_type": "login", "user_id": 100, "ts": "2024-01-01 10:00:00"},
        {"id": 2, "event_type": "click", "user_id": 100, "ts": "2024-01-01 10:05:00"},
        {"id": 3, "event_type": "login", "user_id": 101, "ts": "2024-01-01 10:10:00"},
        {"id": 4, "event_type": "purchase", "user_id": 100, "ts": "2024-01-01 10:15:00"},
    ]

    registry = SQLSpec()
    duckdb_handle = registry.add_config(DuckDBConfig(connection_config={"database": ":memory:"}))

    with registry.provide_session(duckdb_handle) as session:
        session.execute(create_events_sql)
        session.execute_many(insert_event_sql, seed_events)

        # Aggregate in SQL, fetch as Arrow, then hand the result to Polars.
        arrow_result = session.select_to_arrow(
            "SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type"
        )
        events_frame = arrow_result.to_polars()

        print("Polars Integration:")
        print(events_frame)
        print()
205
206
207# Example 5: Return Format Options
async def example_return_formats() -> None:
    """Run the same query with ``return_format="table"`` and ``"batch"``.

    Creates an ``items`` table in an in-memory DuckDB database, inserts
    three rows, then fetches the table once per return format. For each
    result it prints the type of ``.data`` and the row count; the table
    variant additionally prints the column names.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    select_items_sql = "SELECT * FROM items"
    seed_items = [
        {"id": 1, "name": "Apple", "qty": 10},
        {"id": 2, "name": "Banana", "qty": 20},
        {"id": 3, "name": "Orange", "qty": 15},
    ]

    registry = SQLSpec()
    duckdb_handle = registry.add_config(DuckDBConfig(connection_config={"database": ":memory:"}))

    with registry.provide_session(duckdb_handle) as session:
        session.execute("CREATE TABLE items (id INTEGER, name VARCHAR, quantity INTEGER)")
        session.execute_many("INSERT INTO items (id, name, quantity) VALUES (:id, :name, :qty)", seed_items)

        # First pass: request the "table" format.
        as_table = session.select_to_arrow(select_items_sql, return_format="table")
        table_data = as_table.data

        print("Table Format:")
        print(f" Type: {type(table_data)}")
        print(f" Rows: {len(as_table)}")
        print(f" Columns: {table_data.column_names}")
        print()

        # Second pass: request the "batch" format.
        as_batch = session.select_to_arrow(select_items_sql, return_format="batch")

        print("Batch Format:")
        print(f" Type: {type(as_batch.data)}")
        print(f" Rows: {len(as_batch)}")
        print()
244
245
246# Example 6: Export to Parquet
async def example_parquet_export() -> None:
    """Write a DuckDB query result to a Parquet file and report on it.

    Creates a ``logs`` table in an in-memory DuckDB database, inserts three
    rows, fetches the table with ``select_to_arrow()``, and writes it to
    ``/tmp/arrow_basic_usage_logs.parquet`` via ``write_to_storage_sync()``.
    It then prints the ``rows_processed`` and ``bytes_processed`` entries of
    the returned telemetry together with the on-disk file size.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    create_logs_sql = """
        CREATE TABLE logs (
            id INTEGER,
            timestamp TIMESTAMP,
            level VARCHAR,
            message VARCHAR
        )
        """
    insert_log_sql = "INSERT INTO logs (id, timestamp, level, message) VALUES (:id, :ts, :level, :message)"
    seed_logs = [
        {"id": 1, "ts": "2024-01-01 10:00:00", "level": "INFO", "message": "Application started"},
        {"id": 2, "ts": "2024-01-01 10:05:00", "level": "WARN", "message": "High memory usage"},
        {"id": 3, "ts": "2024-01-01 10:10:00", "level": "ERROR", "message": "Database connection failed"},
    ]

    registry = SQLSpec()
    duckdb_handle = registry.add_config(DuckDBConfig(connection_config={"database": ":memory:"}))

    with registry.provide_session(duckdb_handle) as session:
        session.execute(create_logs_sql)
        session.execute_many(insert_log_sql, seed_logs)

        arrow_result = session.select_to_arrow("SELECT * FROM logs")

        # The writer is given the path as a string plus an explicit format hint.
        parquet_file = Path("/tmp/arrow_basic_usage_logs.parquet")
        write_stats = arrow_result.write_to_storage_sync(str(parquet_file), format_hint="parquet")

        print("Parquet Export:")
        print(f" Exported to: {parquet_file}")
        print(f" Rows: {write_stats['rows_processed']}")
        print(f" Bytes processed: {write_stats['bytes_processed']}")
        print(f" File size: {parquet_file.stat().st_size} bytes")
        print()
290
291
292# Example 7: Native-Only Mode
async def example_native_only_mode() -> None:
    """Demonstrate ``native_only=True`` on two different adapters.

    The same ``SELECT`` is issued twice with ``native_only=True``:

    1. Through the ADBC adapter, where the call is expected to succeed and
       the row count is printed.
    2. Through the plain SQLite adapter, where the call is expected to be
       rejected. The rejection is caught and reported so the example (and
       any caller running the whole suite) keeps going instead of aborting
       on the very behavior it is meant to showcase.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.adbc import AdbcConfig
    from sqlspec.adapters.sqlite import SqliteConfig

    # ADBC has native Arrow support
    db_manager = SQLSpec()
    adbc_sqlite = db_manager.add_config(AdbcConfig(connection_config={"uri": "sqlite://:memory:"}))

    with db_manager.provide_session(adbc_sqlite) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # This works - ADBC has native support
        result = session.select_to_arrow("SELECT * FROM test", native_only=True)
        print("Native-only mode (ADBC): Success")
        print(f" Rows: {len(result)}")
        print()

    # SQLite does not have native Arrow support
    sqlite_db = db_manager.add_config(SqliteConfig(connection_config={"database": ":memory:"}))

    with db_manager.provide_session(sqlite_db) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # Only the call under demonstration sits inside the ``try`` so that
        # setup failures above still surface normally.
        # NOTE(review): the concrete exception class raised by sqlspec for an
        # unsupported native path is not visible from this file, hence the
        # broad catch; narrow it once the class is confirmed.
        try:
            result = session.select_to_arrow("SELECT * FROM test", native_only=True)
        except Exception as exc:  # noqa: BLE001
            print("Native-only mode (SQLite): Rejected as expected")
            print(f" {type(exc).__name__}: {exc}")
        else:
            # Reaching this branch means the adapter served the request after all.
            print("Native-only mode (SQLite): Succeeded")
            print(f" Rows: {len(result)}")
        print()
320
321
322# Run all examples
async def main() -> None:
    """Run the enabled examples in order between a header and a footer banner.

    Each example is announced by its title and a dashed rule, then awaited.
    Example 2 (PostgreSQL) is left out of the run list because it requires a
    PostgreSQL server.
    """
    banner = "=" * 60
    rule = "-" * 60

    # (title, coroutine function) pairs, in execution order.
    # "Example 2: PostgreSQL Conversion Path" / example_postgres_conversion
    # is intentionally not listed: it requires PostgreSQL.
    enabled_examples = (
        ("Example 1: ADBC Native Arrow", example_adbc_native),
        ("Example 3: pandas Integration", example_pandas_integration),
        ("Example 4: Polars Integration", example_polars_integration),
        ("Example 5: Return Format Options", example_return_formats),
        ("Example 6: Parquet Export", example_parquet_export),
        ("Example 7: Native-Only Mode", example_native_only_mode),
    )

    print(banner)
    print("Apache Arrow Integration Examples")
    print(banner)
    print()

    for title, run_example in enabled_examples:
        print(title)
        print(rule)
        await run_example()

    print(banner)
    print("All examples completed successfully!")
    print(banner)
361
362
# Script entry point: drive the async ``main()`` coroutine to completion.
if __name__ == "__main__":
    asyncio.run(main())