Service Layer Pattern

A common pattern in production sqlspec applications is a base service class that wraps a driver and provides convenience methods for pagination, error handling, and transactions. Parameters and filters passed as `*parameters` are forwarded directly to the driver, which applies the filters to the statement and binds the parameters automatically.

Base Service

from __future__ import annotations

from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, overload

from sqlspec import AsyncDriverAdapterBase
from sqlspec.core.filters import (
    LimitOffsetFilter,
    OffsetPagination,
    StatementFilter,
)
from sqlspec.typing import SchemaT, StatementParameters

AsyncDriverT = TypeVar("AsyncDriverT", bound=AsyncDriverAdapterBase)

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

    from sqlspec import QueryBuilder, Statement, StatementConfig


class SQLSpecAsyncService(Generic[AsyncDriverT]):
    """Base async service with pagination, get-or-404, and transactions.

    Wraps a single driver instance. Every helper forwards its statement,
    positional parameters, and keyword arguments to that driver.
    """

    def __init__(self, driver: AsyncDriverT) -> None:
        """Store the driver that all helper methods delegate to."""
        self.driver = driver

    @overload
    async def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT],
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT]: ...

    @overload
    async def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[dict[str, Any]]: ...

    async def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT] | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT] | OffsetPagination[dict[str, Any]]:
        """Execute a paginated query using filters from ``*parameters``.

        The rows and the total count both come from
        ``driver.select_with_total``. ``limit`` and ``offset`` in the
        returned page are copied from the ``LimitOffsetFilter`` located by
        ``driver.find_filter``.

        NOTE(review): when no ``LimitOffsetFilter`` is passed the page
        reports ``limit=10`` / ``offset=0``, but this method does not add a
        limit to the query itself, so the reported limit is only a label.
        """
        results, total = await self.driver.select_with_total(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        limit_offset = self.driver.find_filter(LimitOffsetFilter, parameters)
        return OffsetPagination(
            items=cast("list[Any]", results),
            limit=limit_offset.limit if limit_offset else 10,
            offset=limit_offset.offset if limit_offset else 0,
            total=total,
        )

    # The overloads mirror ``paginate``: passing ``schema_type`` narrows the
    # return type to that schema, omitting it yields a plain dict row.
    @overload
    async def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: type[SchemaT],
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> SchemaT: ...

    @overload
    async def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: None = None,
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]: ...

    async def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: type[SchemaT] | None = None,
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> SchemaT | dict[str, Any]:
        """Fetch one row or raise an application-level not-found error.

        Raises:
            ValueError: when the driver returns ``None``. The message is
                ``error_message`` if given, otherwise ``"Record not found"``.
        """
        result = await self.driver.select_one_or_none(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        if result is None:
            raise ValueError(error_message or "Record not found")
        return result

    async def exists(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> bool:
        """Return True if the query matches at least one row."""
        # NOTE(review): if ``select_one_or_none`` rejects multi-row results,
        # a query matching several rows would raise here instead of
        # returning True - confirm against the driver, or have callers
        # restrict the statement to a single row.
        result = await self.driver.select_one_or_none(
            statement, *parameters, statement_config=statement_config, **kwargs,
        )
        return result is not None

    @asynccontextmanager
    async def begin_transaction(self) -> AsyncIterator[None]:
        """Context manager that commits on success, rolls back on error.

        Calls ``driver.begin()`` on entry. If the ``async with`` body raises
        anything, including task cancellation, ``driver.rollback()`` is
        awaited and the exception is re-raised unchanged; otherwise
        ``driver.commit()`` is awaited on exit.
        """
        await self.driver.begin()
        try:
            yield
        except BaseException:
            # BaseException rather than Exception: asyncio.CancelledError and
            # KeyboardInterrupt do not derive from Exception, and skipping
            # the rollback for them would leave the transaction open.
            await self.driver.rollback()
            raise
        else:
            await self.driver.commit()
from __future__ import annotations

from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload

from sqlspec import AsyncDriverAdapterBase
from sqlspec.core.filters import (
    LimitOffsetFilter,
    OffsetPagination,
    StatementFilter,
)
from sqlspec.typing import SchemaT, StatementParameters

AsyncDriverT = TypeVar("AsyncDriverT", bound=AsyncDriverAdapterBase)

if TYPE_CHECKING:
    from collections.abc import AsyncIterator

    from sqlspec import QueryBuilder, Statement, StatementConfig


class SQLSpecAsyncService[AsyncDriverT: AsyncDriverAdapterBase]:
    """Base async service with pagination, get-or-404, and transactions."""

    def __init__(self, driver: AsyncDriverT) -> None:
        self.driver = driver

    @overload
    async def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT],
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT]: ...

    @overload
    async def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[dict[str, Any]]: ...

    async def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT] | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT] | OffsetPagination[dict[str, Any]]:
        """Execute a paginated query using filters from ``*parameters``."""
        results, total = await self.driver.select_with_total(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        limit_offset = self.driver.find_filter(LimitOffsetFilter, parameters)
        return OffsetPagination(
            items=cast("list[Any]", results),
            limit=limit_offset.limit if limit_offset else 10,
            offset=limit_offset.offset if limit_offset else 0,
            total=total,
        )

    async def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: type[SchemaT] | None = None,
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> SchemaT | dict[str, Any]:
        """Fetch one row or raise an application-level not-found error."""
        result = await self.driver.select_one_or_none(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        if result is None:
            raise ValueError(error_message or "Record not found")
        return result

    async def exists(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> bool:
        """Return True if the query matches at least one row."""
        result = await self.driver.select_one_or_none(
            statement, *parameters, statement_config=statement_config, **kwargs,
        )
        return result is not None

    @asynccontextmanager
    async def begin_transaction(self) -> AsyncIterator[None]:
        """Context manager that commits on success, rolls back on error."""
        await self.driver.begin()
        try:
            yield
        except Exception:
            await self.driver.rollback()
            raise
        else:
            await self.driver.commit()
from __future__ import annotations

from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, overload

from sqlspec import SyncDriverAdapterBase
from sqlspec.core.filters import (
    LimitOffsetFilter,
    OffsetPagination,
    StatementFilter,
)
from sqlspec.typing import SchemaT, StatementParameters

SyncDriverT = TypeVar("SyncDriverT", bound=SyncDriverAdapterBase)

if TYPE_CHECKING:
    from collections.abc import Iterator

    from sqlspec import QueryBuilder, Statement, StatementConfig


class SQLSpecSyncService(Generic[SyncDriverT]):
    """Base sync service with pagination, get-or-404, and transactions.

    Wraps a single driver instance. Every helper forwards its statement,
    positional parameters, and keyword arguments to that driver.
    """

    def __init__(self, driver: SyncDriverT) -> None:
        """Store the driver that all helper methods delegate to."""
        self.driver = driver

    @overload
    def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT],
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT]: ...

    @overload
    def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[dict[str, Any]]: ...

    def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT] | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT] | OffsetPagination[dict[str, Any]]:
        """Execute a paginated query using filters from ``*parameters``.

        The rows and the total count both come from
        ``driver.select_with_total``. ``limit`` and ``offset`` in the
        returned page are copied from the ``LimitOffsetFilter`` located by
        ``driver.find_filter``.

        NOTE(review): when no ``LimitOffsetFilter`` is passed the page
        reports ``limit=10`` / ``offset=0``, but this method does not add a
        limit to the query itself, so the reported limit is only a label.
        """
        results, total = self.driver.select_with_total(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        limit_offset = self.driver.find_filter(LimitOffsetFilter, parameters)
        return OffsetPagination(
            items=cast("list[Any]", results),
            limit=limit_offset.limit if limit_offset else 10,
            offset=limit_offset.offset if limit_offset else 0,
            total=total,
        )

    # The overloads mirror ``paginate``: passing ``schema_type`` narrows the
    # return type to that schema, omitting it yields a plain dict row.
    @overload
    def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: type[SchemaT],
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> SchemaT: ...

    @overload
    def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: None = None,
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> dict[str, Any]: ...

    def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: type[SchemaT] | None = None,
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> SchemaT | dict[str, Any]:
        """Fetch one row or raise an application-level not-found error.

        Raises:
            ValueError: when the driver returns ``None``. The message is
                ``error_message`` if given, otherwise ``"Record not found"``.
        """
        result = self.driver.select_one_or_none(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        if result is None:
            raise ValueError(error_message or "Record not found")
        return result

    def exists(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> bool:
        """Return True if the query matches at least one row."""
        # NOTE(review): if ``select_one_or_none`` rejects multi-row results,
        # a query matching several rows would raise here instead of
        # returning True - confirm against the driver, or have callers
        # restrict the statement to a single row.
        result = self.driver.select_one_or_none(
            statement, *parameters, statement_config=statement_config, **kwargs,
        )
        return result is not None

    @contextmanager
    def begin_transaction(self) -> Iterator[None]:
        """Context manager that commits on success, rolls back on error.

        Calls ``driver.begin()`` on entry. If the ``with`` body raises
        anything, ``driver.rollback()`` is called and the exception is
        re-raised unchanged; otherwise ``driver.commit()`` is called on exit.
        """
        self.driver.begin()
        try:
            yield
        except BaseException:
            # BaseException rather than Exception: KeyboardInterrupt and
            # SystemExit do not derive from Exception, and skipping the
            # rollback for them would leave the transaction open.
            self.driver.rollback()
            raise
        else:
            self.driver.commit()
from __future__ import annotations

from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, TypeVar, cast, overload

from sqlspec import SyncDriverAdapterBase
from sqlspec.core.filters import (
    LimitOffsetFilter,
    OffsetPagination,
    StatementFilter,
)
from sqlspec.typing import SchemaT, StatementParameters

SyncDriverT = TypeVar("SyncDriverT", bound=SyncDriverAdapterBase)

if TYPE_CHECKING:
    from collections.abc import Iterator

    from sqlspec import QueryBuilder, Statement, StatementConfig


class SQLSpecSyncService[SyncDriverT: SyncDriverAdapterBase]:
    """Base sync service with pagination, get-or-404, and transactions."""

    def __init__(self, driver: SyncDriverT) -> None:
        self.driver = driver

    @overload
    def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT],
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT]: ...

    @overload
    def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[dict[str, Any]]: ...

    def paginate(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters | StatementFilter,
        schema_type: type[SchemaT] | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> OffsetPagination[SchemaT] | OffsetPagination[dict[str, Any]]:
        """Execute a paginated query using filters from ``*parameters``."""
        results, total = self.driver.select_with_total(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        limit_offset = self.driver.find_filter(LimitOffsetFilter, parameters)
        return OffsetPagination(
            items=cast("list[Any]", results),
            limit=limit_offset.limit if limit_offset else 10,
            offset=limit_offset.offset if limit_offset else 0,
            total=total,
        )

    def get_or_404(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        schema_type: type[SchemaT] | None = None,
        error_message: str | None = None,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> SchemaT | dict[str, Any]:
        """Fetch one row or raise an application-level not-found error."""
        result = self.driver.select_one_or_none(
            statement, *parameters, schema_type=schema_type,
            statement_config=statement_config, **kwargs,
        )
        if result is None:
            raise ValueError(error_message or "Record not found")
        return result

    def exists(
        self,
        statement: Statement | QueryBuilder,
        /,
        *parameters: StatementParameters,
        statement_config: StatementConfig | None = None,
        **kwargs: Any,
    ) -> bool:
        """Return True if the query matches at least one row."""
        result = self.driver.select_one_or_none(
            statement, *parameters, statement_config=statement_config, **kwargs,
        )
        return result is not None

    @contextmanager
    def begin_transaction(self) -> Iterator[None]:
        """Context manager that commits on success, rolls back on error."""
        self.driver.begin()
        try:
            yield
        except Exception:
            self.driver.rollback()
            raise
        else:
            self.driver.commit()

Domain Services

Inherit from the base service and build queries with the `sql` builder or the SQL file loader. Filters from Litestar dependencies flow straight through `*filters` to the driver:

from __future__ import annotations

from typing import TYPE_CHECKING

from pydantic import BaseModel
from sqlspec import sql
from sqlspec.core.filters import OffsetPagination, StatementFilter

if TYPE_CHECKING:
    from uuid import UUID


# Row schema passed as ``schema_type`` by the UserService queries below; the
# three fields match the columns those queries select or return.
class User(BaseModel):
    id: str  # "id" column
    email: str  # "email" column
    name: str  # "name" column


class UserService(SQLSpecAsyncService[AsyncDriverT]):
    """Queries against the ``users`` table, built on the async base service."""

    async def list_with_count(self, *filters: StatementFilter) -> OffsetPagination[User]:
        """Return one page of users, forwarding ``filters`` to ``paginate``."""
        users_query = sql.select("id", "email", "name").from_("users")
        page = await self.paginate(users_query, *filters, schema_type=User)
        return page

    async def get_user(self, user_id: UUID) -> User:
        """Return the user whose ``id`` equals ``user_id`` via ``get_or_404``."""
        lookup = sql.select("id", "email", "name").from_("users").where_eq("id", user_id)
        not_found = f"User {user_id} not found"
        return await self.get_or_404(lookup, schema_type=User, error_message=not_found)

    async def create_user(self, email: str, name: str) -> User:
        """Insert a user and return the row produced by the RETURNING clause."""
        insert_stmt = (
            sql.insert("users")
            .columns("email", "name")
            .values(email, name)
            .returning("id", "email", "name")
        )
        created = await self.driver.select_one(insert_stmt, schema_type=User)
        return created
from __future__ import annotations

from typing import TYPE_CHECKING

from pydantic import BaseModel
from sqlspec import sql
from sqlspec.core.filters import OffsetPagination, StatementFilter

if TYPE_CHECKING:
    from uuid import UUID


# Row schema passed as ``schema_type`` by the UserService queries below; the
# three fields match the columns those queries select or return.
class User(BaseModel):
    id: str  # "id" column
    email: str  # "email" column
    name: str  # "name" column


class UserService(SQLSpecSyncService[SyncDriverT]):
    """Queries against the ``users`` table, built on the sync base service."""

    def list_with_count(self, *filters: StatementFilter) -> OffsetPagination[User]:
        """Return one page of users, forwarding ``filters`` to ``paginate``."""
        users_query = sql.select("id", "email", "name").from_("users")
        page = self.paginate(users_query, *filters, schema_type=User)
        return page

    def get_user(self, user_id: UUID) -> User:
        """Return the user whose ``id`` equals ``user_id`` via ``get_or_404``."""
        lookup = sql.select("id", "email", "name").from_("users").where_eq("id", user_id)
        not_found = f"User {user_id} not found"
        return self.get_or_404(lookup, schema_type=User, error_message=not_found)

    def create_user(self, email: str, name: str) -> User:
        """Insert a user and return the row produced by the RETURNING clause."""
        insert_stmt = (
            sql.insert("users")
            .columns("email", "name")
            .values(email, name)
            .returning("id", "email", "name")
        )
        created = self.driver.select_one(insert_stmt, schema_type=User)
        return created

Using with Litestar

With create_filter_dependencies, filters are injected from query parameters and forwarded through the service to the driver:

from typing import Annotated

from litestar import Controller, get
from litestar.params import Dependency
from sqlspec.core.filters import FilterTypes, OffsetPagination
from sqlspec.extensions.litestar.providers import create_filter_dependencies


class UserController(Controller):
    # Litestar controller exposing the user listing under /api/users.
    path = "/api/users"
    # Filter dependencies for every handler on this controller, built from
    # this config. Presumably this sets limit/offset pagination with a page
    # size of 20, sorting by created_at descending, and search over the name
    # and email columns - confirm the exact key semantics against
    # create_filter_dependencies.
    dependencies = create_filter_dependencies({
        "pagination_type": "limit_offset",
        "pagination_size": 20,
        "sort_field": "created_at",
        "sort_order": "desc",
        "search": "name,email",
    })

    # GET handler: unpacks the injected filter list into
    # UserService.list_with_count and returns the resulting page.
    @get()
    async def list_users(
        self,
        users_service: UserService,
        # skip_validation=True is set on the dependency so the injected
        # filter objects are passed through without validation.
        filters: Annotated[list[FilterTypes], Dependency(skip_validation=True)],
    ) -> OffsetPagination[User]:
        return await users_service.list_with_count(*filters)

See also