ADK + Litestar Endpoint

Initialize SQLSpecSessionService inside Litestar and expose a /sessions endpoint backed by AioSQLite.

uv run python docs/examples/extensions/adk/litestar_aiosqlite.py

Source

 1"""Expose SQLSpec-backed ADK sessions through a Litestar endpoint."""
 2
 3import asyncio
 4from typing import Any
 5
 6from litestar import Litestar, get
 7
 8from sqlspec.adapters.aiosqlite import AiosqliteConfig
 9from sqlspec.adapters.aiosqlite.adk import AiosqliteADKStore
10from sqlspec.extensions.adk import SQLSpecSessionService
11
# Demo configuration: pool_config selects the ":memory:" database.
config = AiosqliteConfig(
    pool_config={"database": ":memory:"},
)
# Module-level session service; stays None until startup() assigns it.
service: "SQLSpecSessionService | None" = None
14
15
async def startup() -> None:
    """Create the ADK tables and publish the module-level session service.

    Builds an ``AiosqliteADKStore`` from the module-level ``config``, awaits
    its ``create_tables()`` call, and then rebinds the global ``service`` to
    a ``SQLSpecSessionService`` wrapping that store.
    """
    global service
    adk_store = AiosqliteADKStore(config)
    # Tables are created before the service is exposed to request handlers.
    await adk_store.create_tables()
    service = SQLSpecSessionService(adk_store)
22
23
@get("/sessions")
async def list_sessions() -> "dict[str, Any]":
    """Return the session count for the demo app.

    Returns:
        A mapping with a single ``"count"`` key holding the number of
        sessions listed for app ``"docs"`` and user ``"demo"``.

    Raises:
        RuntimeError: If the module-level ``service`` has not been assigned
            yet (i.e. ``startup()`` has not run).
    """
    # Explicit check instead of ``assert``: assertions are stripped under
    # ``python -O``, which would turn this into an opaque AttributeError.
    if service is None:
        msg = "SQLSpecSessionService is not initialized; startup() has not run"
        raise RuntimeError(msg)
    sessions = await service.list_sessions(app_name="docs", user_id="demo")
    return {"count": len(sessions.sessions)}
30
31
# Litestar application: routes list_sessions and registers startup as an
# on_startup hook.
app = Litestar(
    route_handlers=[list_sessions],
    on_startup=[startup],
)
33
34
def main() -> None:
    """Run ``startup()`` to completion outside of Litestar (smoke tests)."""
    bootstrap = startup()
    asyncio.run(bootstrap)
38
39
# Script entry point: only run main() when executed directly, not on import.
if __name__ == "__main__":
    main()