ADK + Litestar Endpoint
Initialize SQLSpecSessionService inside Litestar and expose a /sessions endpoint backed by
AioSQLite.
uv run python docs/examples/extensions/adk/litestar_aiosqlite.py
Source
1"""Expose SQLSpec-backed ADK sessions through a Litestar endpoint."""
2
3import asyncio
4from typing import Any
5
6from litestar import Litestar, get
7
8from sqlspec.adapters.aiosqlite import AiosqliteConfig
9from sqlspec.adapters.aiosqlite.adk import AiosqliteADKStore
10from sqlspec.extensions.adk import SQLSpecSessionService
11
# The demo uses the ":memory:" database so no file is needed on disk.
config = AiosqliteConfig(
    pool_config={"database": ":memory:"},
)

# Set by startup(); remains None until that coroutine has completed.
service: "SQLSpecSessionService | None" = None
14
15
async def startup() -> None:
    """Build the ADK store and publish the session service at app boot.

    Creates an ``AiosqliteADKStore`` from the module-level ``config``, awaits
    its table creation, and then rebinds the module-level ``service`` to a
    ``SQLSpecSessionService`` wrapping that store.
    """
    global service
    adk_store = AiosqliteADKStore(config)
    # Tables are created before the service is published.
    await adk_store.create_tables()
    service = SQLSpecSessionService(adk_store)
22
23
@get("/sessions")
async def list_sessions() -> "dict[str, Any]":
    """Return the session count for the demo app.

    Queries the module-level ``service`` for the sessions stored under the
    fixed demo identity (``app_name="docs"``, ``user_id="demo"``) and reports
    how many were returned.

    Returns:
        A mapping with a single ``"count"`` key holding the number of sessions.

    Raises:
        RuntimeError: If ``service`` has not been initialized by ``startup()``.
    """
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, which would surface here as an AttributeError on None.
    if service is None:
        msg = "Session service is not initialized; startup() has not run."
        raise RuntimeError(msg)
    sessions = await service.list_sessions(app_name="docs", user_id="demo")
    return {"count": len(sessions.sessions)}
30
31
# Register startup() as a startup hook so `service` is set when the app boots.
app = Litestar(
    on_startup=[startup],
    route_handlers=[list_sessions],
)
33
34
def main() -> None:
    """Run ``startup()`` to completion outside of Litestar.

    Primes the database for smoke tests without serving the app.
    """
    boot = startup()
    asyncio.run(boot)


if __name__ == "__main__":
    main()