"""ADBC ADK store for Google Agent Development Kit session/event storage."""
from typing import TYPE_CHECKING, Any, Final
from sqlspec.extensions.adk import BaseSyncADKStore, EventRecord, SessionRecord
from sqlspec.utils.logging import get_logger
from sqlspec.utils.serializers import from_json, to_json
if TYPE_CHECKING:
from sqlspec.adapters.adbc.config import AdbcConfig
logger = get_logger("adapters.adbc.adk.store")
__all__ = ("AdbcADKStore",)
DIALECT_POSTGRESQL: Final = "postgresql"
DIALECT_SQLITE: Final = "sqlite"
DIALECT_DUCKDB: Final = "duckdb"
DIALECT_SNOWFLAKE: Final = "snowflake"
DIALECT_GENERIC: Final = "generic"
ADBC_TABLE_NOT_FOUND_PATTERNS: Final = ("no such table", "table or view does not exist", "relation does not exist")
[docs]
class AdbcADKStore(BaseSyncADKStore["AdbcConfig"]):
"""ADBC synchronous ADK store for Arrow Database Connectivity.
Implements session and event storage for Google Agent Development Kit
using ADBC. ADBC provides a vendor-neutral API with Arrow-native data
transfer across multiple databases (PostgreSQL, SQLite, DuckDB, etc.).
Provides:
- Session state management with JSON serialization (TEXT storage)
- Event history tracking with BLOB-serialized actions
- Timezone-aware timestamps
- Foreign key constraints with cascade delete
- Database-agnostic SQL (supports multiple backends)
Args:
config: AdbcConfig with extension_config["adk"] settings.
Example:
from sqlspec.adapters.adbc import AdbcConfig
from sqlspec.adapters.adbc.adk import AdbcADKStore
config = AdbcConfig(
connection_config={"driver_name": "sqlite", "uri": ":memory:"},
extension_config={
"adk": {
"session_table": "my_sessions",
"events_table": "my_events",
"owner_id_column": "tenant_id INTEGER REFERENCES tenants(id)"
}
}
)
store = AdbcADKStore(config)
store.create_tables()
Notes:
- TEXT for JSON storage (compatible across all ADBC backends)
- BLOB for pre-serialized actions from Google ADK
- TIMESTAMP for timezone-aware timestamps (driver-dependent precision)
- INTEGER for booleans (0/1/NULL)
- Parameter style varies by backend (?, $1, :name, etc.)
- Uses dialect-agnostic SQL for maximum compatibility
- State and JSON fields use to_json/from_json for serialization
- ADBC drivers handle parameter binding automatically
- Configuration is read from config.extension_config["adk"]
"""
__slots__ = ("_dialect",)
[docs]
def __init__(self, config: "AdbcConfig") -> None:
"""Initialize ADBC ADK store.
Args:
config: AdbcConfig instance (any ADBC driver).
Notes:
Configuration is read from config.extension_config["adk"]:
- session_table: Sessions table name (default: "adk_sessions")
- events_table: Events table name (default: "adk_events")
- owner_id_column: Optional owner FK column DDL (default: None)
"""
super().__init__(config)
self._dialect = self._detect_dialect()
@property
def dialect(self) -> str:
"""Return the detected database dialect."""
return self._dialect
def _detect_dialect(self) -> str:
"""Detect ADBC driver dialect from connection config.
Returns:
Dialect identifier for DDL generation.
Notes:
Reads from config.connection_config driver_name.
Falls back to generic for unknown drivers.
"""
driver_name = self._config.connection_config.get("driver_name", "").lower()
if "postgres" in driver_name:
return DIALECT_POSTGRESQL
if "sqlite" in driver_name:
return DIALECT_SQLITE
if "duckdb" in driver_name:
return DIALECT_DUCKDB
if "snowflake" in driver_name:
return DIALECT_SNOWFLAKE
logger.warning(
"Unknown ADBC driver: %s. Using generic SQL dialect. "
"Consider using a direct adapter for better performance.",
driver_name,
)
return DIALECT_GENERIC
def _serialize_state(self, state: "dict[str, Any]") -> str:
"""Serialize state dictionary to JSON string.
Args:
state: State dictionary to serialize.
Returns:
JSON string.
"""
return to_json(state)
def _deserialize_state(self, data: Any) -> "dict[str, Any]":
"""Deserialize state data from JSON string.
Args:
data: JSON string from database.
Returns:
Deserialized state dictionary.
"""
if data is None:
return {}
return from_json(str(data)) # type: ignore[no-any-return]
def _serialize_json_field(self, value: Any) -> "str | None":
"""Serialize optional JSON field for event storage.
Args:
value: Value to serialize (dict or None).
Returns:
Serialized JSON string or None.
"""
if value is None:
return None
return to_json(value)
def _deserialize_json_field(self, data: Any) -> "dict[str, Any] | None":
"""Deserialize optional JSON field from database.
Args:
data: JSON string from database or None.
Returns:
Deserialized dictionary or None.
"""
if data is None:
return None
return from_json(str(data)) # type: ignore[no-any-return]
def _get_create_sessions_table_sql(self) -> str:
"""Get CREATE TABLE SQL for sessions with dialect dispatch.
Returns:
SQL statement to create adk_sessions table.
"""
if self._dialect == DIALECT_POSTGRESQL:
return self._get_sessions_ddl_postgresql()
if self._dialect == DIALECT_SQLITE:
return self._get_sessions_ddl_sqlite()
if self._dialect == DIALECT_DUCKDB:
return self._get_sessions_ddl_duckdb()
if self._dialect == DIALECT_SNOWFLAKE:
return self._get_sessions_ddl_snowflake()
return self._get_sessions_ddl_generic()
def _get_sessions_ddl_postgresql(self) -> str:
"""PostgreSQL DDL with JSONB and TIMESTAMPTZ.
Returns:
SQL to create sessions table optimized for PostgreSQL.
"""
owner_id_ddl = f", {self._owner_id_column_ddl}" if self._owner_id_column_ddl else ""
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id VARCHAR(128) PRIMARY KEY,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL{owner_id_ddl},
state JSONB NOT NULL DEFAULT '{{}}'::jsonb,
create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
def _get_sessions_ddl_sqlite(self) -> str:
"""SQLite DDL with TEXT and REAL timestamps.
Returns:
SQL to create sessions table optimized for SQLite.
"""
owner_id_ddl = f", {self._owner_id_column_ddl}" if self._owner_id_column_ddl else ""
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id TEXT PRIMARY KEY,
app_name TEXT NOT NULL,
user_id TEXT NOT NULL{owner_id_ddl},
state TEXT NOT NULL DEFAULT '{{}}',
create_time REAL NOT NULL,
update_time REAL NOT NULL
)
"""
def _get_sessions_ddl_duckdb(self) -> str:
"""DuckDB DDL with native JSON type.
Returns:
SQL to create sessions table optimized for DuckDB.
"""
owner_id_ddl = f", {self._owner_id_column_ddl}" if self._owner_id_column_ddl else ""
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id VARCHAR(128) PRIMARY KEY,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL{owner_id_ddl},
state JSON NOT NULL,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
def _get_sessions_ddl_snowflake(self) -> str:
"""Snowflake DDL with VARIANT type.
Returns:
SQL to create sessions table optimized for Snowflake.
"""
owner_id_ddl = f", {self._owner_id_column_ddl}" if self._owner_id_column_ddl else ""
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id VARCHAR PRIMARY KEY,
app_name VARCHAR NOT NULL,
user_id VARCHAR NOT NULL{owner_id_ddl},
state VARIANT NOT NULL,
create_time TIMESTAMP_TZ NOT NULL DEFAULT CURRENT_TIMESTAMP(),
update_time TIMESTAMP_TZ NOT NULL DEFAULT CURRENT_TIMESTAMP()
)
"""
def _get_sessions_ddl_generic(self) -> str:
"""Generic SQL-92 compatible DDL fallback.
Returns:
SQL to create sessions table using generic types.
"""
owner_id_ddl = f", {self._owner_id_column_ddl}" if self._owner_id_column_ddl else ""
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id VARCHAR(128) PRIMARY KEY,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL{owner_id_ddl},
state TEXT NOT NULL DEFAULT '{{}}',
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
)
"""
def _get_create_events_table_sql(self) -> str:
"""Get CREATE TABLE SQL for events with dialect dispatch.
Returns:
SQL statement to create adk_events table.
"""
if self._dialect == DIALECT_POSTGRESQL:
return self._get_events_ddl_postgresql()
if self._dialect == DIALECT_SQLITE:
return self._get_events_ddl_sqlite()
if self._dialect == DIALECT_DUCKDB:
return self._get_events_ddl_duckdb()
if self._dialect == DIALECT_SNOWFLAKE:
return self._get_events_ddl_snowflake()
return self._get_events_ddl_generic()
def _get_events_ddl_postgresql(self) -> str:
"""PostgreSQL DDL for events table.
Returns:
SQL to create events table optimized for PostgreSQL.
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id VARCHAR(128) PRIMARY KEY,
session_id VARCHAR(128) NOT NULL,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL,
invocation_id VARCHAR(256),
author VARCHAR(256),
actions BYTEA,
long_running_tool_ids_json TEXT,
branch VARCHAR(256),
timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
content JSONB,
grounding_metadata JSONB,
custom_metadata JSONB,
partial BOOLEAN,
turn_complete BOOLEAN,
interrupted BOOLEAN,
error_code VARCHAR(256),
error_message VARCHAR(1024),
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id) ON DELETE CASCADE
)
"""
def _get_events_ddl_sqlite(self) -> str:
"""SQLite DDL for events table.
Returns:
SQL to create events table optimized for SQLite.
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
app_name TEXT NOT NULL,
user_id TEXT NOT NULL,
invocation_id TEXT,
author TEXT,
actions BLOB,
long_running_tool_ids_json TEXT,
branch TEXT,
timestamp REAL NOT NULL,
content TEXT,
grounding_metadata TEXT,
custom_metadata TEXT,
partial INTEGER,
turn_complete INTEGER,
interrupted INTEGER,
error_code TEXT,
error_message TEXT,
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id) ON DELETE CASCADE
)
"""
def _get_events_ddl_duckdb(self) -> str:
"""DuckDB DDL for events table.
Returns:
SQL to create events table optimized for DuckDB.
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id VARCHAR(128) PRIMARY KEY,
session_id VARCHAR(128) NOT NULL,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL,
invocation_id VARCHAR(256),
author VARCHAR(256),
actions BLOB,
long_running_tool_ids_json VARCHAR,
branch VARCHAR(256),
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
content JSON,
grounding_metadata JSON,
custom_metadata JSON,
partial BOOLEAN,
turn_complete BOOLEAN,
interrupted BOOLEAN,
error_code VARCHAR(256),
error_message VARCHAR(1024),
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id) ON DELETE CASCADE
)
"""
def _get_events_ddl_snowflake(self) -> str:
"""Snowflake DDL for events table.
Returns:
SQL to create events table optimized for Snowflake.
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id VARCHAR PRIMARY KEY,
session_id VARCHAR NOT NULL,
app_name VARCHAR NOT NULL,
user_id VARCHAR NOT NULL,
invocation_id VARCHAR,
author VARCHAR,
actions BINARY,
long_running_tool_ids_json VARCHAR,
branch VARCHAR,
timestamp TIMESTAMP_TZ NOT NULL DEFAULT CURRENT_TIMESTAMP(),
content VARIANT,
grounding_metadata VARIANT,
custom_metadata VARIANT,
partial BOOLEAN,
turn_complete BOOLEAN,
interrupted BOOLEAN,
error_code VARCHAR,
error_message VARCHAR,
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id)
)
"""
def _get_events_ddl_generic(self) -> str:
"""Generic SQL-92 compatible DDL for events table.
Returns:
SQL to create events table using generic types.
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id VARCHAR(128) PRIMARY KEY,
session_id VARCHAR(128) NOT NULL,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL,
invocation_id VARCHAR(256),
author VARCHAR(256),
actions BLOB,
long_running_tool_ids_json TEXT,
branch VARCHAR(256),
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
content TEXT,
grounding_metadata TEXT,
custom_metadata TEXT,
partial INTEGER,
turn_complete INTEGER,
interrupted INTEGER,
error_code VARCHAR(256),
error_message VARCHAR(1024),
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id) ON DELETE CASCADE
)
"""
def _get_drop_tables_sql(self) -> "list[str]":
"""Get DROP TABLE SQL statements.
Returns:
List of SQL statements to drop tables and indexes.
Notes:
Order matters: drop events table (child) before sessions (parent).
Most databases automatically drop indexes when dropping tables.
"""
return [f"DROP TABLE IF EXISTS {self._events_table}", f"DROP TABLE IF EXISTS {self._session_table}"]
[docs]
def create_tables(self) -> None:
"""Create both sessions and events tables if they don't exist."""
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
self._enable_foreign_keys(cursor, conn)
cursor.execute(self._get_create_sessions_table_sql())
conn.commit()
sessions_idx_app_user = (
f"CREATE INDEX IF NOT EXISTS idx_{self._session_table}_app_user "
f"ON {self._session_table}(app_name, user_id)"
)
cursor.execute(sessions_idx_app_user)
conn.commit()
sessions_idx_update = (
f"CREATE INDEX IF NOT EXISTS idx_{self._session_table}_update_time "
f"ON {self._session_table}(update_time DESC)"
)
cursor.execute(sessions_idx_update)
conn.commit()
cursor.execute(self._get_create_events_table_sql())
conn.commit()
events_idx = (
f"CREATE INDEX IF NOT EXISTS idx_{self._events_table}_session "
f"ON {self._events_table}(session_id, timestamp ASC)"
)
cursor.execute(events_idx)
conn.commit()
finally:
cursor.close() # type: ignore[no-untyped-call]
logger.debug("Created ADK tables: %s, %s", self._session_table, self._events_table)
def _enable_foreign_keys(self, cursor: Any, conn: Any) -> None:
"""Enable foreign key constraints for SQLite.
Args:
cursor: Database cursor.
conn: Database connection.
Notes:
SQLite requires PRAGMA foreign_keys = ON to be set per connection.
This is a no-op for other databases.
"""
try:
cursor.execute("PRAGMA foreign_keys = ON")
conn.commit()
except Exception:
logger.debug("Foreign key enforcement not supported or already enabled")
[docs]
def create_session(
self, session_id: str, app_name: str, user_id: str, state: "dict[str, Any]", owner_id: "Any | None" = None
) -> SessionRecord:
"""Create a new session.
Args:
session_id: Unique session identifier.
app_name: Application name.
user_id: User identifier.
state: Initial session state.
owner_id: Optional owner ID value for owner_id_column (can be None for nullable columns).
Returns:
Created session record.
"""
state_json = self._serialize_state(state)
params: tuple[Any, ...]
if self._owner_id_column_name:
sql = f"""
INSERT INTO {self._session_table}
(id, app_name, user_id, {self._owner_id_column_name}, state, create_time, update_time)
VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
"""
params = (session_id, app_name, user_id, owner_id, state_json)
else:
sql = f"""
INSERT INTO {self._session_table} (id, app_name, user_id, state, create_time, update_time)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
"""
params = (session_id, app_name, user_id, state_json)
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(sql, params)
conn.commit()
finally:
cursor.close() # type: ignore[no-untyped-call]
return self.get_session(session_id) # type: ignore[return-value]
[docs]
def get_session(self, session_id: str) -> "SessionRecord | None":
"""Get session by ID.
Args:
session_id: Session identifier.
Returns:
Session record or None if not found.
Notes:
State is deserialized from JSON string.
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE id = ?
"""
try:
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(sql, (session_id,))
row = cursor.fetchone()
if row is None:
return None
return SessionRecord(
id=row[0],
app_name=row[1],
user_id=row[2],
state=self._deserialize_state(row[3]),
create_time=row[4],
update_time=row[5],
)
finally:
cursor.close() # type: ignore[no-untyped-call]
except Exception as e:
error_msg = str(e).lower()
if any(pattern in error_msg for pattern in ADBC_TABLE_NOT_FOUND_PATTERNS):
return None
raise
[docs]
def update_session_state(self, session_id: str, state: "dict[str, Any]") -> None:
"""Update session state.
Args:
session_id: Session identifier.
state: New state dictionary (replaces existing state).
Notes:
This replaces the entire state dictionary.
Updates update_time to current timestamp.
"""
state_json = self._serialize_state(state)
sql = f"""
UPDATE {self._session_table}
SET state = ?, update_time = CURRENT_TIMESTAMP
WHERE id = ?
"""
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(sql, (state_json, session_id))
conn.commit()
finally:
cursor.close() # type: ignore[no-untyped-call]
[docs]
def delete_session(self, session_id: str) -> None:
"""Delete session and all associated events (cascade).
Args:
session_id: Session identifier.
Notes:
Foreign key constraint ensures events are cascade-deleted.
"""
sql = f"DELETE FROM {self._session_table} WHERE id = ?"
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
self._enable_foreign_keys(cursor, conn)
cursor.execute(sql, (session_id,))
conn.commit()
finally:
cursor.close() # type: ignore[no-untyped-call]
[docs]
def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.
Args:
app_name: Application name.
user_id: User identifier. If None, lists all sessions for the app.
Returns:
List of session records ordered by update_time DESC.
Notes:
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ?
ORDER BY update_time DESC
"""
params: tuple[str, ...] = (app_name,)
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
"""
params = (app_name, user_id)
try:
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(sql, params)
rows = cursor.fetchall()
return [
SessionRecord(
id=row[0],
app_name=row[1],
user_id=row[2],
state=self._deserialize_state(row[3]),
create_time=row[4],
update_time=row[5],
)
for row in rows
]
finally:
cursor.close() # type: ignore[no-untyped-call]
except Exception as e:
error_msg = str(e).lower()
if any(pattern in error_msg for pattern in ADBC_TABLE_NOT_FOUND_PATTERNS):
return []
raise
[docs]
def create_event(
self,
event_id: str,
session_id: str,
app_name: str,
user_id: str,
author: "str | None" = None,
actions: "bytes | None" = None,
content: "dict[str, Any] | None" = None,
**kwargs: Any,
) -> "EventRecord":
"""Create a new event.
Args:
event_id: Unique event identifier.
session_id: Session identifier.
app_name: Application name.
user_id: User identifier.
author: Event author (user/assistant/system).
actions: Pickled actions object.
content: Event content (JSON).
**kwargs: Additional optional fields.
Returns:
Created event record.
Notes:
Uses CURRENT_TIMESTAMP for timestamp if not provided.
JSON fields are serialized to JSON strings.
Boolean fields are converted to INTEGER (0/1).
"""
content_json = self._serialize_json_field(content)
grounding_metadata_json = self._serialize_json_field(kwargs.get("grounding_metadata"))
custom_metadata_json = self._serialize_json_field(kwargs.get("custom_metadata"))
partial_int = self._to_int_bool(kwargs.get("partial"))
turn_complete_int = self._to_int_bool(kwargs.get("turn_complete"))
interrupted_int = self._to_int_bool(kwargs.get("interrupted"))
sql = f"""
INSERT INTO {self._events_table} (
id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
) VALUES (
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
)
"""
timestamp = kwargs.get("timestamp")
if timestamp is None:
from datetime import datetime, timezone
timestamp = datetime.now(timezone.utc)
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(
sql,
(
event_id,
session_id,
app_name,
user_id,
kwargs.get("invocation_id"),
author,
actions,
kwargs.get("long_running_tool_ids_json"),
kwargs.get("branch"),
timestamp,
content_json,
grounding_metadata_json,
custom_metadata_json,
partial_int,
turn_complete_int,
interrupted_int,
kwargs.get("error_code"),
kwargs.get("error_message"),
),
)
conn.commit()
finally:
cursor.close() # type: ignore[no-untyped-call]
events = self.list_events(session_id)
for event in events:
if event["id"] == event_id:
return event
msg = f"Failed to retrieve created event {event_id}"
raise RuntimeError(msg)
[docs]
def list_events(self, session_id: str) -> "list[EventRecord]":
"""List events for a session ordered by timestamp.
Args:
session_id: Session identifier.
Returns:
List of event records ordered by timestamp ASC.
Notes:
Uses index on (session_id, timestamp ASC).
JSON fields deserialized from JSON strings.
Converts INTEGER booleans to Python bool.
"""
sql = f"""
SELECT id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
FROM {self._events_table}
WHERE session_id = ?
ORDER BY timestamp ASC
"""
try:
with self._config.provide_connection() as conn:
cursor = conn.cursor()
try:
cursor.execute(sql, (session_id,))
rows = cursor.fetchall()
return [
EventRecord(
id=row[0],
session_id=row[1],
app_name=row[2],
user_id=row[3],
invocation_id=row[4],
author=row[5],
actions=bytes(row[6]) if row[6] is not None else b"",
long_running_tool_ids_json=row[7],
branch=row[8],
timestamp=row[9],
content=self._deserialize_json_field(row[10]),
grounding_metadata=self._deserialize_json_field(row[11]),
custom_metadata=self._deserialize_json_field(row[12]),
partial=self._from_int_bool(row[13]),
turn_complete=self._from_int_bool(row[14]),
interrupted=self._from_int_bool(row[15]),
error_code=row[16],
error_message=row[17],
)
for row in rows
]
finally:
cursor.close() # type: ignore[no-untyped-call]
except Exception as e:
error_msg = str(e).lower()
if any(pattern in error_msg for pattern in ADBC_TABLE_NOT_FOUND_PATTERNS):
return []
raise
@staticmethod
def _to_int_bool(value: "bool | None") -> "int | None":
"""Convert Python boolean to INTEGER (0/1).
Args:
value: Python boolean value or None.
Returns:
1 for True, 0 for False, None for None.
"""
if value is None:
return None
return 1 if value else 0
@staticmethod
def _from_int_bool(value: "int | None") -> "bool | None":
"""Convert INTEGER to Python boolean.
Args:
value: INTEGER value (0, 1, or None).
Returns:
Python boolean or None.
"""
if value is None:
return None
return bool(value)