Arrow: Basic Usage
Demonstrates the select_to_arrow() helper across multiple adapters and
conversion targets (native Arrow, pandas, polars, and Parquet exports).
Run the example with:
uv run python docs/examples/arrow/arrow_basic_usage.py
Source
1"""Basic Apache Arrow Integration Examples.
2
3This example demonstrates the fundamental usage patterns for select_to_arrow()
4across different database adapters and use cases.
5
6Requirements:
7 pip install sqlspec[arrow,pandas,polars]
8"""
9
10import asyncio
11from pathlib import Path
12
# Public API of this example module: the seven example coroutines plus the
# main() runner that executes them.
__all__ = (
    "example_adbc_native",
    "example_native_only_mode",
    "example_pandas_integration",
    "example_parquet_export",
    "example_polars_integration",
    "example_postgres_conversion",
    "example_return_formats",
    "main",
)
23
24
25# Example 1: Basic Arrow Query (ADBC - Native Path)
async def example_adbc_native() -> None:
    """Demonstrate ADBC native Arrow support with zero-copy performance.

    Creates a ``users`` table in a shared in-memory SQLite database through the
    ADBC adapter, inserts three rows, then selects the users older than 25 with
    ``select_to_arrow()`` and prints the row count, column names, schema, and
    each matching row.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.adbc import AdbcConfig

    db_manager = SQLSpec()
    adbc_db = db_manager.add_config(
        AdbcConfig(connection_config={"driver": "adbc_driver_sqlite", "uri": "file::memory:?cache=shared"})
    )

    with db_manager.provide_session(adbc_db) as session:
        # Create test table
        session.execute(
            """
            CREATE TABLE users (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                age INTEGER,
                email TEXT
            )
            """
        )

        # Insert test data. The addresses use the reserved example.com domain;
        # they are sample values only and are never printed below.
        session.execute_many(
            "INSERT INTO users (id, name, age, email) VALUES (?, ?, ?, ?)",
            [
                (1, "Alice", 30, "alice@example.com"),
                (2, "Bob", 25, "bob@example.com"),
                (3, "Charlie", 35, "charlie@example.com"),
            ],
        )

        # Native Arrow fetch - zero-copy! The filter is strict (age > 25), so
        # Bob (exactly 25) is not part of the result.
        result = session.select_to_arrow("SELECT * FROM users WHERE age > :min_age", min_age=25)

        print("ADBC Native Arrow Results:")
        print(f" Rows: {len(result)}")
        print(f" Columns: {result.data.column_names}")
        print(f" Schema: {result.data.schema}")
        print()

        # Iterate over results
        for row in result:
            print(f" {row['name']}: {row['age']} years old")

        print()
73
74
75# Example 2: PostgreSQL with Conversion Path
async def example_postgres_conversion() -> None:
    """Show the asyncpg adapter producing Arrow data via the dict → Arrow path.

    Connects to ``postgresql://localhost/test``, so a PostgreSQL server must be
    available there for this example to run.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.asyncpg import AsyncpgConfig

    # Table definition uses PostgreSQL-specific types (SERIAL, NUMERIC, TEXT[]).
    create_products_sql = """
        CREATE TABLE IF NOT EXISTS products (
            id SERIAL PRIMARY KEY,
            name TEXT,
            price NUMERIC(10, 2),
            tags TEXT[]
        )
        """
    insert_product_sql = "INSERT INTO products (name, price, tags) VALUES (:name, :price, :tags)"
    product_rows = [
        {"name": "Widget", "price": 19.99, "tags": ["gadget", "tool"]},
        {"name": "Gadget", "price": 29.99, "tags": ["electronics", "new"]},
    ]

    registry = SQLSpec()
    pg_config = AsyncpgConfig(pool_config={"dsn": "postgresql://localhost/test"})
    pg_handle = registry.add_config(pg_config)

    async with registry.provide_session(pg_handle) as driver:
        await driver.execute(create_products_sql)
        await driver.execute_many(insert_product_sql, product_rows)

        # Conversion path: dict → Arrow
        cheap_products = await driver.select_to_arrow(
            "SELECT * FROM products WHERE price < :price_limit", price_limit=25.00
        )

        print("PostgreSQL Conversion Path Results:")
        print(f" Rows: {len(cheap_products)}")
        print(f" Data: {cheap_products.to_dict()}")
        print()
