Storage¶
Storage abstraction layer with multiple backend support (local filesystem, fsspec, obstore), configuration-based registration, and Arrow table import/export with CSV format support.
Pipelines¶
- class sqlspec.storage.SyncStoragePipeline[source]¶
Bases:
object
Pipeline coordinating storage registry operations and telemetry.
- write_rows(rows, destination, *, format_hint=None, storage_options=None)[source]¶
Write dictionary rows to storage using cached serializers.
- Return type:
- write_arrow(table, destination, *, format_hint=None, storage_options=None, compression=None)[source]¶
Write an Arrow table to storage using zero-copy buffers.
- Return type:
- read_arrow(source, *, file_format, storage_options=None)[source]¶
Read an artifact from storage and decode it into an Arrow table.
- Return type:
tuple[Table, StorageTelemetry]
- stream_read(source, *, chunk_size=None, storage_options=None)[source]¶
Stream bytes from an artifact.
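As a rough illustration of what write_rows does conceptually, here is a stdlib-only sketch: dictionary rows are serialized (here to CSV, one of the supported formats) and written to a destination, returning a telemetry-style summary. The function body, telemetry keys, and CSV choice are assumptions for illustration, not sqlspec's actual implementation.

```python
import csv
import io


def write_rows(rows, destination):
    """Illustrative stand-in: serialize dict rows to CSV and write the
    payload to a destination, returning a telemetry-style summary dict."""
    if not rows:
        return {"rows_written": 0, "bytes_written": 0}
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
    writer.writeheader()
    writer.writerows(rows)
    payload = buf.getvalue().encode("utf-8")
    destination.write(payload)  # stand-in for a storage backend write
    return {"rows_written": len(rows), "bytes_written": len(payload)}


sink = io.BytesIO()
telemetry = write_rows([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}], sink)
```

The real pipeline additionally caches serializers across calls and records telemetry through StorageTelemetry; this sketch only shows the rows-to-bytes-to-destination flow.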
Registry¶
- class sqlspec.storage.StorageRegistry[source]¶
Bases:
object
Global storage registry for named backend configurations.
Allows registering named storage backends that can be accessed from anywhere in your application. Backends are automatically selected based on URI scheme unless explicitly overridden.
Examples
backend = registry.get("s3://my-bucket")
backend = registry.get("file:///tmp/data")
backend = registry.get("gs://my-gcs-bucket")
registry.register_alias("my_app_store", "file:///tmp/dev_data")
registry.register_alias("my_app_store", "s3://prod-bucket/data")
store = registry.get("my_app_store")
backend = registry.get("s3://bucket", backend="fsspec")
- register_alias(alias, uri, *, backend=None, base_path='', **kwargs)[source]¶
Register a named alias for a storage configuration.
- get(uri_or_alias, *, backend=None, **kwargs)[source]¶
Get backend instance using URI-first routing with automatic backend selection.
- Parameters:
- Return type:
ObjectStoreProtocol
- Returns:
Backend instance, selected automatically from the URI scheme unless a backend is explicitly specified
- Raises:
ImproperConfigurationError – If alias not found or invalid input
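The URI-first routing described above can be sketched with a minimal stand-in registry: aliases resolve to URIs, then the URI scheme selects a backend unless one is passed explicitly. The scheme-to-backend table and the tuple return value are illustrative assumptions, not the real ObjectStoreProtocol instances.

```python
from urllib.parse import urlparse


class MiniRegistry:
    """Illustrative sketch of URI-first routing with aliases."""

    def __init__(self):
        self._aliases = {}
        # Assumed mapping; the real registry picks among local/fsspec/obstore.
        self._schemes = {"s3": "obstore", "gs": "obstore", "file": "local"}

    def register_alias(self, alias, uri):
        self._aliases[alias] = uri

    def get(self, uri_or_alias, backend=None):
        # Aliases resolve first; otherwise treat the input as a URI.
        uri = self._aliases.get(uri_or_alias, uri_or_alias)
        scheme = urlparse(uri).scheme
        if backend is None:
            try:
                backend = self._schemes[scheme]
            except KeyError:
                raise ValueError(f"no backend registered for scheme {scheme!r}")
        return (backend, uri)


registry = MiniRegistry()
registry.register_alias("my_app_store", "s3://prod-bucket/data")
```

A later call such as `registry.get("my_app_store")` then routes through the alias to the scheme-selected backend, while `backend="fsspec"` would override the automatic choice.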
Configuration Types¶
- class sqlspec.storage.StorageCapabilities[source]¶
Bases:
TypedDict
Runtime-evaluated driver storage capabilities.
- class sqlspec.storage.PartitionStrategyConfig[source]¶
Bases:
TypedDict
Configuration for partition fan-out strategies.
- class sqlspec.storage.StorageLoadRequest[source]¶
Bases:
TypedDict
Request describing a staging allocation.
- class sqlspec.storage.StagedArtifact[source]¶
Bases:
TypedDict
Metadata describing a staged artifact managed by the pipeline.
- class sqlspec.storage.StorageTelemetry[source]¶
Bases:
TypedDict
Telemetry payload for storage bridge operations.
- class sqlspec.storage.StorageBridgeJob[source]¶
Bases:
NamedTuple
Handle representing a storage bridge operation.
- telemetry: StorageTelemetry¶
Alias for field number 2
Backends¶
- class sqlspec.storage.backends.base.ObjectStoreBase[source]¶
Bases:
ABC
Base class for storage backends.
All synchronous methods follow the *_sync naming convention for consistency with their async counterparts.
- abstractmethod read_bytes_sync(path, **kwargs)[source]¶
Read bytes from storage synchronously.
- Return type:
- abstractmethod write_bytes_sync(path, data, **kwargs)[source]¶
Write bytes to storage synchronously.
- Return type:
- abstractmethod stream_read_sync(path, chunk_size=None, **kwargs)[source]¶
Stream bytes from storage synchronously.
- abstractmethod read_text_sync(path, encoding='utf-8', **kwargs)[source]¶
Read text from storage synchronously.
- Return type:
- abstractmethod write_text_sync(path, data, encoding='utf-8', **kwargs)[source]¶
Write text to storage synchronously.
- Return type:
- abstractmethod list_objects_sync(prefix='', recursive=True, **kwargs)[source]¶
List objects in storage synchronously.
- abstractmethod exists_sync(path, **kwargs)[source]¶
Check if object exists in storage synchronously.
- Return type:
- abstractmethod delete_sync(path, **kwargs)[source]¶
Delete object from storage synchronously.
- Return type:
- abstractmethod copy_sync(source, destination, **kwargs)[source]¶
Copy object within storage synchronously.
- Return type:
- abstractmethod move_sync(source, destination, **kwargs)[source]¶
Move object within storage synchronously.
- Return type:
- abstractmethod get_metadata_sync(path, **kwargs)[source]¶
Get object metadata from storage synchronously.
- abstractmethod is_object_sync(path)[source]¶
Check if path points to an object synchronously.
- Return type:
- abstractmethod is_path_sync(path)[source]¶
Check if path points to a directory synchronously.
- Return type:
- abstractmethod read_arrow_sync(path, **kwargs)[source]¶
Read Arrow table from storage synchronously.
- Return type:
Table
- abstractmethod write_arrow_sync(path, table, **kwargs)[source]¶
Write Arrow table to storage synchronously.
- Return type:
- abstractmethod stream_arrow_sync(pattern, **kwargs)[source]¶
Stream Arrow record batches from storage synchronously.
- Return type:
Iterator[RecordBatch]
- abstractmethod async read_bytes_async(path, **kwargs)[source]¶
Read bytes from storage asynchronously.
- Return type:
- abstractmethod async write_bytes_async(path, data, **kwargs)[source]¶
Write bytes to storage asynchronously.
- Return type:
- abstractmethod async read_text_async(path, encoding='utf-8', **kwargs)[source]¶
Read text from storage asynchronously.
- Return type:
- abstractmethod async write_text_async(path, data, encoding='utf-8', **kwargs)[source]¶
Write text to storage asynchronously.
- Return type:
- abstractmethod async stream_read_async(path, chunk_size=None, **kwargs)[source]¶
Stream bytes from storage asynchronously.
- Return type:
- abstractmethod async list_objects_async(prefix='', recursive=True, **kwargs)[source]¶
List objects in storage asynchronously.
- abstractmethod async exists_async(path, **kwargs)[source]¶
Check if object exists in storage asynchronously.
- Return type:
- abstractmethod async delete_async(path, **kwargs)[source]¶
Delete object from storage asynchronously.
- Return type:
- abstractmethod async copy_async(source, destination, **kwargs)[source]¶
Copy object within storage asynchronously.
- Return type:
- abstractmethod async move_async(source, destination, **kwargs)[source]¶
Move object within storage asynchronously.
- Return type:
- abstractmethod async get_metadata_async(path, **kwargs)[source]¶
Get object metadata from storage asynchronously.
- abstractmethod async read_arrow_async(path, **kwargs)[source]¶
Read Arrow table from storage asynchronously.
- Return type:
Table
- abstractmethod async write_arrow_async(path, table, **kwargs)[source]¶
Write Arrow table to storage asynchronously.
- Return type:
- abstractmethod stream_arrow_async(pattern, **kwargs)[source]¶
Stream Arrow record batches from storage asynchronously.
- Return type:
AsyncIterator[RecordBatch]
- class sqlspec.storage.backends.local.LocalStore[source]¶
Bases:
object
Simple local file system storage backend.
Provides file system operations without requiring fsspec or obstore. Supports file:// URIs and regular file paths.
All synchronous methods use the *_sync suffix for consistency with async methods.
- __init__(uri='', **kwargs)[source]¶
Initialize local storage backend.
- Parameters:
uri¶ (str) – File URI or path (e.g., "file:///path" or "/path")
**kwargs – Additional options, including:
- base_path: Subdirectory relative to the URI path. If relative, it is combined with the URI path; if absolute, it takes precedence (backward compatible).
The URI may be a file:// path (Windows-style paths such as file:///C:/path are supported). When both URI and base_path are provided, they are combined:
- file:///home/user/storage + base_path="subdir" -> /home/user/storage/subdir
- file:///home/user/storage + base_path="/other" -> /other (absolute takes precedence)
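The combination rules above can be sketched as follows; combine_base_path is a hypothetical helper written for illustration, not part of the sqlspec API.

```python
from posixpath import isabs, join
from urllib.parse import urlparse


def combine_base_path(uri, base_path=""):
    """Illustrative: combine a file:// URI path with base_path following the
    documented rules (relative base_path is appended, absolute wins)."""
    root = urlparse(uri).path if uri.startswith("file://") else uri
    if not base_path:
        return root
    if isabs(base_path):
        return base_path  # absolute base_path takes precedence
    return join(root, base_path)
```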
- read_text_sync(path, encoding='utf-8', **kwargs)[source]¶
Read text from file synchronously.
- Return type:
- write_text_sync(path, data, encoding='utf-8', **kwargs)[source]¶
Write text to file synchronously.
- Return type:
- list_objects_sync(prefix='', recursive=True, **kwargs)[source]¶
List objects in directory synchronously.
- Parameters:
- Return type:
When the prefix resembles a directory (contains a slash or ends with '/'), it is treated as a path; otherwise filenames within the base path are filtered against it. Paths outside base_path are returned with their absolute names.
- copy_sync(source, destination, **kwargs)[source]¶
Copy file or directory synchronously.
- Return type:
- move_sync(source, destination, **kwargs)[source]¶
Move file or directory synchronously.
- Return type:
- glob_sync(pattern, **kwargs)[source]¶
Find files matching pattern synchronously.
Supports both relative and absolute patterns by adjusting where the glob search begins.
- read_arrow_sync(path, **kwargs)[source]¶
Read Arrow table from file synchronously.
- Return type:
Table
- write_arrow_sync(path, table, **kwargs)[source]¶
Write Arrow table to file synchronously.
- Return type:
- stream_arrow_sync(pattern, **kwargs)[source]¶
Stream Arrow record batches from files matching pattern synchronously.
- Yields:
Arrow record batches from matching files.
- Return type:
Iterator[RecordBatch]
- property supports_signing: bool¶
Whether this backend supports URL signing.
Local file storage does not support URL signing. Local files are accessed directly via file:// URIs.
- Returns:
Always False for local storage.
- sign_sync(paths, expires_in=3600, for_upload=False)[source]¶
Generate signed URL(s).
- Raises:
NotImplementedError – Local file storage does not require URL signing. Local files are accessed directly via file:// URIs.
- Return type:
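Because sign_sync raises NotImplementedError on backends without signing support, callers can guard on the supports_signing property first. The stub below sketches that pattern; UnsignedBackend and url_for are illustrative names, not part of the sqlspec API.

```python
class UnsignedBackend:
    """Stand-in for a backend (like LocalStore or fsspec) that cannot sign."""

    @property
    def supports_signing(self):
        return False

    def sign_sync(self, paths, expires_in=3600, for_upload=False):
        raise NotImplementedError("this backend does not support URL signing")


def url_for(backend, path):
    # Prefer a signed URL when available; otherwise fall back to a direct
    # file:// URI, mirroring how local files are accessed.
    if backend.supports_signing:
        return backend.sign_sync(path)
    return f"file://{path}"


backend = UnsignedBackend()
```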
- async write_bytes_async(path, data, **kwargs)[source]¶
Write bytes to file asynchronously.
- Return type:
- async read_text_async(path, encoding='utf-8', **kwargs)[source]¶
Read text from file asynchronously.
- Return type:
- async write_text_async(path, data, encoding='utf-8', **kwargs)[source]¶
Write text to file asynchronously.
- Return type:
- async stream_read_async(path, chunk_size=None, **kwargs)[source]¶
Stream bytes from file asynchronously.
Uses AsyncThreadedBytesIterator to offload blocking file I/O to a thread pool, ensuring the event loop is not blocked during read operations.
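The thread-offload pattern can be sketched with asyncio.to_thread; the real implementation uses AsyncThreadedBytesIterator, so this is a simplified stand-in showing only the idea of keeping blocking reads off the event loop.

```python
import asyncio
import io


async def stream_read_async(fileobj, chunk_size=4):
    """Illustrative: read fixed-size chunks in a worker thread so blocking
    file I/O never runs on the event loop."""
    while True:
        chunk = await asyncio.to_thread(fileobj.read, chunk_size)
        if not chunk:
            break
        yield chunk


async def main():
    src = io.BytesIO(b"abcdefgh")
    return [c async for c in stream_read_async(src)]


chunks = asyncio.run(main())
```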
- async read_arrow_async(path, **kwargs)[source]¶
Read Arrow table asynchronously.
Uses async_() to offload blocking PyArrow I/O to thread pool.
- Return type:
Table
- async write_arrow_async(path, table, **kwargs)[source]¶
Write Arrow table asynchronously.
Uses async_() to offload blocking PyArrow I/O to thread pool.
- Return type:
- stream_arrow_async(pattern, **kwargs)[source]¶
Stream Arrow record batches asynchronously.
- Parameters:
- Return type:
AsyncIterator[RecordBatch]
- Returns:
AsyncIterator yielding Arrow record batches.
- class sqlspec.storage.backends.fsspec.FSSpecBackend[source]¶
Bases:
object
Storage backend using fsspec.
Implements ObjectStoreProtocol using fsspec for various protocols including HTTP, HTTPS, FTP, and cloud storage services.
All synchronous methods use the *_sync suffix for consistency with async methods.
- __init__(uri, **kwargs)[source]¶
Initialize the fsspec-backed storage backend.
- Parameters:
For cloud URIs (S3/GS/Azure) and file:// URIs, we derive a default base_path from the URI path when no explicit base_path is provided. When both URI and base_path are provided, they are combined (base_path is appended to URI-derived path).
Examples
FSSpecBackend("s3://bucket/prefix") -> base_path = "bucket/prefix"
FSSpecBackend("file:///home/user/storage") -> base_path = "/home/user/storage"
FSSpecBackend("file:///home/user", base_path="subdir") -> base_path = "/home/user/subdir"
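The derivation rules above can be sketched with urllib.parse; derive_base_path is a hypothetical helper that mirrors the documented examples, not the backend's actual internals.

```python
from urllib.parse import urlparse


def derive_base_path(uri, base_path=""):
    """Illustrative: derive a default base_path from the URI; an explicit
    base_path is appended to the URI-derived path."""
    parts = urlparse(uri)
    if parts.scheme == "file":
        derived = parts.path                     # "/home/user/storage"
    else:
        derived = f"{parts.netloc}{parts.path}"  # "bucket/prefix"
    if base_path:
        derived = f"{derived.rstrip('/')}/{base_path}"
    return derived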
- write_bytes_sync(path, data, **kwargs)[source]¶
Write bytes to an object synchronously.
- Return type:
- read_text_sync(path, encoding='utf-8', **kwargs)[source]¶
Read text from an object synchronously.
- Return type:
- write_text_sync(path, data, encoding='utf-8', **kwargs)[source]¶
Write text to an object synchronously.
- Return type:
- read_arrow_sync(path, **kwargs)[source]¶
Read an Arrow table from storage synchronously.
- Return type:
Table
- write_arrow_sync(path, table, **kwargs)[source]¶
Write an Arrow table to storage synchronously.
- Return type:
- list_objects_sync(prefix='', recursive=True, **kwargs)[source]¶
List objects with optional prefix synchronously.
- is_path_sync(path)[source]¶
Check if path points to a prefix (directory-like) synchronously.
- Return type:
- property supports_signing: bool¶
Whether this backend supports URL signing.
FSSpec backends do not support URL signing. Use ObStoreBackend for S3, GCS, or Azure if you need signed URLs.
- Returns:
Always False for fsspec backends.
- sign_sync(paths, expires_in=3600, for_upload=False)[source]¶
Generate signed URL(s).
- Raises:
NotImplementedError – fsspec backends do not support URL signing. Use obstore backend for S3, GCS, or Azure if you need signed URLs.
- Return type:
- stream_arrow_sync(pattern, **kwargs)[source]¶
Stream Arrow record batches from storage synchronously.
- async read_bytes_async(path, **kwargs)[source]¶
Read bytes from storage asynchronously.
- Return type:
- async write_bytes_async(path, data, **kwargs)[source]¶
Write bytes to storage asynchronously.
- Return type:
- async stream_read_async(path, chunk_size=None, **kwargs)[source]¶
Stream bytes from storage asynchronously.
Uses asyncio.to_thread() to read chunks of the file in a thread pool, ensuring the event loop is not blocked while avoiding buffering the entire file into memory.
- stream_arrow_async(pattern, **kwargs)[source]¶
Stream Arrow record batches from storage asynchronously.
- Parameters:
- Return type:
AsyncIterator[RecordBatch]
- Returns:
AsyncIterator yielding Arrow record batches.
- async read_text_async(path, encoding='utf-8', **kwargs)[source]¶
Read text from storage asynchronously.
- Return type:
- async write_text_async(path, data, encoding='utf-8', **kwargs)[source]¶
Write text to storage asynchronously.
- Return type:
- async list_objects_async(prefix='', recursive=True, **kwargs)[source]¶
List objects in storage asynchronously.
- async exists_async(path, **kwargs)[source]¶
Check if object exists in storage asynchronously.
- Return type:
- async copy_async(source, destination, **kwargs)[source]¶
Copy object in storage asynchronously.
- Return type:
- async move_async(source, destination, **kwargs)[source]¶
Move object in storage asynchronously.
- Return type:
- async sign_async(paths, expires_in=3600, for_upload=False)[source]¶
Generate signed URL(s) asynchronously.
- class sqlspec.storage.backends.obstore.ObStoreBackend[source]¶
Bases:
object
Object storage backend using obstore.
Implements ObjectStoreProtocol using obstore’s Rust-based implementation for storage operations. Supports AWS S3, Google Cloud Storage, Azure Blob Storage, local filesystem, and HTTP endpoints.
All synchronous methods use the *_sync suffix for consistency with async methods.
- __init__(uri, **kwargs)[source]¶
Initialize obstore backend.
- Parameters:
uri¶ (str) – Storage URI. Supported formats:
- file:///absolute/path – Local filesystem
- s3://bucket/prefix – AWS S3
- gs://bucket/prefix – Google Cloud Storage
- az://container/prefix – Azure Blob Storage
- memory:// – In-memory storage (for testing)
**kwargs – Additional options:
- base_path (str): For local files (file://), combined with the URI path to form the storage root. For example: uri="file:///data" + base_path="uploads" -> /data/uploads. If base_path is absolute, it overrides the URI path (backward compatible). For cloud storage, base_path is used as an object key prefix.
- Other obstore configuration options (timeouts, credentials, etc.)
- write_bytes_sync(path, data, **kwargs)[source]¶
Write bytes using obstore synchronously.
- Return type:
- read_text_sync(path, encoding='utf-8', **kwargs)[source]¶
Read text using obstore synchronously.
- Return type:
- write_text_sync(path, data, encoding='utf-8', **kwargs)[source]¶
Write text using obstore synchronously.
- Return type:
- list_objects_sync(prefix='', recursive=True, **kwargs)[source]¶
List objects using obstore synchronously.
- exists_sync(path, **kwargs)[source]¶
Check if object exists using obstore synchronously.
- Return type:
- copy_sync(source, destination, **kwargs)[source]¶
Copy object using obstore synchronously.
- Return type:
- move_sync(source, destination, **kwargs)[source]¶
Move object using obstore synchronously.
- Return type:
- glob_sync(pattern, **kwargs)[source]¶
Find objects matching pattern synchronously.
Lists all objects and filters them client-side using the pattern.
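Client-side pattern filtering of this kind can be sketched with the stdlib fnmatch module. Note one assumption worth flagging: fnmatch's `*` also matches `/` separators, unlike shell globbing, so the real implementation's pattern semantics may differ.

```python
from fnmatch import fnmatch


def glob_filter(keys, pattern):
    """Illustrative: object stores have no native glob, so list every key
    and filter client-side against the pattern."""
    # Caveat: fnmatch's "*" crosses "/" boundaries, unlike pathlib's glob.
    return [k for k in keys if fnmatch(k, pattern)]


keys = ["data/a.parquet", "data/b.csv", "logs/app.log"]
```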
- is_path_sync(path)[source]¶
Check if path is a prefix/directory using obstore synchronously.
- Return type:
- read_arrow_sync(path, **kwargs)[source]¶
Read Arrow table using obstore synchronously.
- Return type:
Table
- write_arrow_sync(path, table, **kwargs)[source]¶
Write Arrow table using obstore synchronously.
- Return type:
- stream_read_sync(path, chunk_size=None, **kwargs)[source]¶
Stream bytes using obstore’s native streaming synchronously.
Uses obstore’s sync streaming iterator which yields chunks without loading the entire file into memory, for both local and remote backends.
- stream_arrow_sync(pattern, **kwargs)[source]¶
Stream Arrow record batches using obstore’s native streaming synchronously.
For each matching file, streams data through a buffered wrapper that PyArrow can read directly without loading the entire file.
- Return type:
Iterator[RecordBatch]
- property supports_signing: bool¶
Whether this backend supports URL signing.
Only S3, GCS, and Azure backends support pre-signed URLs. Local file storage does not support URL signing.
- Returns:
True if the protocol supports signing, False otherwise.
- sign_sync(paths, expires_in=3600, for_upload=False)[source]¶
Generate signed URL(s) for the object(s).
- Parameters:
- Return type:
- Returns:
Single signed URL string if paths is a string, or list of signed URLs if paths is a list. Preserves input type for convenience.
- Raises:
NotImplementedError – If the backend protocol does not support signing.
ValueError – If expires_in exceeds maximum (604800 seconds).
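The documented contract (input-type preservation, plus the 604800-second cap) can be sketched as follows; the URL format is a made-up placeholder, and the real backend delegates signing to obstore.

```python
MAX_EXPIRES_IN = 604_800  # 7 days, the documented maximum


def sign(paths, expires_in=3600):
    """Illustrative: validate expiry and preserve the input type
    (str in -> str out, list in -> list out)."""
    if expires_in > MAX_EXPIRES_IN:
        raise ValueError(f"expires_in must be <= {MAX_EXPIRES_IN} seconds")
    if isinstance(paths, str):
        return f"https://example.invalid/{paths}?expires={expires_in}"
    return [f"https://example.invalid/{p}?expires={expires_in}" for p in paths]
```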
- async read_bytes_async(path, **kwargs)[source]¶
Read bytes from storage asynchronously.
- Return type:
- async write_bytes_async(path, data, **kwargs)[source]¶
Write bytes to storage asynchronously.
- Return type:
- async stream_read_async(path, chunk_size=None, **kwargs)[source]¶
Stream bytes from storage asynchronously.
Uses obstore’s native async streaming to yield chunks of bytes without buffering the entire file into memory.
- Return type:
- async list_objects_async(prefix='', recursive=True, **kwargs)[source]¶
List objects in storage asynchronously.
- async read_text_async(path, encoding='utf-8', **kwargs)[source]¶
Read text from storage asynchronously.
- Return type:
- async write_text_async(path, data, encoding='utf-8', **kwargs)[source]¶
Write text to storage asynchronously.
- Return type:
- async exists_async(path, **kwargs)[source]¶
Check if object exists in storage asynchronously.
- Return type:
- async copy_async(source, destination, **kwargs)[source]¶
Copy object in storage asynchronously.
- Return type:
- async move_async(source, destination, **kwargs)[source]¶
Move object in storage asynchronously.
- Return type:
- async read_arrow_async(path, **kwargs)[source]¶
Read Arrow table from storage asynchronously.
Uses async_() with storage limiter to offload blocking PyArrow I/O to thread pool.
- Return type:
Table
- async write_arrow_async(path, table, **kwargs)[source]¶
Write Arrow table to storage asynchronously.
Uses async_() with storage limiter to offload blocking PyArrow serialization to thread pool, preventing event loop blocking.
- Return type:
- stream_arrow_async(pattern, **kwargs)[source]¶
Stream Arrow record batches from storage asynchronously.
- Parameters:
- Return type:
AsyncIterator[RecordBatch]
- Returns:
AsyncIterator yielding Arrow record batches.
- async sign_async(paths, expires_in=3600, for_upload=False)[source]¶
Generate signed URL(s) asynchronously.
- Parameters:
- Return type:
- Returns:
Single signed URL string if paths is a string, or list of signed URLs if paths is a list. Preserves input type for convenience.
- Raises:
NotImplementedError – If the backend protocol does not support signing.
ValueError – If expires_in exceeds maximum (604800 seconds).
Module Functions¶
- sqlspec.storage.create_storage_bridge_job(status, telemetry)[source]¶
Create a storage bridge job handle with a unique identifier.
- Return type:
- sqlspec.storage.get_storage_bridge_diagnostics()[source]¶
Return aggregated storage bridge + serializer cache metrics.
- sqlspec.storage.reset_storage_bridge_metrics()[source]¶
Reset aggregated storage bridge metrics.
- Return type:
- sqlspec.storage.resolve_storage_path(path, base_path='', protocol='file', strip_file_scheme=True)[source]¶
Resolve path relative to base_path with protocol-specific handling.
- Parameters:
- Return type:
- Returns:
Resolved path string suitable for the storage backend.
Examples
>>> resolve_storage_path("/data/file.txt", protocol="file")
'data/file.txt'
>>> resolve_storage_path("file.txt", base_path="/base", protocol="file")
'base/file.txt'
>>> resolve_storage_path("file:///data/file.txt", strip_file_scheme=True)
'data/file.txt'
>>> resolve_storage_path("/data/subdir/file.txt", base_path="/data", protocol="file")
'subdir/file.txt'
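A stdlib-only reimplementation consistent with the four documented examples might look like the sketch below; the real function likely handles more protocols and edge cases than this.

```python
def resolve_storage_path(path, base_path="", protocol="file", strip_file_scheme=True):
    """Illustrative reimplementation matching the documented examples."""
    if strip_file_scheme and path.startswith("file://"):
        path = path[len("file://"):]
    if base_path and path.startswith(base_path):
        path = path[len(base_path):]  # make path relative to base_path
    elif base_path:
        path = f"{base_path.rstrip('/')}/{path}"
    return path.lstrip("/")
```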