"""Psycopg ADK store for Google Agent Development Kit session/event storage."""
from typing import TYPE_CHECKING, Any
from psycopg import errors
from psycopg import sql as pg_sql
from psycopg.types.json import Jsonb
from sqlspec.extensions.adk import BaseAsyncADKStore, BaseSyncADKStore, EventRecord, SessionRecord
from sqlspec.utils.logging import get_logger
if TYPE_CHECKING:
from datetime import datetime
from sqlspec.adapters.psycopg.config import PsycopgAsyncConfig, PsycopgSyncConfig
logger = get_logger("adapters.psycopg.adk.store")
__all__ = ("PsycopgAsyncADKStore", "PsycopgSyncADKStore")
class PsycopgAsyncADKStore(BaseAsyncADKStore["PsycopgAsyncConfig"]):
"""PostgreSQL ADK store using Psycopg3 driver.
Implements session and event storage for Google Agent Development Kit
using PostgreSQL via psycopg3 with native async/await support.
Provides:
- Session state management with JSONB storage
- Event history tracking with BYTEA-serialized actions
- Microsecond-precision timestamps with TIMESTAMPTZ
- Foreign key constraints with cascade delete
- Optional owner ID column for multi-tenant foreign key references
- GIN indexes for JSONB queries
- HOT updates with FILLFACTOR 80
Args:
config: PsycopgAsyncConfig with extension_config["adk"] settings.
Example:
from sqlspec.adapters.psycopg import PsycopgAsyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgAsyncADKStore
config = PsycopgAsyncConfig(
pool_config={"conninfo": "postgresql://..."},
extension_config={
"adk": {
"session_table": "my_sessions",
"events_table": "my_events",
"owner_id_column": "tenant_id INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE"
}
}
)
store = PsycopgAsyncADKStore(config)
await store.create_tables()
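# Illustrative follow-up (hypothetical placeholder values; owner_id is only
# passed because owner_id_column is configured above):
session = await store.create_session("sess-1", "chat-app", "user-1", {}, owner_id=1)
events = await store.get_events("sess-1")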
Notes:
- PostgreSQL JSONB type used for state (more efficient than JSON)
- Psycopg requires wrapping dicts with Jsonb() for type safety
- TIMESTAMPTZ provides timezone-aware microsecond precision
- update_session_state() replaces the full JSONB state document (no server-side merge)
- BYTEA for pre-serialized actions from Google ADK
- GIN index on state for JSONB queries (partial index)
- FILLFACTOR 80 leaves space for HOT updates
- Parameter style: %s (psycopg placeholder style)
- Configuration is read from config.extension_config["adk"]
"""
__slots__ = ()
def __init__(self, config: "PsycopgAsyncConfig") -> None:
"""Initialize Psycopg ADK store.
Args:
config: PsycopgAsyncConfig instance.
Notes:
Configuration is read from config.extension_config["adk"]:
- session_table: Sessions table name (default: "adk_sessions")
- events_table: Events table name (default: "adk_events")
- owner_id_column: Optional owner FK column DDL (default: None)
"""
super().__init__(config)
async def _get_create_sessions_table_sql(self) -> str:
"""Get PostgreSQL CREATE TABLE SQL for sessions.
Returns:
SQL statement to create adk_sessions table with indexes.
Notes:
- VARCHAR(128) for IDs and names (sufficient for UUIDs and app names)
- JSONB type for state storage with default empty object
- TIMESTAMPTZ with microsecond precision
- FILLFACTOR 80 for HOT updates (reduces table bloat)
- Composite index on (app_name, user_id) for listing
- Index on update_time DESC for recent session queries
- Partial GIN index on state for JSONB queries (only non-empty)
- Optional owner ID column for multi-tenancy or user references
"""
owner_id_line = ""
if self._owner_id_column_ddl:
owner_id_line = f",\n {self._owner_id_column_ddl}"
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id VARCHAR(128) PRIMARY KEY,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL{owner_id_line},
state JSONB NOT NULL DEFAULT '{{}}'::jsonb,
create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
) WITH (fillfactor = 80);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_app_user
ON {self._session_table}(app_name, user_id);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_update_time
ON {self._session_table}(update_time DESC);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_state
ON {self._session_table} USING GIN (state)
WHERE state != '{{}}'::jsonb;
"""
async def _get_create_events_table_sql(self) -> str:
"""Get PostgreSQL CREATE TABLE SQL for events.
Returns:
SQL statement to create adk_events table with indexes.
Notes:
- VARCHAR sizes: id(128), session_id(128), invocation_id(256), author(256),
branch(256), error_code(256), error_message(1024)
- BYTEA for pickled actions (no size limit)
- JSONB for content, grounding_metadata, custom_metadata, long_running_tool_ids_json
- BOOLEAN for partial, turn_complete, interrupted
- Foreign key to sessions with CASCADE delete
- Index on (session_id, timestamp ASC) for ordered event retrieval
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id VARCHAR(128) PRIMARY KEY,
session_id VARCHAR(128) NOT NULL,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL,
invocation_id VARCHAR(256),
author VARCHAR(256),
actions BYTEA,
long_running_tool_ids_json JSONB,
branch VARCHAR(256),
timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
content JSONB,
grounding_metadata JSONB,
custom_metadata JSONB,
partial BOOLEAN,
turn_complete BOOLEAN,
interrupted BOOLEAN,
error_code VARCHAR(256),
error_message VARCHAR(1024),
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_{self._events_table}_session
ON {self._events_table}(session_id, timestamp ASC);
"""
def _get_drop_tables_sql(self) -> "list[str]":
"""Get PostgreSQL DROP TABLE SQL statements.
Returns:
List of SQL statements to drop tables and indexes.
Notes:
Order matters: drop events table (child) before sessions (parent).
PostgreSQL automatically drops indexes when dropping tables.
"""
return [f"DROP TABLE IF EXISTS {self._events_table}", f"DROP TABLE IF EXISTS {self._session_table}"]
async def create_tables(self) -> None:
"""Create both sessions and events tables if they don't exist."""
async with self._config.provide_session() as driver:
await driver.execute_script(await self._get_create_sessions_table_sql())
await driver.execute_script(await self._get_create_events_table_sql())
logger.debug("Created ADK tables: %s, %s", self._session_table, self._events_table)
async def create_session(
self, session_id: str, app_name: str, user_id: str, state: "dict[str, Any]", owner_id: "Any | None" = None
) -> SessionRecord:
"""Create a new session.
Args:
session_id: Unique session identifier.
app_name: Application name.
user_id: User identifier.
state: Initial session state.
owner_id: Optional owner ID value for owner_id_column (if configured).
Returns:
Created session record.
Notes:
Uses CURRENT_TIMESTAMP for create_time and update_time.
State is wrapped with Jsonb() for PostgreSQL type safety.
If owner_id_column is configured, owner_id value must be provided.
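Example:
# Minimal sketch with placeholder values; pass owner_id only when owner_id_column is configured
record = await store.create_session("sess-1", "my-app", "user-1", {"step": "start"})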
"""
params: tuple[Any, ...]
if self._owner_id_column_name:
query = pg_sql.SQL("""
INSERT INTO {table} (id, app_name, user_id, {owner_id_col}, state, create_time, update_time)
VALUES (%s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
""").format(
table=pg_sql.Identifier(self._session_table), owner_id_col=pg_sql.Identifier(self._owner_id_column_name)
)
params = (session_id, app_name, user_id, owner_id, Jsonb(state))
else:
query = pg_sql.SQL("""
INSERT INTO {table} (id, app_name, user_id, state, create_time, update_time)
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
""").format(table=pg_sql.Identifier(self._session_table))
params = (session_id, app_name, user_id, Jsonb(state))
async with self._config.provide_connection() as conn, conn.cursor() as cur:
await cur.execute(query, params)
return await self.get_session(session_id) # type: ignore[return-value]
async def get_session(self, session_id: str) -> "SessionRecord | None":
"""Get session by ID.
Args:
session_id: Session identifier.
Returns:
Session record or None if not found.
Notes:
PostgreSQL returns datetime objects for TIMESTAMPTZ columns.
JSONB is automatically deserialized by psycopg to Python dict.
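Example:
# Minimal sketch; returns None for an unknown id (or before create_tables() has run)
record = await store.get_session("sess-1")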
"""
query = pg_sql.SQL("""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {table}
WHERE id = %s
""").format(table=pg_sql.Identifier(self._session_table))
try:
async with self._config.provide_connection() as conn, conn.cursor() as cur:
await cur.execute(query, (session_id,))
row = await cur.fetchone()
if row is None:
return None
return SessionRecord(
id=row["id"],
app_name=row["app_name"],
user_id=row["user_id"],
state=row["state"],
create_time=row["create_time"],
update_time=row["update_time"],
)
except errors.UndefinedTable:
return None
async def update_session_state(self, session_id: str, state: "dict[str, Any]") -> None:
"""Update session state.
Args:
session_id: Session identifier.
state: New state dictionary (replaces existing state).
Notes:
This replaces the entire state dictionary.
Uses CURRENT_TIMESTAMP for update_time.
State is wrapped with Jsonb() for PostgreSQL type safety.
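Example:
# Minimal sketch; the stored state document is replaced wholesale, so merge client-side first if needed
await store.update_session_state("sess-1", {"step": "done", "count": 2})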
"""
query = pg_sql.SQL("""
UPDATE {table}
SET state = %s, update_time = CURRENT_TIMESTAMP
WHERE id = %s
""").format(table=pg_sql.Identifier(self._session_table))
async with self._config.provide_connection() as conn, conn.cursor() as cur:
await cur.execute(query, (Jsonb(state), session_id))
async def delete_session(self, session_id: str) -> None:
"""Delete session and all associated events (cascade).
Args:
session_id: Session identifier.
Notes:
Foreign key constraint ensures events are cascade-deleted.
"""
query = pg_sql.SQL("DELETE FROM {table} WHERE id = %s").format(table=pg_sql.Identifier(self._session_table))
async with self._config.provide_connection() as conn, conn.cursor() as cur:
await cur.execute(query, (session_id,))
async def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.
Args:
app_name: Application name.
user_id: User identifier. If None, lists all sessions for the app.
Returns:
List of session records ordered by update_time DESC.
Notes:
Uses composite index on (app_name, user_id) when user_id is provided.
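Example:
# Minimal sketch with placeholder names; omit user_id to list every session for the app
sessions = await store.list_sessions("my-app", "user-1")
all_sessions = await store.list_sessions("my-app")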
"""
if user_id is None:
query = pg_sql.SQL("""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {table}
WHERE app_name = %s
ORDER BY update_time DESC
""").format(table=pg_sql.Identifier(self._session_table))
params: tuple[str, ...] = (app_name,)
else:
query = pg_sql.SQL("""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {table}
WHERE app_name = %s AND user_id = %s
ORDER BY update_time DESC
""").format(table=pg_sql.Identifier(self._session_table))
params = (app_name, user_id)
try:
async with self._config.provide_connection() as conn, conn.cursor() as cur:
await cur.execute(query, params)
rows = await cur.fetchall()
return [
SessionRecord(
id=row["id"],
app_name=row["app_name"],
user_id=row["user_id"],
state=row["state"],
create_time=row["create_time"],
update_time=row["update_time"],
)
for row in rows
]
except errors.UndefinedTable:
return []
async def append_event(self, event_record: EventRecord) -> None:
"""Append an event to a session.
Args:
event_record: Event record to store.
Notes:
Timestamp is taken from event_record["timestamp"], which must be set by the caller.
JSONB fields are wrapped with Jsonb() for PostgreSQL type safety.
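Example:
# Minimal sketch assuming EventRecord accepts a plain dict with these keys;
# id, session_id, app_name, user_id and timestamp are required here, the rest default to None.
# Requires: from datetime import datetime, timezone
await store.append_event({
    "id": "evt-1",
    "session_id": "sess-1",
    "app_name": "my-app",
    "user_id": "user-1",
    "timestamp": datetime.now(timezone.utc),
    "author": "assistant",
    "content": {"text": "hello"},
})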
"""
content_json = event_record.get("content")
grounding_metadata_json = event_record.get("grounding_metadata")
custom_metadata_json = event_record.get("custom_metadata")
query = pg_sql.SQL("""
INSERT INTO {table} (
id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
""").format(table=pg_sql.Identifier(self._events_table))
async with self._config.provide_connection() as conn, conn.cursor() as cur:
await cur.execute(
query,
(
event_record["id"],
event_record["session_id"],
event_record["app_name"],
event_record["user_id"],
event_record.get("invocation_id"),
event_record.get("author"),
event_record.get("actions"),
event_record.get("long_running_tool_ids_json"),
event_record.get("branch"),
event_record["timestamp"],
Jsonb(content_json) if content_json is not None else None,
Jsonb(grounding_metadata_json) if grounding_metadata_json is not None else None,
Jsonb(custom_metadata_json) if custom_metadata_json is not None else None,
event_record.get("partial"),
event_record.get("turn_complete"),
event_record.get("interrupted"),
event_record.get("error_code"),
event_record.get("error_message"),
),
)
async def get_events(
self, session_id: str, after_timestamp: "datetime | None" = None, limit: "int | None" = None
) -> "list[EventRecord]":
"""Get events for a session.
Args:
session_id: Session identifier.
after_timestamp: Only return events after this time.
limit: Maximum number of events to return.
Returns:
List of event records ordered by timestamp ASC.
Notes:
Uses index on (session_id, timestamp ASC).
JSONB fields are automatically deserialized by psycopg.
BYTEA actions are converted to bytes.
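Example:
# Minimal sketch; cutoff is a placeholder timezone-aware datetime
events = await store.get_events("sess-1")
recent = await store.get_events("sess-1", after_timestamp=cutoff, limit=50)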
"""
where_clauses = ["session_id = %s"]
params: list[Any] = [session_id]
if after_timestamp is not None:
where_clauses.append("timestamp > %s")
params.append(after_timestamp)
where_clause = " AND ".join(where_clauses)
if limit:
params.append(limit)
query = pg_sql.SQL(
"""
SELECT id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
FROM {table}
WHERE {where_clause}
ORDER BY timestamp ASC{limit_clause}
"""
).format(
table=pg_sql.Identifier(self._events_table),
where_clause=pg_sql.SQL(where_clause), # pyright: ignore[reportArgumentType]
limit_clause=pg_sql.SQL(" LIMIT %s" if limit else ""), # pyright: ignore[reportArgumentType]
)
try:
async with self._config.provide_connection() as conn, conn.cursor() as cur:
await cur.execute(query, tuple(params))
rows = await cur.fetchall()
return [
EventRecord(
id=row["id"],
session_id=row["session_id"],
app_name=row["app_name"],
user_id=row["user_id"],
invocation_id=row["invocation_id"],
author=row["author"],
actions=bytes(row["actions"]) if row["actions"] else b"",
long_running_tool_ids_json=row["long_running_tool_ids_json"],
branch=row["branch"],
timestamp=row["timestamp"],
content=row["content"],
grounding_metadata=row["grounding_metadata"],
custom_metadata=row["custom_metadata"],
partial=row["partial"],
turn_complete=row["turn_complete"],
interrupted=row["interrupted"],
error_code=row["error_code"],
error_message=row["error_message"],
)
for row in rows
]
except errors.UndefinedTable:
return []
class PsycopgSyncADKStore(BaseSyncADKStore["PsycopgSyncConfig"]):
"""PostgreSQL synchronous ADK store using Psycopg3 driver.
Implements session and event storage for Google Agent Development Kit
using PostgreSQL via psycopg3 with synchronous execution.
Provides:
- Session state management with JSONB storage
- Event history tracking with BYTEA-serialized actions
- Microsecond-precision timestamps with TIMESTAMPTZ
- Foreign key constraints with cascade delete
- Optional owner ID column for multi-tenant foreign key references
- GIN indexes for JSONB queries
- HOT updates with FILLFACTOR 80
Args:
config: PsycopgSyncConfig with extension_config["adk"] settings.
Example:
from sqlspec.adapters.psycopg import PsycopgSyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgSyncADKStore
config = PsycopgSyncConfig(
pool_config={"conninfo": "postgresql://..."},
extension_config={
"adk": {
"session_table": "my_sessions",
"events_table": "my_events",
"owner_id_column": "tenant_id INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE"
}
}
)
store = PsycopgSyncADKStore(config)
store.create_tables()
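# Illustrative follow-up (hypothetical placeholder values; owner_id is only
# passed because owner_id_column is configured above):
session = store.create_session("sess-1", "chat-app", "user-1", {}, owner_id=1)
events = store.list_events("sess-1")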
Notes:
- PostgreSQL JSONB type used for state (more efficient than JSON)
- Psycopg requires wrapping dicts with Jsonb() for type safety
- TIMESTAMPTZ provides timezone-aware microsecond precision
- update_session_state() replaces the full JSONB state document (no server-side merge)
- BYTEA for pre-serialized actions from Google ADK
- GIN index on state for JSONB queries (partial index)
- FILLFACTOR 80 leaves space for HOT updates
- Parameter style: %s (psycopg placeholder style)
- Configuration is read from config.extension_config["adk"]
"""
__slots__ = ()
def __init__(self, config: "PsycopgSyncConfig") -> None:
"""Initialize Psycopg synchronous ADK store.
Args:
config: PsycopgSyncConfig instance.
Notes:
Configuration is read from config.extension_config["adk"]:
- session_table: Sessions table name (default: "adk_sessions")
- events_table: Events table name (default: "adk_events")
- owner_id_column: Optional owner FK column DDL (default: None)
"""
super().__init__(config)
def _get_create_sessions_table_sql(self) -> str:
"""Get PostgreSQL CREATE TABLE SQL for sessions.
Returns:
SQL statement to create adk_sessions table with indexes.
Notes:
- VARCHAR(128) for IDs and names (sufficient for UUIDs and app names)
- JSONB type for state storage with default empty object
- TIMESTAMPTZ with microsecond precision
- FILLFACTOR 80 for HOT updates (reduces table bloat)
- Composite index on (app_name, user_id) for listing
- Index on update_time DESC for recent session queries
- Partial GIN index on state for JSONB queries (only non-empty)
- Optional owner ID column for multi-tenancy or user references
"""
owner_id_line = ""
if self._owner_id_column_ddl:
owner_id_line = f",\n {self._owner_id_column_ddl}"
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id VARCHAR(128) PRIMARY KEY,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL{owner_id_line},
state JSONB NOT NULL DEFAULT '{{}}'::jsonb,
create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
) WITH (fillfactor = 80);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_app_user
ON {self._session_table}(app_name, user_id);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_update_time
ON {self._session_table}(update_time DESC);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_state
ON {self._session_table} USING GIN (state)
WHERE state != '{{}}'::jsonb;
"""
def _get_create_events_table_sql(self) -> str:
"""Get PostgreSQL CREATE TABLE SQL for events.
Returns:
SQL statement to create adk_events table with indexes.
Notes:
- VARCHAR sizes: id(128), session_id(128), invocation_id(256), author(256),
branch(256), error_code(256), error_message(1024)
- BYTEA for pickled actions (no size limit)
- JSONB for content, grounding_metadata, custom_metadata, long_running_tool_ids_json
- BOOLEAN for partial, turn_complete, interrupted
- Foreign key to sessions with CASCADE delete
- Index on (session_id, timestamp ASC) for ordered event retrieval
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id VARCHAR(128) PRIMARY KEY,
session_id VARCHAR(128) NOT NULL,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL,
invocation_id VARCHAR(256),
author VARCHAR(256),
actions BYTEA,
long_running_tool_ids_json JSONB,
branch VARCHAR(256),
timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
content JSONB,
grounding_metadata JSONB,
custom_metadata JSONB,
partial BOOLEAN,
turn_complete BOOLEAN,
interrupted BOOLEAN,
error_code VARCHAR(256),
error_message VARCHAR(1024),
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_{self._events_table}_session
ON {self._events_table}(session_id, timestamp ASC);
"""
def _get_drop_tables_sql(self) -> "list[str]":
"""Get PostgreSQL DROP TABLE SQL statements.
Returns:
List of SQL statements to drop tables and indexes.
Notes:
Order matters: drop events table (child) before sessions (parent).
PostgreSQL automatically drops indexes when dropping tables.
"""
return [f"DROP TABLE IF EXISTS {self._events_table}", f"DROP TABLE IF EXISTS {self._session_table}"]
def create_tables(self) -> None:
"""Create both sessions and events tables if they don't exist."""
with self._config.provide_session() as driver:
driver.execute_script(self._get_create_sessions_table_sql())
driver.execute_script(self._get_create_events_table_sql())
logger.debug("Created ADK tables: %s, %s", self._session_table, self._events_table)
def create_session(
self, session_id: str, app_name: str, user_id: str, state: "dict[str, Any]", owner_id: "Any | None" = None
) -> SessionRecord:
"""Create a new session.
Args:
session_id: Unique session identifier.
app_name: Application name.
user_id: User identifier.
state: Initial session state.
owner_id: Optional owner ID value for owner_id_column (if configured).
Returns:
Created session record.
Notes:
Uses CURRENT_TIMESTAMP for create_time and update_time.
State is wrapped with Jsonb() for PostgreSQL type safety.
If owner_id_column is configured, owner_id value must be provided.
"""
params: tuple[Any, ...]
if self._owner_id_column_name:
query = pg_sql.SQL("""
INSERT INTO {table} (id, app_name, user_id, {owner_id_col}, state, create_time, update_time)
VALUES (%s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
""").format(
table=pg_sql.Identifier(self._session_table), owner_id_col=pg_sql.Identifier(self._owner_id_column_name)
)
params = (session_id, app_name, user_id, owner_id, Jsonb(state))
else:
query = pg_sql.SQL("""
INSERT INTO {table} (id, app_name, user_id, state, create_time, update_time)
VALUES (%s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
""").format(table=pg_sql.Identifier(self._session_table))
params = (session_id, app_name, user_id, Jsonb(state))
with self._config.provide_connection() as conn, conn.cursor() as cur:
cur.execute(query, params)
return self.get_session(session_id) # type: ignore[return-value]
def get_session(self, session_id: str) -> "SessionRecord | None":
"""Get session by ID.
Args:
session_id: Session identifier.
Returns:
Session record or None if not found.
Notes:
PostgreSQL returns datetime objects for TIMESTAMPTZ columns.
JSONB is automatically deserialized by psycopg to Python dict.
"""
query = pg_sql.SQL("""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {table}
WHERE id = %s
""").format(table=pg_sql.Identifier(self._session_table))
try:
with self._config.provide_connection() as conn, conn.cursor() as cur:
cur.execute(query, (session_id,))
row = cur.fetchone()
if row is None:
return None
return SessionRecord(
id=row["id"],
app_name=row["app_name"],
user_id=row["user_id"],
state=row["state"],
create_time=row["create_time"],
update_time=row["update_time"],
)
except errors.UndefinedTable:
return None
def update_session_state(self, session_id: str, state: "dict[str, Any]") -> None:
"""Update session state.
Args:
session_id: Session identifier.
state: New state dictionary (replaces existing state).
Notes:
This replaces the entire state dictionary.
Uses CURRENT_TIMESTAMP for update_time.
State is wrapped with Jsonb() for PostgreSQL type safety.
"""
query = pg_sql.SQL("""
UPDATE {table}
SET state = %s, update_time = CURRENT_TIMESTAMP
WHERE id = %s
""").format(table=pg_sql.Identifier(self._session_table))
with self._config.provide_connection() as conn, conn.cursor() as cur:
cur.execute(query, (Jsonb(state), session_id))
def delete_session(self, session_id: str) -> None:
"""Delete session and all associated events (cascade).
Args:
session_id: Session identifier.
Notes:
Foreign key constraint ensures events are cascade-deleted.
"""
query = pg_sql.SQL("DELETE FROM {table} WHERE id = %s").format(table=pg_sql.Identifier(self._session_table))
with self._config.provide_connection() as conn, conn.cursor() as cur:
cur.execute(query, (session_id,))
def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.
Args:
app_name: Application name.
user_id: User identifier. If None, lists all sessions for the app.
Returns:
List of session records ordered by update_time DESC.
Notes:
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
query = pg_sql.SQL("""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {table}
WHERE app_name = %s
ORDER BY update_time DESC
""").format(table=pg_sql.Identifier(self._session_table))
params: tuple[str, ...] = (app_name,)
else:
query = pg_sql.SQL("""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {table}
WHERE app_name = %s AND user_id = %s
ORDER BY update_time DESC
""").format(table=pg_sql.Identifier(self._session_table))
params = (app_name, user_id)
try:
with self._config.provide_connection() as conn, conn.cursor() as cur:
cur.execute(query, params)
rows = cur.fetchall()
return [
SessionRecord(
id=row["id"],
app_name=row["app_name"],
user_id=row["user_id"],
state=row["state"],
create_time=row["create_time"],
update_time=row["update_time"],
)
for row in rows
]
except errors.UndefinedTable:
return []
def create_event(
self,
event_id: str,
session_id: str,
app_name: str,
user_id: str,
author: "str | None" = None,
actions: "bytes | None" = None,
content: "dict[str, Any] | None" = None,
**kwargs: Any,
) -> EventRecord:
"""Create a new event.
Args:
event_id: Unique event identifier.
session_id: Session identifier.
app_name: Application name.
user_id: User identifier.
author: Event author (user/assistant/system).
actions: Pickled actions object.
content: Event content (JSONB).
**kwargs: Additional optional fields (invocation_id, branch, timestamp,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message, long_running_tool_ids_json).
Returns:
Created event record.
Notes:
Uses CURRENT_TIMESTAMP for timestamp if not provided in kwargs.
JSONB fields are wrapped with Jsonb() for PostgreSQL type safety.
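Example:
# Minimal sketch with placeholder values; timestamp is omitted, so COALESCE falls back to CURRENT_TIMESTAMP
event = store.create_event(
    event_id="evt-1",
    session_id="sess-1",
    app_name="my-app",
    user_id="user-1",
    author="assistant",
    content={"text": "hello"},
)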
"""
content_json = Jsonb(content) if content is not None else None
grounding_metadata = kwargs.get("grounding_metadata")
grounding_metadata_json = Jsonb(grounding_metadata) if grounding_metadata is not None else None
custom_metadata = kwargs.get("custom_metadata")
custom_metadata_json = Jsonb(custom_metadata) if custom_metadata is not None else None
query = pg_sql.SQL("""
INSERT INTO {table} (
id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
) VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, COALESCE(%s, CURRENT_TIMESTAMP), %s, %s, %s, %s, %s, %s, %s, %s
)
RETURNING id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
""").format(table=pg_sql.Identifier(self._events_table))
with self._config.provide_connection() as conn, conn.cursor() as cur:
cur.execute(
query,
(
event_id,
session_id,
app_name,
user_id,
kwargs.get("invocation_id"),
author,
actions,
kwargs.get("long_running_tool_ids_json"),
kwargs.get("branch"),
kwargs.get("timestamp"),
content_json,
grounding_metadata_json,
custom_metadata_json,
kwargs.get("partial"),
kwargs.get("turn_complete"),
kwargs.get("interrupted"),
kwargs.get("error_code"),
kwargs.get("error_message"),
),
)
row = cur.fetchone()
if row is None:
msg = f"Failed to create event {event_id}"
raise RuntimeError(msg)
return EventRecord(
id=row["id"],
session_id=row["session_id"],
app_name=row["app_name"],
user_id=row["user_id"],
invocation_id=row["invocation_id"],
author=row["author"],
actions=bytes(row["actions"]) if row["actions"] else b"",
long_running_tool_ids_json=row["long_running_tool_ids_json"],
branch=row["branch"],
timestamp=row["timestamp"],
content=row["content"],
grounding_metadata=row["grounding_metadata"],
custom_metadata=row["custom_metadata"],
partial=row["partial"],
turn_complete=row["turn_complete"],
interrupted=row["interrupted"],
error_code=row["error_code"],
error_message=row["error_message"],
)
def list_events(self, session_id: str) -> "list[EventRecord]":
"""List events for a session ordered by timestamp.
Args:
session_id: Session identifier.
Returns:
List of event records ordered by timestamp ASC.
Notes:
Uses index on (session_id, timestamp ASC).
JSONB fields are automatically deserialized by psycopg.
BYTEA actions are converted to bytes.
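Example:
# Minimal sketch; returns an empty list when the events table does not exist yet
for event in store.list_events("sess-1"):
    print(event["author"], event["timestamp"])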
"""
query = pg_sql.SQL("""
SELECT id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
FROM {table}
WHERE session_id = %s
ORDER BY timestamp ASC
""").format(table=pg_sql.Identifier(self._events_table))
try:
with self._config.provide_connection() as conn, conn.cursor() as cur:
cur.execute(query, (session_id,))
rows = cur.fetchall()
return [
EventRecord(
id=row["id"],
session_id=row["session_id"],
app_name=row["app_name"],
user_id=row["user_id"],
invocation_id=row["invocation_id"],
author=row["author"],
actions=bytes(row["actions"]) if row["actions"] else b"",
long_running_tool_ids_json=row["long_running_tool_ids_json"],
branch=row["branch"],
timestamp=row["timestamp"],
content=row["content"],
grounding_metadata=row["grounding_metadata"],
custom_metadata=row["custom_metadata"],
partial=row["partial"],
turn_complete=row["turn_complete"],
interrupted=row["interrupted"],
error_code=row["error_code"],
error_message=row["error_message"],
)
for row in rows
]
except errors.UndefinedTable:
return []