113
114
115# Example 3: pandas Integration
async def example_pandas_integration() -> None:
    """Load query results into a pandas DataFrame by way of Arrow.

    Builds a small ``sales`` table in an in-memory SQLite database, fetches it
    with ``select_to_arrow()``, converts the result with ``to_pandas()``, and
    prints the frame followed by its ``describe()`` summary.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.sqlite import SqliteConfig

    create_sales_sql = """
        CREATE TABLE sales (
            id INTEGER PRIMARY KEY,
            region TEXT,
            amount REAL,
            sale_date DATE
        )
        """
    insert_sale_sql = "INSERT INTO sales (id, region, amount, sale_date) VALUES (:id, :region, :amount, :sale_date)"
    sale_rows = [
        {"id": 1, "region": "North", "amount": 1000.00, "sale_date": "2024-01-15"},
        {"id": 2, "region": "South", "amount": 1500.00, "sale_date": "2024-01-20"},
        {"id": 3, "region": "North", "amount": 2000.00, "sale_date": "2024-02-10"},
        {"id": 4, "region": "East", "amount": 1200.00, "sale_date": "2024-02-15"},
    ]

    registry = SQLSpec()
    sqlite_handle = registry.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

    with registry.provide_session(sqlite_handle) as driver:
        # Set up the table and its sample rows
        driver.execute(create_sales_sql)
        driver.execute_many(insert_sale_sql, sale_rows)

        # Arrow result first, then hand it to pandas
        sales_frame = driver.select_to_arrow("SELECT * FROM sales").to_pandas()

        print("pandas Integration:")
        print(sales_frame)
        print()
        print("Summary Statistics:")
        print(sales_frame.describe())
        print()
159
160
161# Example 4: Polars Integration
async def example_polars_integration() -> None:
    """Load aggregated query results into a Polars DataFrame by way of Arrow.

    Builds an ``events`` table in an in-memory DuckDB database, counts events
    per ``event_type`` with ``select_to_arrow()``, converts the result with
    ``to_polars()``, and prints the frame.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    create_events_sql = """
        CREATE TABLE events (
            id INTEGER PRIMARY KEY,
            event_type VARCHAR,
            user_id INTEGER,
            timestamp TIMESTAMP
        )
        """
    insert_event_sql = (
        "INSERT INTO events (id, event_type, user_id, timestamp) VALUES (:id, :event_type, :user_id, :ts)"
    )
    event_rows = [
        {"id": 1, "event_type": "login", "user_id": 100, "ts": "2024-01-01 10:00:00"},
        {"id": 2, "event_type": "click", "user_id": 100, "ts": "2024-01-01 10:05:00"},
        {"id": 3, "event_type": "login", "user_id": 101, "ts": "2024-01-01 10:10:00"},
        {"id": 4, "event_type": "purchase", "user_id": 100, "ts": "2024-01-01 10:15:00"},
    ]

    registry = SQLSpec()
    duckdb_handle = registry.add_config(DuckDBConfig(pool_config={"database": ":memory:"}))

    with registry.provide_session(duckdb_handle) as driver:
        # Set up the table and its sample rows
        driver.execute(create_events_sql)
        driver.execute_many(insert_event_sql, event_rows)

        # Query to Arrow (native DuckDB path)
        counts = driver.select_to_arrow("SELECT event_type, COUNT(*) as count FROM events GROUP BY event_type")

        # Hand the Arrow result to Polars
        counts_frame = counts.to_polars()

        print("Polars Integration:")
        print(counts_frame)
        print()
