"""Oracle ADK store for Google Agent Development Kit session/event storage."""
from decimal import Decimal
from enum import Enum
from typing import TYPE_CHECKING, Any, Final, cast
import oracledb
from sqlspec import SQL
from sqlspec.adapters.oracledb.data_dictionary import (
OracleAsyncDataDictionary,
OracleSyncDataDictionary,
OracleVersionInfo,
)
from sqlspec.extensions.adk import BaseAsyncADKStore, BaseSyncADKStore, EventRecord, SessionRecord
from sqlspec.utils.logging import get_logger
from sqlspec.utils.serializers import from_json, to_json
if TYPE_CHECKING:
from datetime import datetime
from sqlspec.adapters.oracledb.config import OracleAsyncConfig, OracleSyncConfig
logger = get_logger("adapters.oracledb.adk.store")
__all__ = ("OracleAsyncADKStore", "OracleSyncADKStore")
ORACLE_TABLE_NOT_FOUND_ERROR: Final = 942
ORACLE_MIN_JSON_NATIVE_VERSION: Final = 21
ORACLE_MIN_JSON_NATIVE_COMPATIBLE: Final = 20
ORACLE_MIN_JSON_BLOB_VERSION: Final = 12
class JSONStorageType(str, Enum):
"""JSON storage type based on Oracle version."""
JSON_NATIVE = "json"
BLOB_JSON = "blob_json"
BLOB_PLAIN = "blob_plain"
def _coerce_decimal_values(value: Any) -> Any:
if isinstance(value, Decimal):
return float(value)
if isinstance(value, dict):
return {key: _coerce_decimal_values(val) for key, val in value.items()}
if isinstance(value, list):
return [_coerce_decimal_values(item) for item in value]
if isinstance(value, tuple):
return tuple(_coerce_decimal_values(item) for item in value)
if isinstance(value, set):
return {_coerce_decimal_values(item) for item in value}
if isinstance(value, frozenset):
return frozenset(_coerce_decimal_values(item) for item in value)
return value
def _storage_type_from_version(version_info: "OracleVersionInfo | None") -> JSONStorageType:
"""Determine JSON storage type based on Oracle version metadata."""
if version_info and version_info.supports_native_json():
logger.debug("Detected Oracle %s with compatible >= 20, using JSON_NATIVE", version_info)
return JSONStorageType.JSON_NATIVE
if version_info and version_info.supports_json_blob():
logger.debug("Detected Oracle %s, using BLOB_JSON (recommended)", version_info)
return JSONStorageType.BLOB_JSON
if version_info:
logger.debug("Detected Oracle %s (pre-12c), using BLOB_PLAIN", version_info)
return JSONStorageType.BLOB_PLAIN
logger.warning("Oracle version could not be detected; defaulting to BLOB_JSON storage")
return JSONStorageType.BLOB_JSON
def _to_oracle_bool(value: "bool | None") -> "int | None":
"""Convert Python boolean to Oracle NUMBER(1).
Args:
value: Python boolean value or None.
Returns:
1 for True, 0 for False, None for None.
"""
if value is None:
return None
return 1 if value else 0
def _from_oracle_bool(value: "int | None") -> "bool | None":
"""Convert Oracle NUMBER(1) to Python boolean.
Args:
value: Oracle NUMBER value (0, 1, or None).
Returns:
Python boolean or None.
"""
if value is None:
return None
return bool(value)
[docs]
class OracleAsyncADKStore(BaseAsyncADKStore["OracleAsyncConfig"]):
"""Oracle async ADK store using oracledb async driver.
Implements session and event storage for Google Agent Development Kit
using Oracle Database via the python-oracledb async driver. Provides:
- Session state management with version-specific JSON storage
- Event history tracking with BLOB-serialized actions
- TIMESTAMP WITH TIME ZONE for timezone-aware timestamps
- Foreign key constraints with cascade delete
- Efficient upserts using MERGE statement
Args:
config: OracleAsyncConfig with extension_config["adk"] settings.
Example:
from sqlspec.adapters.oracledb import OracleAsyncConfig
from sqlspec.adapters.oracledb.adk import OracleAsyncADKStore
config = OracleAsyncConfig(
pool_config={"dsn": "oracle://..."},
extension_config={
"adk": {
"session_table": "my_sessions",
"events_table": "my_events",
"owner_id_column": "tenant_id NUMBER(10) REFERENCES tenants(id)"
}
}
)
store = OracleAsyncADKStore(config)
await store.create_tables()
Notes:
- JSON storage type detected based on Oracle version (21c+, 12c+, legacy)
- BLOB for pre-serialized actions from Google ADK
- TIMESTAMP WITH TIME ZONE for timezone-aware timestamps
- NUMBER(1) for booleans (0/1/NULL)
- Named parameters using :param_name
- State merging handled at application level
- owner_id_column supports NUMBER, VARCHAR2, RAW for Oracle FK types
- Configuration is read from config.extension_config["adk"]
"""
__slots__ = ("_in_memory", "_json_storage_type", "_oracle_version_info")
[docs]
def __init__(self, config: "OracleAsyncConfig") -> None:
"""Initialize Oracle ADK store.
Args:
config: OracleAsyncConfig instance.
Notes:
Configuration is read from config.extension_config["adk"]:
- session_table: Sessions table name (default: "adk_sessions")
- events_table: Events table name (default: "adk_events")
- owner_id_column: Optional owner FK column DDL (default: None)
- in_memory: Enable INMEMORY PRIORITY HIGH clause (default: False)
"""
super().__init__(config)
self._json_storage_type: JSONStorageType | None = None
self._oracle_version_info: OracleVersionInfo | None = None
adk_config = config.extension_config.get("adk", {})
self._in_memory: bool = bool(adk_config.get("in_memory", False))
async def _get_create_sessions_table_sql(self) -> str:
"""Get Oracle CREATE TABLE SQL for sessions table.
Auto-detects optimal JSON storage type based on Oracle version.
Result is cached to minimize database queries.
"""
storage_type = await self._detect_json_storage_type()
return self._get_create_sessions_table_sql_for_type(storage_type)
async def _get_create_events_table_sql(self) -> str:
"""Get Oracle CREATE TABLE SQL for events table.
Auto-detects optimal JSON storage type based on Oracle version.
Result is cached to minimize database queries.
"""
storage_type = await self._detect_json_storage_type()
return self._get_create_events_table_sql_for_type(storage_type)
async def _detect_json_storage_type(self) -> JSONStorageType:
"""Detect the appropriate JSON storage type based on Oracle version.
Returns:
Appropriate JSONStorageType for this Oracle version.
Notes:
Queries product_component_version to determine Oracle version.
- Oracle 21c+ with compatible >= 20: Native JSON type
- Oracle 12c+: BLOB with IS JSON constraint (preferred)
- Oracle 11g and earlier: BLOB without constraint
BLOB is preferred over CLOB for 12c+ as per Oracle recommendations.
Result is cached in self._json_storage_type.
"""
if self._json_storage_type is not None:
return self._json_storage_type
version_info = await self._get_version_info()
self._json_storage_type = _storage_type_from_version(version_info)
return self._json_storage_type
async def _get_version_info(self) -> "OracleVersionInfo | None":
"""Return cached Oracle version info using Oracle data dictionary."""
if self._oracle_version_info is not None:
return self._oracle_version_info
async with self._config.provide_session() as driver:
dictionary = OracleAsyncDataDictionary()
self._oracle_version_info = await dictionary.get_version(driver)
if self._oracle_version_info is None:
logger.warning("Could not detect Oracle version, defaulting to BLOB_JSON storage")
return self._oracle_version_info
async def _serialize_state(self, state: "dict[str, Any]") -> "str | bytes":
"""Serialize state dictionary to appropriate format based on storage type.
Args:
state: State dictionary to serialize.
Returns:
JSON string for JSON_NATIVE, bytes for BLOB types.
"""
storage_type = await self._detect_json_storage_type()
if storage_type == JSONStorageType.JSON_NATIVE:
return to_json(state)
return to_json(state, as_bytes=True)
async def _deserialize_state(self, data: Any) -> "dict[str, Any]":
"""Deserialize state data from database format.
Args:
data: Data from database (may be LOB, str, bytes, or dict).
Returns:
Deserialized state dictionary.
Notes:
Handles LOB reading if data has read() method.
Oracle JSON type may return dict directly.
"""
if hasattr(data, "read"):
data = await data.read()
if isinstance(data, dict):
return cast("dict[str, Any]", _coerce_decimal_values(data))
if isinstance(data, bytes):
return from_json(data) # type: ignore[no-any-return]
if isinstance(data, str):
return from_json(data) # type: ignore[no-any-return]
return from_json(str(data)) # type: ignore[no-any-return]
async def _serialize_json_field(self, value: Any) -> "str | bytes | None":
"""Serialize optional JSON field for event storage.
Args:
value: Value to serialize (dict or None).
Returns:
Serialized JSON or None.
"""
if value is None:
return None
storage_type = await self._detect_json_storage_type()
if storage_type == JSONStorageType.JSON_NATIVE:
return to_json(value)
return to_json(value, as_bytes=True)
async def _deserialize_json_field(self, data: Any) -> "dict[str, Any] | None":
"""Deserialize optional JSON field from database.
Args:
data: Data from database (may be LOB, str, bytes, dict, or None).
Returns:
Deserialized dictionary or None.
Notes:
Oracle JSON type may return dict directly.
"""
if data is None:
return None
if hasattr(data, "read"):
data = await data.read()
if isinstance(data, dict):
return cast("dict[str, Any]", _coerce_decimal_values(data))
if isinstance(data, bytes):
return from_json(data) # type: ignore[no-any-return]
if isinstance(data, str):
return from_json(data) # type: ignore[no-any-return]
return from_json(str(data)) # type: ignore[no-any-return]
def _get_create_sessions_table_sql_for_type(self, storage_type: JSONStorageType) -> str:
"""Get Oracle CREATE TABLE SQL for sessions with specified storage type.
Args:
storage_type: JSON storage type to use.
Returns:
SQL statement to create adk_sessions table.
"""
if storage_type == JSONStorageType.JSON_NATIVE:
state_column = "state JSON NOT NULL"
elif storage_type == JSONStorageType.BLOB_JSON:
state_column = "state BLOB CHECK (state IS JSON) NOT NULL"
else:
state_column = "state BLOB NOT NULL"
owner_id_column_sql = f", {self._owner_id_column_ddl}" if self._owner_id_column_ddl else ""
inmemory_clause = " INMEMORY PRIORITY HIGH" if self._in_memory else ""
return f"""
BEGIN
EXECUTE IMMEDIATE 'CREATE TABLE {self._session_table} (
id VARCHAR2(128) PRIMARY KEY,
app_name VARCHAR2(128) NOT NULL,
user_id VARCHAR2(128) NOT NULL,
{state_column},
create_time TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
update_time TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL{owner_id_column_sql}
){inmemory_clause}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
BEGIN
EXECUTE IMMEDIATE 'CREATE INDEX idx_{self._session_table}_app_user
ON {self._session_table}(app_name, user_id)';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
BEGIN
EXECUTE IMMEDIATE 'CREATE INDEX idx_{self._session_table}_update_time
ON {self._session_table}(update_time DESC)';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
"""
def _get_create_events_table_sql_for_type(self, storage_type: JSONStorageType) -> str:
"""Get Oracle CREATE TABLE SQL for events with specified storage type.
Args:
storage_type: JSON storage type to use.
Returns:
SQL statement to create adk_events table.
"""
if storage_type == JSONStorageType.JSON_NATIVE:
json_columns = """
content JSON,
grounding_metadata JSON,
custom_metadata JSON,
long_running_tool_ids_json JSON
"""
elif storage_type == JSONStorageType.BLOB_JSON:
json_columns = """
content BLOB CHECK (content IS JSON),
grounding_metadata BLOB CHECK (grounding_metadata IS JSON),
custom_metadata BLOB CHECK (custom_metadata IS JSON),
long_running_tool_ids_json BLOB CHECK (long_running_tool_ids_json IS JSON)
"""
else:
json_columns = """
content BLOB,
grounding_metadata BLOB,
custom_metadata BLOB,
long_running_tool_ids_json BLOB
"""
inmemory_clause = " INMEMORY PRIORITY HIGH" if self._in_memory else ""
return f"""
BEGIN
EXECUTE IMMEDIATE 'CREATE TABLE {self._events_table} (
id VARCHAR2(128) PRIMARY KEY,
session_id VARCHAR2(128) NOT NULL,
app_name VARCHAR2(128) NOT NULL,
user_id VARCHAR2(128) NOT NULL,
invocation_id VARCHAR2(256),
author VARCHAR2(256),
actions BLOB,
branch VARCHAR2(256),
timestamp TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
{json_columns},
partial NUMBER(1),
turn_complete NUMBER(1),
interrupted NUMBER(1),
error_code VARCHAR2(256),
error_message VARCHAR2(1024),
CONSTRAINT fk_{self._events_table}_session FOREIGN KEY (session_id)
REFERENCES {self._session_table}(id) ON DELETE CASCADE
){inmemory_clause}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
BEGIN
EXECUTE IMMEDIATE 'CREATE INDEX idx_{self._events_table}_session
ON {self._events_table}(session_id, timestamp ASC)';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
"""
def _get_drop_tables_sql(self) -> "list[str]":
"""Get Oracle DROP TABLE SQL statements.
Returns:
List of SQL statements to drop tables and indexes.
Notes:
Order matters: drop events table (child) before sessions (parent).
Oracle automatically drops indexes when dropping tables.
"""
return [
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP INDEX idx_{self._events_table}_session';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -1418 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP INDEX idx_{self._session_table}_update_time';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -1418 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP INDEX idx_{self._session_table}_app_user';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -1418 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE {self._events_table}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE {self._session_table}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
""",
]
[docs]
async def create_tables(self) -> None:
"""Create both sessions and events tables if they don't exist.
Notes:
Detects Oracle version to determine optimal JSON storage type.
Uses version-appropriate table schema.
"""
storage_type = await self._detect_json_storage_type()
logger.debug("Creating ADK tables with storage type: %s", storage_type)
async with self._config.provide_session() as driver:
await driver.execute_script(self._get_create_sessions_table_sql_for_type(storage_type))
await driver.execute_script(self._get_create_events_table_sql_for_type(storage_type))
logger.debug("Created ADK tables: %s, %s", self._session_table, self._events_table)
[docs]
async def create_session(
self, session_id: str, app_name: str, user_id: str, state: "dict[str, Any]", owner_id: "Any | None" = None
) -> SessionRecord:
"""Create a new session.
Args:
session_id: Unique session identifier.
app_name: Application name.
user_id: User identifier.
state: Initial session state.
owner_id: Optional owner ID value for owner_id_column (if configured).
Returns:
Created session record.
Notes:
Uses SYSTIMESTAMP for create_time and update_time.
State is serialized using version-appropriate format.
owner_id is ignored if owner_id_column not configured.
"""
state_data = await self._serialize_state(state)
if self._owner_id_column_name:
sql = f"""
INSERT INTO {self._session_table} (id, app_name, user_id, state, create_time, update_time, {self._owner_id_column_name})
VALUES (:id, :app_name, :user_id, :state, SYSTIMESTAMP, SYSTIMESTAMP, :owner_id)
"""
params = {
"id": session_id,
"app_name": app_name,
"user_id": user_id,
"state": state_data,
"owner_id": owner_id,
}
else:
sql = f"""
INSERT INTO {self._session_table} (id, app_name, user_id, state, create_time, update_time)
VALUES (:id, :app_name, :user_id, :state, SYSTIMESTAMP, SYSTIMESTAMP)
"""
params = {"id": session_id, "app_name": app_name, "user_id": user_id, "state": state_data}
async with self._config.provide_connection() as conn:
cursor = conn.cursor()
await cursor.execute(sql, params)
await conn.commit()
return await self.get_session(session_id) # type: ignore[return-value]
[docs]
async def get_session(self, session_id: str) -> "SessionRecord | None":
"""Get session by ID.
Args:
session_id: Session identifier.
Returns:
Session record or None if not found.
Notes:
Oracle returns datetime objects for TIMESTAMP columns.
State is deserialized using version-appropriate format.
"""
try:
async with self._config.provide_connection() as conn:
cursor = conn.cursor()
await cursor.execute(
f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE id = :id
""",
{"id": session_id},
)
row = await cursor.fetchone()
if row is None:
return None
session_id_val, app_name, user_id, state_data, create_time, update_time = row
state = await self._deserialize_state(state_data)
return SessionRecord(
id=session_id_val,
app_name=app_name,
user_id=user_id,
state=state,
create_time=create_time,
update_time=update_time,
)
except oracledb.DatabaseError as e:
error_obj = e.args[0] if e.args else None
if error_obj and error_obj.code == ORACLE_TABLE_NOT_FOUND_ERROR:
return None
raise
[docs]
async def update_session_state(self, session_id: str, state: "dict[str, Any]") -> None:
"""Update session state.
Args:
session_id: Session identifier.
state: New state dictionary (replaces existing state).
Notes:
This replaces the entire state dictionary.
Updates update_time to current timestamp.
State is serialized using version-appropriate format.
"""
state_data = await self._serialize_state(state)
sql = f"""
UPDATE {self._session_table}
SET state = :state, update_time = SYSTIMESTAMP
WHERE id = :id
"""
async with self._config.provide_connection() as conn:
cursor = conn.cursor()
await cursor.execute(sql, {"state": state_data, "id": session_id})
await conn.commit()
[docs]
async def delete_session(self, session_id: str) -> None:
"""Delete session and all associated events (cascade).
Args:
session_id: Session identifier.
Notes:
Foreign key constraint ensures events are cascade-deleted.
"""
sql = f"DELETE FROM {self._session_table} WHERE id = :id"
async with self._config.provide_connection() as conn:
cursor = conn.cursor()
await cursor.execute(sql, {"id": session_id})
await conn.commit()
[docs]
async def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.
Args:
app_name: Application name.
user_id: User identifier. If None, lists all sessions for the app.
Returns:
List of session records ordered by update_time DESC.
Notes:
Uses composite index on (app_name, user_id) when user_id is provided.
State is deserialized using version-appropriate format.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = :app_name
ORDER BY update_time DESC
"""
params = {"app_name": app_name}
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = :app_name AND user_id = :user_id
ORDER BY update_time DESC
"""
params = {"app_name": app_name, "user_id": user_id}
try:
async with self._config.provide_connection() as conn:
cursor = conn.cursor()
await cursor.execute(sql, params)
rows = await cursor.fetchall()
results = []
for row in rows:
state = await self._deserialize_state(row[3])
results.append(
SessionRecord(
id=row[0],
app_name=row[1],
user_id=row[2],
state=state,
create_time=row[4],
update_time=row[5],
)
)
return results
except oracledb.DatabaseError as e:
error_obj = e.args[0] if e.args else None
if error_obj and error_obj.code == ORACLE_TABLE_NOT_FOUND_ERROR:
return []
raise
[docs]
async def append_event(self, event_record: EventRecord) -> None:
"""Append an event to a session.
Args:
event_record: Event record to store.
Notes:
Uses SYSTIMESTAMP for timestamp if not provided.
JSON fields are serialized using version-appropriate format.
Boolean fields are converted to NUMBER(1).
"""
content_data = await self._serialize_json_field(event_record.get("content"))
grounding_metadata_data = await self._serialize_json_field(event_record.get("grounding_metadata"))
custom_metadata_data = await self._serialize_json_field(event_record.get("custom_metadata"))
sql = f"""
INSERT INTO {self._events_table} (
id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
) VALUES (
:id, :session_id, :app_name, :user_id, :invocation_id, :author, :actions,
:long_running_tool_ids_json, :branch, :timestamp, :content,
:grounding_metadata, :custom_metadata, :partial, :turn_complete,
:interrupted, :error_code, :error_message
)
"""
async with self._config.provide_connection() as conn:
cursor = conn.cursor()
await cursor.execute(
sql,
{
"id": event_record["id"],
"session_id": event_record["session_id"],
"app_name": event_record["app_name"],
"user_id": event_record["user_id"],
"invocation_id": event_record["invocation_id"],
"author": event_record["author"],
"actions": event_record["actions"],
"long_running_tool_ids_json": event_record.get("long_running_tool_ids_json"),
"branch": event_record.get("branch"),
"timestamp": event_record["timestamp"],
"content": content_data,
"grounding_metadata": grounding_metadata_data,
"custom_metadata": custom_metadata_data,
"partial": _to_oracle_bool(event_record.get("partial")),
"turn_complete": _to_oracle_bool(event_record.get("turn_complete")),
"interrupted": _to_oracle_bool(event_record.get("interrupted")),
"error_code": event_record.get("error_code"),
"error_message": event_record.get("error_message"),
},
)
await conn.commit()
[docs]
async def get_events(
self, session_id: str, after_timestamp: "datetime | None" = None, limit: "int | None" = None
) -> "list[EventRecord]":
"""Get events for a session.
Args:
session_id: Session identifier.
after_timestamp: Only return events after this time.
limit: Maximum number of events to return.
Returns:
List of event records ordered by timestamp ASC.
Notes:
Uses index on (session_id, timestamp ASC).
JSON fields deserialized using version-appropriate format.
Converts BLOB actions to bytes and NUMBER(1) booleans to Python bool.
"""
where_clauses = ["session_id = :session_id"]
params: dict[str, Any] = {"session_id": session_id}
if after_timestamp is not None:
where_clauses.append("timestamp > :after_timestamp")
params["after_timestamp"] = after_timestamp
where_clause = " AND ".join(where_clauses)
limit_clause = ""
if limit:
limit_clause = f" FETCH FIRST {limit} ROWS ONLY"
sql = f"""
SELECT id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
FROM {self._events_table}
WHERE {where_clause}
ORDER BY timestamp ASC{limit_clause}
"""
try:
async with self._config.provide_connection() as conn:
cursor = conn.cursor()
await cursor.execute(sql, params)
rows = await cursor.fetchall()
results = []
for row in rows:
actions_blob = row[6]
if hasattr(actions_blob, "read"):
actions_data = await actions_blob.read()
else:
actions_data = actions_blob
content = await self._deserialize_json_field(row[10])
grounding_metadata = await self._deserialize_json_field(row[11])
custom_metadata = await self._deserialize_json_field(row[12])
results.append(
EventRecord(
id=row[0],
session_id=row[1],
app_name=row[2],
user_id=row[3],
invocation_id=row[4],
author=row[5],
actions=bytes(actions_data) if actions_data is not None else b"",
long_running_tool_ids_json=row[7],
branch=row[8],
timestamp=row[9],
content=content,
grounding_metadata=grounding_metadata,
custom_metadata=custom_metadata,
partial=_from_oracle_bool(row[13]),
turn_complete=_from_oracle_bool(row[14]),
interrupted=_from_oracle_bool(row[15]),
error_code=row[16],
error_message=row[17],
)
)
return results
except oracledb.DatabaseError as e:
error_obj = e.args[0] if e.args else None
if error_obj and error_obj.code == ORACLE_TABLE_NOT_FOUND_ERROR:
return []
raise
[docs]
class OracleSyncADKStore(BaseSyncADKStore["OracleSyncConfig"]):
"""Oracle synchronous ADK store using oracledb sync driver.
Implements session and event storage for Google Agent Development Kit
using Oracle Database via the python-oracledb synchronous driver. Provides:
- Session state management with version-specific JSON storage
- Event history tracking with BLOB-serialized actions
- TIMESTAMP WITH TIME ZONE for timezone-aware timestamps
- Foreign key constraints with cascade delete
- Efficient upserts using MERGE statement
Args:
config: OracleSyncConfig with extension_config["adk"] settings.
Example:
from sqlspec.adapters.oracledb import OracleSyncConfig
from sqlspec.adapters.oracledb.adk import OracleSyncADKStore
config = OracleSyncConfig(
pool_config={"dsn": "oracle://..."},
extension_config={
"adk": {
"session_table": "my_sessions",
"events_table": "my_events",
"owner_id_column": "account_id NUMBER(19) REFERENCES accounts(id)"
}
}
)
store = OracleSyncADKStore(config)
store.create_tables()
Notes:
- JSON storage type detected based on Oracle version (21c+, 12c+, legacy)
- BLOB for pre-serialized actions from Google ADK
- TIMESTAMP WITH TIME ZONE for timezone-aware timestamps
- NUMBER(1) for booleans (0/1/NULL)
- Named parameters using :param_name
- State merging handled at application level
- owner_id_column supports NUMBER, VARCHAR2, RAW for Oracle FK types
- Configuration is read from config.extension_config["adk"]
"""
__slots__ = ("_in_memory", "_json_storage_type", "_oracle_version_info")
[docs]
def __init__(self, config: "OracleSyncConfig") -> None:
"""Initialize Oracle synchronous ADK store.
Args:
config: OracleSyncConfig instance.
Notes:
Configuration is read from config.extension_config["adk"]:
- session_table: Sessions table name (default: "adk_sessions")
- events_table: Events table name (default: "adk_events")
- owner_id_column: Optional owner FK column DDL (default: None)
- in_memory: Enable INMEMORY PRIORITY HIGH clause (default: False)
"""
super().__init__(config)
self._json_storage_type: JSONStorageType | None = None
self._oracle_version_info: OracleVersionInfo | None = None
adk_config = config.extension_config.get("adk", {})
self._in_memory: bool = bool(adk_config.get("in_memory", False))
def _get_create_sessions_table_sql(self) -> str:
"""Get Oracle CREATE TABLE SQL for sessions table.
Auto-detects optimal JSON storage type based on Oracle version.
Result is cached to minimize database queries.
"""
storage_type = self._detect_json_storage_type()
return self._get_create_sessions_table_sql_for_type(storage_type)
def _get_create_events_table_sql(self) -> str:
"""Get Oracle CREATE TABLE SQL for events table.
Auto-detects optimal JSON storage type based on Oracle version.
Result is cached to minimize database queries.
"""
storage_type = self._detect_json_storage_type()
return self._get_create_events_table_sql_for_type(storage_type)
def _detect_json_storage_type(self) -> JSONStorageType:
"""Detect the appropriate JSON storage type based on Oracle version.
Returns:
Appropriate JSONStorageType for this Oracle version.
Notes:
Queries product_component_version to determine Oracle version.
- Oracle 21c+ with compatible >= 20: Native JSON type
- Oracle 12c+: BLOB with IS JSON constraint (preferred)
- Oracle 11g and earlier: BLOB without constraint
BLOB is preferred over CLOB for 12c+ as per Oracle recommendations.
Result is cached in self._json_storage_type.
"""
if self._json_storage_type is not None:
return self._json_storage_type
version_info = self._get_version_info()
self._json_storage_type = _storage_type_from_version(version_info)
return self._json_storage_type
def _get_version_info(self) -> "OracleVersionInfo | None":
"""Return cached Oracle version info using Oracle data dictionary."""
if self._oracle_version_info is not None:
return self._oracle_version_info
with self._config.provide_session() as driver:
dictionary = OracleSyncDataDictionary()
self._oracle_version_info = dictionary.get_version(driver)
if self._oracle_version_info is None:
logger.warning("Could not detect Oracle version, defaulting to BLOB_JSON storage")
return self._oracle_version_info
def _serialize_state(self, state: "dict[str, Any]") -> "str | bytes":
"""Serialize state dictionary to appropriate format based on storage type.
Args:
state: State dictionary to serialize.
Returns:
JSON string for JSON_NATIVE, bytes for BLOB types.
"""
storage_type = self._detect_json_storage_type()
if storage_type == JSONStorageType.JSON_NATIVE:
return to_json(state)
return to_json(state, as_bytes=True)
def _deserialize_state(self, data: Any) -> "dict[str, Any]":
"""Deserialize state data from database format.
Args:
data: Data from database (may be LOB, str, bytes, or dict).
Returns:
Deserialized state dictionary.
Notes:
Handles LOB reading if data has read() method.
Oracle JSON type may return dict directly.
"""
if hasattr(data, "read"):
data = data.read()
if isinstance(data, dict):
return cast("dict[str, Any]", _coerce_decimal_values(data))
if isinstance(data, bytes):
return from_json(data) # type: ignore[no-any-return]
if isinstance(data, str):
return from_json(data) # type: ignore[no-any-return]
return from_json(str(data)) # type: ignore[no-any-return]
def _serialize_json_field(self, value: Any) -> "str | bytes | None":
"""Serialize optional JSON field for event storage.
Args:
value: Value to serialize (dict or None).
Returns:
Serialized JSON or None.
"""
if value is None:
return None
storage_type = self._detect_json_storage_type()
if storage_type == JSONStorageType.JSON_NATIVE:
return to_json(value)
return to_json(value, as_bytes=True)
def _deserialize_json_field(self, data: Any) -> "dict[str, Any] | None":
"""Deserialize optional JSON field from database.
Args:
data: Data from database (may be LOB, str, bytes, dict, or None).
Returns:
Deserialized dictionary or None.
Notes:
Oracle JSON type may return dict directly.
"""
if data is None:
return None
if hasattr(data, "read"):
data = data.read()
if isinstance(data, dict):
return cast("dict[str, Any]", _coerce_decimal_values(data))
if isinstance(data, bytes):
return from_json(data) # type: ignore[no-any-return]
if isinstance(data, str):
return from_json(data) # type: ignore[no-any-return]
return from_json(str(data)) # type: ignore[no-any-return]
def _get_create_sessions_table_sql_for_type(self, storage_type: JSONStorageType) -> str:
"""Get Oracle CREATE TABLE SQL for sessions with specified storage type.
Args:
storage_type: JSON storage type to use.
Returns:
SQL statement to create adk_sessions table.
"""
if storage_type == JSONStorageType.JSON_NATIVE:
state_column = "state JSON NOT NULL"
elif storage_type == JSONStorageType.BLOB_JSON:
state_column = "state BLOB CHECK (state IS JSON) NOT NULL"
else:
state_column = "state BLOB NOT NULL"
owner_id_column_sql = f", {self._owner_id_column_ddl}" if self._owner_id_column_ddl else ""
inmemory_clause = " INMEMORY PRIORITY HIGH" if self._in_memory else ""
return f"""
BEGIN
EXECUTE IMMEDIATE 'CREATE TABLE {self._session_table} (
id VARCHAR2(128) PRIMARY KEY,
app_name VARCHAR2(128) NOT NULL,
user_id VARCHAR2(128) NOT NULL,
{state_column},
create_time TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
update_time TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL{owner_id_column_sql}
){inmemory_clause}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
BEGIN
EXECUTE IMMEDIATE 'CREATE INDEX idx_{self._session_table}_app_user
ON {self._session_table}(app_name, user_id)';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
BEGIN
EXECUTE IMMEDIATE 'CREATE INDEX idx_{self._session_table}_update_time
ON {self._session_table}(update_time DESC)';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
"""
def _get_create_events_table_sql_for_type(self, storage_type: JSONStorageType) -> str:
"""Get Oracle CREATE TABLE SQL for events with specified storage type.
Args:
storage_type: JSON storage type to use.
Returns:
SQL statement to create adk_events table.
"""
if storage_type == JSONStorageType.JSON_NATIVE:
json_columns = """
content JSON,
grounding_metadata JSON,
custom_metadata JSON,
long_running_tool_ids_json JSON
"""
elif storage_type == JSONStorageType.BLOB_JSON:
json_columns = """
content BLOB CHECK (content IS JSON),
grounding_metadata BLOB CHECK (grounding_metadata IS JSON),
custom_metadata BLOB CHECK (custom_metadata IS JSON),
long_running_tool_ids_json BLOB CHECK (long_running_tool_ids_json IS JSON)
"""
else:
json_columns = """
content BLOB,
grounding_metadata BLOB,
custom_metadata BLOB,
long_running_tool_ids_json BLOB
"""
inmemory_clause = " INMEMORY PRIORITY HIGH" if self._in_memory else ""
return f"""
BEGIN
EXECUTE IMMEDIATE 'CREATE TABLE {self._events_table} (
id VARCHAR2(128) PRIMARY KEY,
session_id VARCHAR2(128) NOT NULL,
app_name VARCHAR2(128) NOT NULL,
user_id VARCHAR2(128) NOT NULL,
invocation_id VARCHAR2(256),
author VARCHAR2(256),
actions BLOB,
branch VARCHAR2(256),
timestamp TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL,
{json_columns},
partial NUMBER(1),
turn_complete NUMBER(1),
interrupted NUMBER(1),
error_code VARCHAR2(256),
error_message VARCHAR2(1024),
CONSTRAINT fk_{self._events_table}_session FOREIGN KEY (session_id)
REFERENCES {self._session_table}(id) ON DELETE CASCADE
){inmemory_clause}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
BEGIN
EXECUTE IMMEDIATE 'CREATE INDEX idx_{self._events_table}_session
ON {self._events_table}(session_id, timestamp ASC)';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -955 THEN
RAISE;
END IF;
END;
"""
def _get_drop_tables_sql(self) -> "list[str]":
"""Get Oracle DROP TABLE SQL statements.
Returns:
List of SQL statements to drop tables and indexes.
Notes:
Order matters: drop events table (child) before sessions (parent).
Oracle automatically drops indexes when dropping tables.
"""
return [
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP INDEX idx_{self._events_table}_session';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -1418 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP INDEX idx_{self._session_table}_update_time';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -1418 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP INDEX idx_{self._session_table}_app_user';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -1418 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE {self._events_table}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
""",
f"""
BEGIN
EXECUTE IMMEDIATE 'DROP TABLE {self._session_table}';
EXCEPTION
WHEN OTHERS THEN
IF SQLCODE != -942 THEN
RAISE;
END IF;
END;
""",
]
[docs]
def create_tables(self) -> None:
"""Create both sessions and events tables if they don't exist.
Notes:
Detects Oracle version to determine optimal JSON storage type.
Uses version-appropriate table schema.
"""
storage_type = self._detect_json_storage_type()
logger.info("Creating ADK tables with storage type: %s", storage_type)
with self._config.provide_session() as driver:
sessions_sql = SQL(self._get_create_sessions_table_sql_for_type(storage_type))
driver.execute_script(sessions_sql)
events_sql = SQL(self._get_create_events_table_sql_for_type(storage_type))
driver.execute_script(events_sql)
logger.debug("Created ADK tables: %s, %s", self._session_table, self._events_table)
[docs]
def create_session(
self, session_id: str, app_name: str, user_id: str, state: "dict[str, Any]", owner_id: "Any | None" = None
) -> SessionRecord:
"""Create a new session.
Args:
session_id: Unique session identifier.
app_name: Application name.
user_id: User identifier.
state: Initial session state.
owner_id: Optional owner ID value for owner_id_column (if configured).
Returns:
Created session record.
Notes:
Uses SYSTIMESTAMP for create_time and update_time.
State is serialized using version-appropriate format.
owner_id is ignored if owner_id_column not configured.
"""
state_data = self._serialize_state(state)
if self._owner_id_column_name:
sql = f"""
INSERT INTO {self._session_table} (id, app_name, user_id, state, create_time, update_time, {self._owner_id_column_name})
VALUES (:id, :app_name, :user_id, :state, SYSTIMESTAMP, SYSTIMESTAMP, :owner_id)
"""
params = {
"id": session_id,
"app_name": app_name,
"user_id": user_id,
"state": state_data,
"owner_id": owner_id,
}
else:
sql = f"""
INSERT INTO {self._session_table} (id, app_name, user_id, state, create_time, update_time)
VALUES (:id, :app_name, :user_id, :state, SYSTIMESTAMP, SYSTIMESTAMP)
"""
params = {"id": session_id, "app_name": app_name, "user_id": user_id, "state": state_data}
with self._config.provide_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, params)
conn.commit()
return self.get_session(session_id) # type: ignore[return-value]
[docs]
def get_session(self, session_id: str) -> "SessionRecord | None":
"""Get session by ID.
Args:
session_id: Session identifier.
Returns:
Session record or None if not found.
Notes:
Oracle returns datetime objects for TIMESTAMP columns.
State is deserialized using version-appropriate format.
"""
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE id = :id
"""
try:
with self._config.provide_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, {"id": session_id})
row = cursor.fetchone()
if row is None:
return None
session_id_val, app_name, user_id, state_data, create_time, update_time = row
state = self._deserialize_state(state_data)
return SessionRecord(
id=session_id_val,
app_name=app_name,
user_id=user_id,
state=state,
create_time=create_time,
update_time=update_time,
)
except oracledb.DatabaseError as e:
error_obj = e.args[0] if e.args else None
if error_obj and error_obj.code == ORACLE_TABLE_NOT_FOUND_ERROR:
return None
raise
[docs]
def update_session_state(self, session_id: str, state: "dict[str, Any]") -> None:
"""Update session state.
Args:
session_id: Session identifier.
state: New state dictionary (replaces existing state).
Notes:
This replaces the entire state dictionary.
Updates update_time to current timestamp.
State is serialized using version-appropriate format.
"""
state_data = self._serialize_state(state)
sql = f"""
UPDATE {self._session_table}
SET state = :state, update_time = SYSTIMESTAMP
WHERE id = :id
"""
with self._config.provide_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, {"state": state_data, "id": session_id})
conn.commit()
[docs]
def delete_session(self, session_id: str) -> None:
"""Delete session and all associated events (cascade).
Args:
session_id: Session identifier.
Notes:
Foreign key constraint ensures events are cascade-deleted.
"""
sql = f"DELETE FROM {self._session_table} WHERE id = :id"
with self._config.provide_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, {"id": session_id})
conn.commit()
[docs]
def list_sessions(self, app_name: str, user_id: str | None = None) -> "list[SessionRecord]":
"""List sessions for an app, optionally filtered by user.
Args:
app_name: Application name.
user_id: User identifier. If None, lists all sessions for the app.
Returns:
List of session records ordered by update_time DESC.
Notes:
Uses composite index on (app_name, user_id) when user_id is provided.
State is deserialized using version-appropriate format.
"""
if user_id is None:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = :app_name
ORDER BY update_time DESC
"""
params = {"app_name": app_name}
else:
sql = f"""
SELECT id, app_name, user_id, state, create_time, update_time
FROM {self._session_table}
WHERE app_name = :app_name AND user_id = :user_id
ORDER BY update_time DESC
"""
params = {"app_name": app_name, "user_id": user_id}
try:
with self._config.provide_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, params)
rows = cursor.fetchall()
results = []
for row in rows:
state = self._deserialize_state(row[3])
results.append(
SessionRecord(
id=row[0],
app_name=row[1],
user_id=row[2],
state=state,
create_time=row[4],
update_time=row[5],
)
)
return results
except oracledb.DatabaseError as e:
error_obj = e.args[0] if e.args else None
if error_obj and error_obj.code == ORACLE_TABLE_NOT_FOUND_ERROR:
return []
raise
[docs]
def create_event(
self,
event_id: str,
session_id: str,
app_name: str,
user_id: str,
author: "str | None" = None,
actions: "bytes | None" = None,
content: "dict[str, Any] | None" = None,
**kwargs: Any,
) -> "EventRecord":
"""Create a new event.
Args:
event_id: Unique event identifier.
session_id: Session identifier.
app_name: Application name.
user_id: User identifier.
author: Event author (user/assistant/system).
actions: Pickled actions object.
content: Event content (JSONB/JSON).
**kwargs: Additional optional fields.
Returns:
Created event record.
Notes:
Uses SYSTIMESTAMP for timestamp if not provided.
JSON fields are serialized using version-appropriate format.
Boolean fields are converted to NUMBER(1).
"""
content_data = self._serialize_json_field(content)
grounding_metadata_data = self._serialize_json_field(kwargs.get("grounding_metadata"))
custom_metadata_data = self._serialize_json_field(kwargs.get("custom_metadata"))
sql = f"""
INSERT INTO {self._events_table} (
id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
) VALUES (
:id, :session_id, :app_name, :user_id, :invocation_id, :author, :actions,
:long_running_tool_ids_json, :branch, :timestamp, :content,
:grounding_metadata, :custom_metadata, :partial, :turn_complete,
:interrupted, :error_code, :error_message
)
"""
with self._config.provide_connection() as conn:
cursor = conn.cursor()
cursor.execute(
sql,
{
"id": event_id,
"session_id": session_id,
"app_name": app_name,
"user_id": user_id,
"invocation_id": kwargs.get("invocation_id"),
"author": author,
"actions": actions,
"long_running_tool_ids_json": kwargs.get("long_running_tool_ids_json"),
"branch": kwargs.get("branch"),
"timestamp": kwargs.get("timestamp"),
"content": content_data,
"grounding_metadata": grounding_metadata_data,
"custom_metadata": custom_metadata_data,
"partial": _to_oracle_bool(kwargs.get("partial")),
"turn_complete": _to_oracle_bool(kwargs.get("turn_complete")),
"interrupted": _to_oracle_bool(kwargs.get("interrupted")),
"error_code": kwargs.get("error_code"),
"error_message": kwargs.get("error_message"),
},
)
conn.commit()
events = self.list_events(session_id)
for event in events:
if event["id"] == event_id:
return event
msg = f"Failed to retrieve created event {event_id}"
raise RuntimeError(msg)
[docs]
def list_events(self, session_id: str) -> "list[EventRecord]":
"""List events for a session ordered by timestamp.
Args:
session_id: Session identifier.
Returns:
List of event records ordered by timestamp ASC.
Notes:
Uses index on (session_id, timestamp ASC).
JSON fields deserialized using version-appropriate format.
Converts BLOB actions to bytes and NUMBER(1) booleans to Python bool.
"""
sql = f"""
SELECT id, session_id, app_name, user_id, invocation_id, author, actions,
long_running_tool_ids_json, branch, timestamp, content,
grounding_metadata, custom_metadata, partial, turn_complete,
interrupted, error_code, error_message
FROM {self._events_table}
WHERE session_id = :session_id
ORDER BY timestamp ASC
"""
try:
with self._config.provide_connection() as conn:
cursor = conn.cursor()
cursor.execute(sql, {"session_id": session_id})
rows = cursor.fetchall()
results = []
for row in rows:
actions_blob = row[6]
actions_data = actions_blob.read() if hasattr(actions_blob, "read") else actions_blob
content = self._deserialize_json_field(row[10])
grounding_metadata = self._deserialize_json_field(row[11])
custom_metadata = self._deserialize_json_field(row[12])
results.append(
EventRecord(
id=row[0],
session_id=row[1],
app_name=row[2],
user_id=row[3],
invocation_id=row[4],
author=row[5],
actions=bytes(actions_data) if actions_data is not None else b"",
long_running_tool_ids_json=row[7],
branch=row[8],
timestamp=row[9],
content=content,
grounding_metadata=grounding_metadata,
custom_metadata=custom_metadata,
partial=_from_oracle_bool(row[13]),
turn_complete=_from_oracle_bool(row[14]),
interrupted=_from_oracle_bool(row[15]),
error_code=row[16],
error_message=row[17],
)
)
return results
except oracledb.DatabaseError as e:
error_obj = e.args[0] if e.args else None
if error_obj and error_obj.code == ORACLE_TABLE_NOT_FOUND_ERROR:
return []
raise