"""SQLSpec-backed session service for Google ADK."""
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any
from google.adk.sessions.base_session_service import BaseSessionService, GetSessionConfig, ListSessionsResponse
from sqlspec.extensions.adk.converters import event_to_record, record_to_session
from sqlspec.utils.logging import get_logger
if TYPE_CHECKING:
from google.adk.events.event import Event
from google.adk.sessions import Session
from sqlspec.extensions.adk.store import BaseAsyncADKStore
# Module-level logger obtained from SQLSpec's logging helper.
logger = get_logger("extensions.adk.service")
# Public API of this module: only the session service class is exported.
__all__ = ("SQLSpecSessionService",)
[docs]
class SQLSpecSessionService(BaseSessionService):
    """SQLSpec-backed implementation of BaseSessionService.

    Provides session and event storage using SQLSpec database adapters.
    Delegates all database operations to a store implementation; this class
    only validates ownership (app name / user ID) and converts between store
    records and ADK objects.

    Args:
        store: Database store implementation (e.g., AsyncpgADKStore).

    Example:
        from sqlspec.adapters.asyncpg import AsyncpgConfig
        from sqlspec.adapters.asyncpg.adk.store import AsyncpgADKStore
        from sqlspec.extensions.adk.service import SQLSpecSessionService

        config = AsyncpgConfig(pool_config={"dsn": "postgresql://..."})
        store = AsyncpgADKStore(config)
        await store.create_tables()

        service = SQLSpecSessionService(store)
        session = await service.create_session(
            app_name="my_app",
            user_id="user123",
            state={"key": "value"}
        )
    """

    def __init__(self, store: "BaseAsyncADKStore") -> None:
        """Initialize the session service.

        Args:
            store: Database store implementation.
        """
        self._store = store

    @property
    def store(self) -> "BaseAsyncADKStore":
        """Return the database store."""
        return self._store

    async def create_session(
        self, *, app_name: str, user_id: str, state: "dict[str, Any] | None" = None, session_id: "str | None" = None
    ) -> "Session":
        """Create a new session.

        Args:
            app_name: Name of the application.
            user_id: ID of the user.
            state: Initial state of the session. Defaults to an empty dict.
            session_id: Client-provided session ID. If None, generates a UUID.

        Returns:
            The newly created session, with an empty event list.
        """
        if session_id is None:
            session_id = str(uuid.uuid4())

        if state is None:
            state = {}

        record = await self._store.create_session(
            session_id=session_id, app_name=app_name, user_id=user_id, state=state
        )

        # A freshly created session has no events yet.
        return record_to_session(record, events=[])

    async def get_session(
        self, *, app_name: str, user_id: str, session_id: str, config: "GetSessionConfig | None" = None
    ) -> "Session | None":
        """Get a session by ID.

        Args:
            app_name: Name of the application.
            user_id: ID of the user.
            session_id: Session identifier.
            config: Configuration for retrieving events. Its ``after_timestamp``
                (when truthy) and ``num_recent_events`` are forwarded to the
                store as the event time filter and limit.

        Returns:
            Session object if found, None otherwise. None is also returned
            when the stored session belongs to a different app or user.
        """
        record = await self._store.get_session(session_id)

        if not record:
            return None

        # The store looks sessions up by ID only; enforce ownership here.
        if record["app_name"] != app_name or record["user_id"] != user_id:
            return None

        after_timestamp = None
        limit = None

        if config:
            # A falsy after_timestamp (None or 0) means "no lower bound".
            if config.after_timestamp:
                # The config value is passed to fromtimestamp(), i.e. treated
                # as a POSIX timestamp; the store receives an aware UTC datetime.
                after_timestamp = datetime.fromtimestamp(config.after_timestamp, tz=timezone.utc)
            limit = config.num_recent_events

        events = await self._store.get_events(session_id=session_id, after_timestamp=after_timestamp, limit=limit)

        return record_to_session(record, events)

    # The union annotation is quoted like every other one in this class so it
    # is not evaluated at definition time (``str | None`` raises TypeError on
    # Python versions older than 3.10).
    async def list_sessions(self, *, app_name: str, user_id: "str | None" = None) -> "ListSessionsResponse":
        """List all sessions for an app, optionally filtered by user.

        Args:
            app_name: Name of the application.
            user_id: ID of the user. If None, all sessions for the app are listed.

        Returns:
            Response containing list of sessions (without events).
        """
        records = await self._store.list_sessions(app_name=app_name, user_id=user_id)

        # Events are intentionally not loaded for listings.
        sessions = [record_to_session(record, events=[]) for record in records]

        return ListSessionsResponse(sessions=sessions)

    async def delete_session(self, *, app_name: str, user_id: str, session_id: str) -> None:
        """Delete a session and all its events.

        Does nothing if the session does not exist or belongs to a different
        app or user.

        Args:
            app_name: Name of the application.
            user_id: ID of the user.
            session_id: Session identifier.
        """
        record = await self._store.get_session(session_id)

        if not record:
            return

        # Only the owning app/user may delete the session.
        if record["app_name"] != app_name or record["user_id"] != user_id:
            return

        await self._store.delete_session(session_id)

    async def append_event(self, session: "Session", event: "Event") -> "Event":
        """Append an event to a session.

        The base class implementation runs first; its returned event is the
        one that gets stored. Partial events are returned without being
        written to the store.

        Args:
            session: Session to append to.
            event: Event to append.

        Returns:
            The appended event.
        """
        event = await super().append_event(session, event)  # pyright: ignore

        # Partial events are not persisted.
        if event.partial:
            return event

        event_record = event_to_record(
            event=event, session_id=session.id, app_name=session.app_name, user_id=session.user_id
        )

        await self._store.append_event(event_record)

        # Re-read the session so the in-memory object carries the update time
        # recorded by the store.
        session_record = await self._store.get_session(session.id)
        if session_record:
            session.last_update_time = session_record["update_time"].timestamp()

        return event