202
203
204# Example 5: Return Format Options
async def example_return_formats() -> None:
    """Compare the ``"table"`` and ``"batch"`` values of ``return_format``.

    Runs the same query against an in-memory DuckDB ``items`` table once per
    format and prints the type of ``result.data`` and the row count for each.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    select_items_sql = "SELECT * FROM items"
    item_rows = [
        {"id": 1, "name": "Apple", "qty": 10},
        {"id": 2, "name": "Banana", "qty": 20},
        {"id": 3, "name": "Orange", "qty": 15},
    ]

    registry = SQLSpec()
    duckdb_handle = registry.add_config(DuckDBConfig(pool_config={"database": ":memory:"}))

    with registry.provide_session(duckdb_handle) as driver:
        # Seed the table
        driver.execute("CREATE TABLE items (id INTEGER, name VARCHAR, quantity INTEGER)")
        driver.execute_many("INSERT INTO items (id, name, quantity) VALUES (:id, :name, :qty)", item_rows)

        # Table format (default)
        as_table = driver.select_to_arrow(select_items_sql, return_format="table")

        print("Table Format:")
        print(f" Type: {type(as_table.data)}")
        print(f" Rows: {len(as_table)}")
        print(f" Columns: {as_table.data.column_names}")
        print()

        # Batch format
        as_batch = driver.select_to_arrow(select_items_sql, return_format="batch")

        print("Batch Format:")
        print(f" Type: {type(as_batch.data)}")
        print(f" Rows: {len(as_batch)}")
        print()
241
242
243# Example 6: Export to Parquet
async def example_parquet_export() -> None:
    """Demonstrate exporting Arrow results to Parquet.

    Builds a ``logs`` table in an in-memory DuckDB database, fetches it with
    ``select_to_arrow()``, and writes the result to
    ``arrow_basic_usage_logs.parquet`` inside the platform's temporary
    directory using ``write_to_storage_sync()``. The export telemetry and the
    resulting file size are printed.
    """
    import tempfile

    from sqlspec import SQLSpec
    from sqlspec.adapters.duckdb import DuckDBConfig

    db_manager = SQLSpec()
    duckdb = db_manager.add_config(DuckDBConfig(pool_config={"database": ":memory:"}))

    with db_manager.provide_session(duckdb) as session:
        # Create and populate table
        session.execute(
            """
            CREATE TABLE logs (
                id INTEGER,
                timestamp TIMESTAMP,
                level VARCHAR,
                message VARCHAR
            )
            """
        )

        session.execute_many(
            "INSERT INTO logs (id, timestamp, level, message) VALUES (:id, :ts, :level, :message)",
            [
                {"id": 1, "ts": "2024-01-01 10:00:00", "level": "INFO", "message": "Application started"},
                {"id": 2, "ts": "2024-01-01 10:05:00", "level": "WARN", "message": "High memory usage"},
                {"id": 3, "ts": "2024-01-01 10:10:00", "level": "ERROR", "message": "Database connection failed"},
            ],
        )

        # Query to Arrow
        result = session.select_to_arrow("SELECT * FROM logs")

        # Export to Parquet using the storage bridge. Resolve the temporary
        # directory at runtime instead of hard-coding "/tmp", which does not
        # exist on Windows and ignores TMPDIR overrides elsewhere.
        output_path = Path(tempfile.gettempdir()) / "arrow_basic_usage_logs.parquet"
        telemetry = result.write_to_storage_sync(str(output_path), format_hint="parquet")

        print("Parquet Export:")
        print(f" Exported to: {output_path}")
        print(f" Rows: {telemetry['rows_processed']}")
        print(f" Bytes processed: {telemetry['bytes_processed']}")
        print(f" File size: {output_path.stat().st_size} bytes")
        print()
287
288
289# Example 7: Native-Only Mode
async def example_native_only_mode() -> None:
    """Demonstrate native-only mode enforcement.

    Runs ``select_to_arrow(..., native_only=True)`` against two adapters:

    * ADBC, where the call is expected to succeed and the row count is printed.
    * The plain SQLite adapter, where the rejection of the request is caught
      and reported so that the remaining examples keep running.
    """
    from sqlspec import SQLSpec
    from sqlspec.adapters.adbc import AdbcConfig
    from sqlspec.adapters.sqlite import SqliteConfig
    from sqlspec.exceptions import ImproperConfigurationError

    # ADBC has native Arrow support
    db_manager = SQLSpec()
    adbc_sqlite = db_manager.add_config(AdbcConfig(connection_config={"uri": "sqlite://:memory:"}))

    with db_manager.provide_session(adbc_sqlite) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # This works - ADBC has native support
        result = session.select_to_arrow("SELECT * FROM test", native_only=True)
        print("Native-only mode (ADBC): Success")
        print(f" Rows: {len(result)}")
        print()

    # SQLite does not have native Arrow support
    sqlite_db = db_manager.add_config(SqliteConfig(pool_config={"database": ":memory:"}))

    with db_manager.provide_session(sqlite_db) as session:
        session.execute("CREATE TABLE test (id INTEGER, name TEXT)")
        session.execute("INSERT INTO test VALUES (1, 'test')")

        # NOTE(review): assumes the adapter signals the missing native path with
        # ImproperConfigurationError - confirm against the installed sqlspec
        # version. Any other exception type still propagates unchanged.
        try:
            session.select_to_arrow("SELECT * FROM test", native_only=True)
        except ImproperConfigurationError as exc:
            print("Native-only mode (SQLite): Rejected as expected")
            print(f" Error: {exc}")
        else:
            print("Native-only mode (SQLite): Success")
        print()
317
318
319# Run all examples
async def main() -> None:
    """Run the examples in order, printing a heading before each one."""
    banner = "=" * 60
    divider = "-" * 60

    # Example 2 ("PostgreSQL Conversion Path", example_postgres_conversion) is
    # intentionally not in this list because it requires PostgreSQL.
    demos = (
        ("Example 1: ADBC Native Arrow", example_adbc_native),
        ("Example 3: pandas Integration", example_pandas_integration),
        ("Example 4: Polars Integration", example_polars_integration),
        ("Example 5: Return Format Options", example_return_formats),
        ("Example 6: Parquet Export", example_parquet_export),
        ("Example 7: Native-Only Mode", example_native_only_mode),
    )

    print(banner)
    print("Apache Arrow Integration Examples")
    print(banner)
    print()

    for heading, demo in demos:
        print(heading)
        print(divider)
        await demo()

    print(banner)
    print("All examples completed successfully!")
    print(banner)
358
359
# Script entry point: run the async main() to completion when this file is
# executed directly rather than imported.
if __name__ == "__main__":
    asyncio.run(main())