"""DuckDB ADK store for Google Agent Development Kit.
DuckDB is an OLAP database optimized for analytical queries. This adapter provides:
- Embedded session storage with zero-configuration setup
- Excellent performance for analytical queries on session data
- Native JSON type support for flexible state storage
- Perfect for development, testing, and analytical workloads
Notes:
DuckDB is optimized for OLAP workloads and analytical queries. For highly
concurrent DML operations (frequent inserts/updates/deletes), consider
PostgreSQL or other OLTP-optimized databases.
"""
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Final
from sqlspec.extensions.adk import BaseSyncADKStore, EventRecord, SessionRecord
from sqlspec.utils.logging import get_logger
from sqlspec.utils.serializers import from_json, to_json
if TYPE_CHECKING:
from sqlspec.adapters.duckdb.config import DuckDBConfig
logger = get_logger("adapters.duckdb.adk.store")
__all__ = ("DuckdbADKStore",)
DUCKDB_TABLE_NOT_FOUND_ERROR: Final = "does not exist"
[docs]
class DuckdbADKStore(BaseSyncADKStore["DuckDBConfig"]):
"""DuckDB ADK store for Google Agent Development Kit.
Implements session and event storage for Google Agent Development Kit
using DuckDB's synchronous driver. Provides:
- Session state management with native JSON type
- Event history tracking with BLOB-serialized actions
- Native TIMESTAMP type support
- Foreign key constraints (manual cascade in delete_session)
- Columnar storage for analytical queries
Args:
config: DuckDBConfig with extension_config["adk"] settings.
Example:
from sqlspec.adapters.duckdb import DuckDBConfig
from sqlspec.adapters.duckdb.adk import DuckdbADKStore
config = DuckDBConfig(
database="sessions.ddb",
extension_config={
"adk": {
"session_table": "my_sessions",
"events_table": "my_events",
"owner_id_column": "tenant_id INTEGER REFERENCES tenants(id)"
}
}
)
store = DuckdbADKStore(config)
store.create_tables()
session = store.create_session(
session_id="session-123",
app_name="my-app",
user_id="user-456",
state={"context": "conversation"}
)
Notes:
- Uses DuckDB native JSON type (not JSONB)
- TIMESTAMP for date/time storage with microsecond precision
- BLOB for binary actions data
- BOOLEAN native type support
- Columnar storage provides excellent analytical query performance
- DuckDB doesn't support CASCADE in foreign keys (manual cascade required)
- Optimized for OLAP workloads; for high-concurrency writes use PostgreSQL
- Configuration is read from config.extension_config["adk"]
"""
__slots__ = ()
[docs]
def __init__(self, config: "DuckDBConfig") -> None:
"""Initialize DuckDB ADK store.
Args:
config: DuckDBConfig instance.
Notes:
Configuration is read from config.extension_config["adk"]:
- session_table: Sessions table name (default: "adk_sessions")
- events_table: Events table name (default: "adk_events")
- owner_id_column: Optional owner FK column DDL (default: None)
"""
super().__init__(config)
def _get_create_sessions_table_sql(self) -> str:
"""Get DuckDB CREATE TABLE SQL for sessions.
Returns:
SQL statement to create adk_sessions table with indexes.
Notes:
- VARCHAR for IDs and names
- JSON type for state storage (DuckDB native)
- TIMESTAMP for create_time and update_time
- CURRENT_TIMESTAMP for defaults
- Optional owner ID column for multi-tenant scenarios
- Composite index on (app_name, user_id) for listing
- Index on update_time DESC for recent session queries
"""
owner_id_line = ""
if self._owner_id_column_ddl:
owner_id_line = f",\n {self._owner_id_column_ddl}"
return f"""
CREATE TABLE IF NOT EXISTS {self._session_table} (
id VARCHAR PRIMARY KEY,
app_name VARCHAR NOT NULL,
user_id VARCHAR NOT NULL{owner_id_line},
state JSON NOT NULL,
create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_app_user ON {self._session_table}(app_name, user_id);
CREATE INDEX IF NOT EXISTS idx_{self._session_table}_update_time ON {self._session_table}(update_time DESC);
"""
def _get_create_events_table_sql(self) -> str:
"""Get DuckDB CREATE TABLE SQL for events.
Returns:
SQL statement to create adk_events table with indexes.
Notes:
- VARCHAR for string fields
- BLOB for pickled actions
- JSON for content, grounding_metadata, custom_metadata, long_running_tool_ids_json
- BOOLEAN for flags
- Foreign key constraint (DuckDB doesn't support CASCADE)
- Index on (session_id, timestamp ASC) for ordered event retrieval
- Manual cascade delete required in delete_session method
"""
return f"""
CREATE TABLE IF NOT EXISTS {self._events_table} (
id VARCHAR PRIMARY KEY,
session_id VARCHAR NOT NULL,
app_name VARCHAR NOT NULL,
user_id VARCHAR NOT NULL,
invocation_id VARCHAR,
author VARCHAR,
actions BLOB,
long_running_tool_ids_json JSON,
branch VARCHAR,
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
content JSON,
grounding_metadata JSON,
custom_metadata JSON,
partial BOOLEAN,
turn_complete BOOLEAN,
interrupted BOOLEAN,
error_code VARCHAR,
error_message VARCHAR,
FOREIGN KEY (session_id) REFERENCES {self._session_table}(id)
);
CREATE INDEX IF NOT EXISTS idx_{self._events_table}_session ON {self._events_table}(session_id, timestamp ASC);
"""
def _get_drop_tables_sql(self) -> "list[str]":
"""Get DuckDB DROP TABLE SQL statements.
Returns:
List of SQL statements to drop tables and indexes.
Notes:
Order matters: drop events table (child) before sessions (parent).
DuckDB automatically drops indexes when dropping tables.
"""
return [f"DROP TABLE IF EXISTS {self._events_table}", f"DROP TABLE IF EXISTS {self._session_table}"]
[docs]
def create_tables(self) -> None:
"""Create both sessions and events tables if they don't exist."""
with self._config.provide_connection() as conn:
conn.execute(self._get_create_sessions_table_sql())
conn.execute(self._get_create_events_table_sql())
logger.debug("Created ADK tables: %s, %s", self._session_table, self._events_table)
[docs]
def create_session(
self, session_id: str, app_name: str, user_id: str, state: "dict[str, Any]", owner_id: "Any | None" = None
) -> SessionRecord:
"""Create a new session.
Args:
session_id: Unique session identifier.
app_name: Application name.
user_id: User identifier.
state: Initial session state.
owner_id: Optional owner ID value for owner_id_column (if configured).
Returns:
Created session record.
Notes:
Uses current UTC timestamp for create_time and update_time.
State is JSON-serialized using SQLSpec serializers.
"""
now = datetime.now(timezone.utc)
state_json = to_json(state)
params: tuple[Any, ...]
if self._owner_id_column_name:
sql = f"""
INSERT INTO {self._session_table}
(id, app_name, user_id, {self._owner_id_column_name}, state, create_time, update_time)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""
params = (session_id, app_name, user_id, owner_id, state_json, now, now)
else:
sql = f"""
INSERT INTO {self._session_table} (id, app_name, user_id, state, create_time, update_time)
VALUES (?, ?, ?, ?, ?, ?)
"""
params = (session_id, app_name, user_id, state_json, now, now)
with self._config.provide_connection() as conn:
conn.execute(sql, params)
conn.commit()
return SessionRecord(
id=session_id, app_name=app_name, user_id=user_id, state=state, create_time=now, update_time=now
)
[docs]
def get_session(self, session_id: str) -> "SessionRecord | None":
"""Get session by ID.
Args:
session_id: Session identifier.
Returns:
Session record or None if not found.
Notes:
DuckDB returns datetime objects for TIMESTAMP columns.
JSON is parsed from database storage.
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE id = ?
"""
try:
with self._config.provide_connection() as conn:
cursor = conn.execute(sql, (session_id,))
row = cursor.fetchone()
if row is None:
return None
session_id_val, app_name, user_id, state_data, create_time, update_time = row
state = from_json(state_data) if state_data else {}
return SessionRecord(
id=session_id_val,
app_name=app_name,
user_id=user_id,
state=state,
create_time=create_time,
update_time=update_time,
)
except Exception as e:
if DUCKDB_TABLE_NOT_FOUND_ERROR in str(e):
return None
raise
[docs]
def update_session_state(self, session_id: str, state: "dict[str, Any]") -> None:
"""Update session state.
Args:
session_id: Session identifier.
state: New state dictionary (replaces existing state).
Notes:
This replaces the entire state dictionary.
Update time is automatically set to current UTC timestamp.
"""
now = datetime.now(timezone.utc)
state_json = to_json(state)
sql = f"""
UPDATE {self._session_table}
SET state = ?, update_time = ?
WHERE id = ?
"""
with self._config.provide_connection() as conn:
conn.execute(sql, (state_json, now, session_id))
conn.commit()
[docs]
def delete_session(self, session_id: str) -> None:
"""Delete session and all associated events.
Args:
session_id: Session identifier.
Notes:
DuckDB doesn't support CASCADE in foreign keys, so we manually delete events first.
"""
delete_events_sql = f"DELETE FROM {self._events_table} WHERE session_id = ?"
delete_session_sql = f"DELETE FROM {self._session_table} WHERE id = ?"
with self._config.provide_connection() as conn:
conn.execute(delete_events_sql, (session_id,))
conn.execute(delete_session_sql, (session_id,))
conn.commit()
[docs]
def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.
Args:
app_name: Application name.
user_id: User identifier. If None, lists all sessions for the app.
Returns:
List of session records ordered by update_time DESC.
Notes:
Uses composite index on (app_name, user_id) when user_id is provided.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ?
ORDER BY update_time DESC
"""
params: tuple[str, ...] = (app_name,)
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = ? AND user_id = ?
ORDER BY update_time DESC
"""
params = (app_name, user_id)
try:
with self._config.provide_connection() as conn:
cursor = conn.execute(sql, params)
rows = cursor.fetchall()
return [
SessionRecord(
id=row[0],
app_name=row[1],
user_id=row[2],
state=from_json(row[3]) if row[3] else {},
create_time=row[4],
update_time=row[5],
)
for row in rows
]
except Exception as e:
if DUCKDB_TABLE_NOT_FOUND_ERROR in str(e):
return []
raise
[docs]
def create_event(
self,
event_id: str,
session_id: str,
app_name: str,
user_id: str,
author: "str | None" = None,
actions: "bytes | None" = None,
content: "dict[str, Any] | None" = None,
**kwargs: Any,
) -> EventRecord:
"""Create a new event.
Args:
event_id: Unique event identifier.
session_id: Session identifier.
app_name: Application name.
user_id: User identifier.
author: Event author (user/assistant/system).
actions: Pickled actions object.
content: Event content (JSON).
**kwargs: Additional optional fields.
Returns:
Created event record.
Notes:
Uses current UTC timestamp if not provided in kwargs.
JSON fields are serialized using SQLSpec serializers.
"""
timestamp = kwargs.get("timestamp", datetime.now(timezone.utc))
content_json = to_json(content) if content else None
grounding_metadata = kwargs.get("grounding_metadata")
grounding_metadata_json = to_json(grounding_metadata) if grounding_metadata else None
custom_metadata = kwargs.get("custom_metadata")
custom_metadata_json = to_json(custom_metadata) if custom_metadata else None
sql = f"""
INSERT INTO {self._events_table} (
id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
with self._config.provide_connection() as conn:
conn.execute(
sql,
(
event_id,
session_id,
app_name,
user_id,
kwargs.get("invocation_id"),
author,
actions,
kwargs.get("long_running_tool_ids_json"),
kwargs.get("branch"),
timestamp,
content_json,
grounding_metadata_json,
custom_metadata_json,
kwargs.get("partial"),
kwargs.get("turn_complete"),
kwargs.get("interrupted"),
kwargs.get("error_code"),
kwargs.get("error_message"),
),
)
conn.commit()
return EventRecord(
id=event_id,
session_id=session_id,
app_name=app_name,
user_id=user_id,
invocation_id=kwargs.get("invocation_id", ""),
author=author or "",
actions=actions or b"",
long_running_tool_ids_json=kwargs.get("long_running_tool_ids_json"),
branch=kwargs.get("branch"),
timestamp=timestamp,
content=content,
grounding_metadata=grounding_metadata,
custom_metadata=custom_metadata,
partial=kwargs.get("partial"),
turn_complete=kwargs.get("turn_complete"),
interrupted=kwargs.get("interrupted"),
error_code=kwargs.get("error_code"),
error_message=kwargs.get("error_message"),
)
[docs]
def get_event(self, event_id: str) -> "EventRecord | None":
"""Get event by ID.
Args:
event_id: Event identifier.
Returns:
Event record or None if not found.
"""
sql = f"""
SELECT id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
FROM {self._events_table}
WHERE id = ?
"""
try:
with self._config.provide_connection() as conn:
cursor = conn.execute(sql, (event_id,))
row = cursor.fetchone()
if row is None:
return None
return EventRecord(
id=row[0],
session_id=row[1],
app_name=row[2],
user_id=row[3],
invocation_id=row[4],
author=row[5],
actions=bytes(row[6]) if row[6] else b"",
long_running_tool_ids_json=row[7],
branch=row[8],
timestamp=row[9],
content=from_json(row[10]) if row[10] else None,
grounding_metadata=from_json(row[11]) if row[11] else None,
custom_metadata=from_json(row[12]) if row[12] else None,
partial=row[13],
turn_complete=row[14],
interrupted=row[15],
error_code=row[16],
error_message=row[17],
)
except Exception as e:
if DUCKDB_TABLE_NOT_FOUND_ERROR in str(e):
return None
raise
[docs]
def list_events(self, session_id: str) -> "list[EventRecord]":
"""List events for a session ordered by timestamp.
Args:
session_id: Session identifier.
Returns:
List of event records ordered by timestamp ASC.
"""
sql = f"""
SELECT id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
FROM {self._events_table}
WHERE session_id = ?
ORDER BY timestamp ASC
"""
try:
with self._config.provide_connection() as conn:
cursor = conn.execute(sql, (session_id,))
rows = cursor.fetchall()
return [
EventRecord(
id=row[0],
session_id=row[1],
app_name=row[2],
user_id=row[3],
invocation_id=row[4],
author=row[5],
actions=bytes(row[6]) if row[6] else b"",
long_running_tool_ids_json=row[7],
branch=row[8],
timestamp=row[9],
content=from_json(row[10]) if row[10] else None,
grounding_metadata=from_json(row[11]) if row[11] else None,
custom_metadata=from_json(row[12]) if row[12] else None,
partial=row[13],
turn_complete=row[14],
interrupted=row[15],
error_code=row[16],
error_message=row[17],
)
for row in rows
]
except Exception as e:
if DUCKDB_TABLE_NOT_FOUND_ERROR in str(e):
return []
raise