Psycopg Backend¶
Overview¶
Psycopg3 is a redesigned PostgreSQL adapter that provides both synchronous and asynchronous database access with native support for PostgreSQL-specific features like JSONB, server-side cursors, and the COPY protocol.
Key Features:
Dual Mode: Native async/await AND synchronous execution in a single adapter
Type Safety: Explicit
Jsonb()wrapper for type-safe JSONB operationsSQL Composition: Secure SQL building with
pg_sql.SQL()andpg_sql.Identifier()Binary Protocol: Efficient binary data transfer by default
Connection Pooling: Built-in
psycopg_poolwith async supportServer-Side Cursors: Memory-efficient processing of large result sets
Contemporary Design: Fully redesigned API for PostgreSQL
Ideal Use Cases:
Applications requiring both async and sync database access patterns
PostgreSQL-first applications leveraging JSONB features
Production systems needing robust connection pooling
Projects prioritizing type safety and explicit type handling
Async-capable adapter with dual sync/async support
Warning
CRITICAL: JSONB Type Safety
Unlike asyncpg or psqlpy, psycopg3 requires explicitly wrapping Python dicts
with Jsonb() when inserting JSONB data. This provides stronger type safety
but means you cannot pass raw dicts directly to JSONB columns.
from psycopg.types.json import Jsonb
# WRONG - Will fail
await cur.execute("INSERT INTO table (data) VALUES (%s)", ({"key": "value"},))
# CORRECT - Wrap with Jsonb()
await cur.execute("INSERT INTO table (data) VALUES (%s)", (Jsonb({"key": "value"}),))
Installation¶
Install SQLSpec with Psycopg support:
# Binary distribution (recommended for development)
pip install sqlspec[psycopg] google-genai
# C extension (better performance for production)
pip install sqlspec[psycopg] psycopg[c] google-genai
# With connection pooling (recommended)
pip install sqlspec[psycopg] psycopg-pool google-genai
Tip
Performance Options:
psycopg[binary]- Pure Python, easier installationpsycopg[c]- C extension, ~30% faster, requires compilerpsycopg-pool- Connection pooling, required for production
Quick Start¶
Async Usage (Recommended)¶
from sqlspec.adapters.psycopg import PsycopgAsyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgAsyncADKStore
from sqlspec.extensions.adk import SQLSpecSessionService
# Create async config with connection pool
config = PsycopgAsyncConfig(
pool_config={
"conninfo": "postgresql://user:pass@localhost/db",
"min_size": 5,
"max_size": 20,
}
)
# Create async store
store = PsycopgAsyncADKStore(config)
await store.create_tables()
# Create session service
service = SQLSpecSessionService(store)
# Create session with JSONB state
session = await service.create_session(
app_name="my_agent",
user_id="user_123",
state={"context": "active", "preferences": {"theme": "dark"}}
)
Sync Usage¶
from sqlspec.adapters.psycopg import PsycopgSyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgSyncADKStore
from sqlspec.extensions.adk import SQLSpecSessionService
# Create sync config with connection pool
config = PsycopgSyncConfig(
pool_config={
"conninfo": "postgresql://user:pass@localhost/db",
"min_size": 5,
"max_size": 20,
}
)
# Create sync store
store = PsycopgSyncADKStore(config)
store.create_tables()
# Create session service
service = SQLSpecSessionService(store)
# Create session
session = service.create_session(
app_name="my_agent",
user_id="user_123",
state={"context": "active"}
)
Configuration¶
Basic Async Configuration¶
from sqlspec.adapters.psycopg import PsycopgAsyncConfig
config = PsycopgAsyncConfig(
pool_config={
"conninfo": "postgresql://user:pass@localhost:5432/dbname",
"min_size": 5, # Minimum pool connections
"max_size": 20, # Maximum pool connections
"timeout": 30.0, # Connection acquisition timeout
"max_lifetime": 3600.0, # Max connection lifetime (1 hour)
"max_idle": 600.0, # Max connection idle time (10 min)
}
)
Basic Sync Configuration¶
from sqlspec.adapters.psycopg import PsycopgSyncConfig
config = PsycopgSyncConfig(
pool_config={
"conninfo": "postgresql://user:pass@localhost:5432/dbname",
"min_size": 5,
"max_size": 20,
}
)
Advanced Configuration¶
config = PsycopgAsyncConfig(
pool_config={
# Connection string
"conninfo": "postgresql://user:pass@localhost/db?sslmode=require",
# OR individual parameters
"host": "localhost",
"port": 5432,
"user": "myuser",
"password": "mypass",
"dbname": "mydb",
# Pool settings
"min_size": 5,
"max_size": 20,
"timeout": 30.0,
"max_waiting": 0, # Max queued connection requests
"max_lifetime": 3600.0, # Recycle connections hourly
"max_idle": 600.0, # Close idle connections after 10min
"reconnect_timeout": 300.0,
"num_workers": 3, # Background worker threads
# Connection settings
"connect_timeout": 10,
"application_name": "my_adk_agent",
"sslmode": "require",
"autocommit": False,
}
)
Custom Table Names¶
store = PsycopgAsyncADKStore(
config,
session_table="agent_sessions",
events_table="agent_events"
)
Schema¶
Sessions Table¶
CREATE TABLE IF NOT EXISTS adk_sessions (
id VARCHAR(128) PRIMARY KEY,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL,
state JSONB NOT NULL DEFAULT '{}'::jsonb, -- PostgreSQL JSONB type
create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
) WITH (fillfactor = 80); -- HOT updates optimization
-- Composite index for listing sessions
CREATE INDEX IF NOT EXISTS idx_adk_sessions_app_user
ON adk_sessions(app_name, user_id);
-- Index for recent sessions queries
CREATE INDEX IF NOT EXISTS idx_adk_sessions_update_time
ON adk_sessions(update_time DESC);
-- Partial GIN index for JSONB queries (only non-empty state)
CREATE INDEX IF NOT EXISTS idx_adk_sessions_state
ON adk_sessions USING GIN (state)
WHERE state != '{}'::jsonb;
Events Table¶
CREATE TABLE IF NOT EXISTS adk_events (
id VARCHAR(128) PRIMARY KEY,
session_id VARCHAR(128) NOT NULL,
app_name VARCHAR(128) NOT NULL,
user_id VARCHAR(128) NOT NULL,
invocation_id VARCHAR(256),
author VARCHAR(256),
actions BYTEA, -- Binary serialized actions
long_running_tool_ids_json TEXT,
branch VARCHAR(256),
timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
content JSONB, -- Message content
grounding_metadata JSONB, -- Grounding information
custom_metadata JSONB, -- Custom application data
partial BOOLEAN,
turn_complete BOOLEAN,
interrupted BOOLEAN,
error_code VARCHAR(256),
error_message VARCHAR(1024),
FOREIGN KEY (session_id) REFERENCES adk_sessions(id) ON DELETE CASCADE
);
-- Composite index for event retrieval
CREATE INDEX IF NOT EXISTS idx_adk_events_session
ON adk_events(session_id, timestamp ASC);
Note
PostgreSQL-Specific Features:
JSONB- Binary JSON type, more efficient than JSON textTIMESTAMPTZ- Timezone-aware timestamps with microsecond precisionBYTEA- Binary data storage for pickled actionsFILLFACTOR 80- Leaves space for HOT updates, reduces bloatGIN Index- Efficient JSONB queries and containment operationsCASCADE DELETE- Automatic cleanup of events when session deleted
Usage Patterns¶
CRITICAL: Jsonb() Wrapper Requirement¶
Psycopg3 requires explicit type wrapping for JSONB data:
from psycopg.types.json import Jsonb
# Creating session with JSONB state
state = {"user": "alice", "preferences": {"theme": "dark"}}
# Store handles Jsonb() wrapping internally
session = await service.create_session(
app_name="my_app",
user_id="alice",
state=state # Automatically wrapped internally
)
# Manual cursor usage - MUST wrap yourself
async with config.provide_connection() as conn:
async with conn.cursor() as cur:
# WRONG - Will fail with type error
await cur.execute(
"INSERT INTO sessions (state) VALUES (%s)",
({"key": "value"},)
)
# CORRECT - Wrap with Jsonb()
await cur.execute(
"INSERT INTO sessions (state) VALUES (%s)",
(Jsonb({"key": "value"}),)
)
SQL Composition with pg_sql¶
Psycopg3 provides safe SQL composition tools:
from psycopg import sql as pg_sql
from psycopg.types.json import Jsonb
# Safe dynamic table/column names
async with config.provide_connection() as conn:
async with conn.cursor() as cur:
# Compose SQL with identifiers (prevents SQL injection)
query = pg_sql.SQL("""
INSERT INTO {table} (id, state, update_time)
VALUES (%s, %s, CURRENT_TIMESTAMP)
""").format(table=pg_sql.Identifier("adk_sessions"))
await cur.execute(query, (session_id, Jsonb(state)))
# Multiple identifiers
query = pg_sql.SQL("""
SELECT {col1}, {col2} FROM {table} WHERE {col1} = %s
""").format(
col1=pg_sql.Identifier("user_id"),
col2=pg_sql.Identifier("state"),
table=pg_sql.Identifier("adk_sessions")
)
await cur.execute(query, ("user_123",))
Warning
Never use f-strings or format() for SQL construction!
Use pg_sql.SQL() and pg_sql.Identifier() to prevent SQL injection.
Cursor Context Managers¶
Psycopg3 requires cursor context managers:
# Async cursor pattern
async with config.provide_connection() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM adk_sessions WHERE user_id = %s", ("alice",))
rows = await cur.fetchall()
# Sync cursor pattern
with config.provide_connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT * FROM adk_sessions WHERE user_id = %s", ("alice",))
rows = cur.fetchall()
Server-Side Cursors (Large Result Sets)¶
For processing large event histories:
async with config.provide_connection() as conn:
# Named cursor creates server-side cursor
async with conn.cursor(name="large_event_query") as cur:
await cur.execute("""
SELECT * FROM adk_events
WHERE app_name = %s
ORDER BY timestamp ASC
""", ("my_app",))
# Stream results without loading all into memory
async for row in cur:
process_event(row)
Transaction Management¶
# Async transaction with context manager
async with config.provide_connection() as conn:
async with conn.transaction():
async with conn.cursor() as cur:
await cur.execute(sql1)
await cur.execute(sql2)
# Auto-commit on success, rollback on exception
# Sync transaction
with config.provide_connection() as conn:
with conn.transaction():
with conn.cursor() as cur:
cur.execute(sql1)
cur.execute(sql2)
# Manual transaction control
async with config.provide_connection() as conn:
await conn.set_autocommit(False)
async with conn.cursor() as cur:
try:
await cur.execute(sql1)
await cur.execute(sql2)
await conn.commit()
except Exception:
await conn.rollback()
raise
Performance Considerations¶
JSONB with Jsonb() Wrapper¶
The explicit Jsonb() wrapper provides:
Advantages:
Type safety - Catch errors at insert time, not query time
Explicit conversion - Clear when JSONB type is intended
Performance - Binary protocol optimization for JSONB
Pattern:
from psycopg.types.json import Jsonb
# Session state
state = {"key": "value"}
# Event content
content = {"parts": [{"text": "Hello"}]}
# Metadata
metadata = {"source": "web", "version": "1.0"}
# All must be wrapped when inserting manually
await cur.execute(
"INSERT INTO events (content, metadata) VALUES (%s, %s)",
(Jsonb(content), Jsonb(metadata))
)
Connection Pooling¶
Psycopg3 has built-in connection pooling via psycopg_pool:
config = PsycopgAsyncConfig(
pool_config={
"conninfo": "postgresql://...",
"min_size": 5, # Pre-create 5 connections
"max_size": 20, # Allow up to 20 connections
"max_lifetime": 3600.0, # Recycle connections hourly
"max_idle": 600.0, # Close idle connections after 10min
"num_workers": 3, # Background maintenance workers
}
)
Pool Benefits:
Connection reuse - Avoid expensive connection establishment
Resource limits - Prevent connection exhaustion
Auto-reconnect - Handle connection failures gracefully
Background maintenance - Periodic connection health checks
Binary Protocol¶
Psycopg3 uses binary protocol by default:
Faster than text protocol (~30% for large datasets)
More efficient for JSONB, BYTEA, arrays
Automatic type adaptation
COPY Protocol (Bulk Operations)¶
For bulk event insertion:
async with config.provide_connection() as conn:
async with conn.cursor() as cur:
# COPY is much faster than executemany for bulk inserts
async with cur.copy("COPY adk_events (id, session_id, ...) FROM STDIN") as copy:
for event in large_event_list:
await copy.write_row(event)
Prepared Statements¶
Psycopg3 automatically prepares frequently-used queries:
No manual preparation needed
Performance benefit for repeated queries
Automatic cache management
Best Practices¶
When to Use Async vs Sync¶
Use Async (PsycopgAsyncConfig) When:
Building async web applications (Litestar, FastAPI)
Need high concurrency with many simultaneous users
Integrating with async AI agent frameworks
Performance is critical for I/O-bound operations
Use Sync (PsycopgSyncConfig) When:
Simple scripts or batch processing jobs
Integration with sync-only frameworks
Development/testing with minimal complexity
Migration from psycopg2 codebase
SQL Composition Best Practices¶
from psycopg import sql as pg_sql
# GOOD - Safe identifier composition
query = pg_sql.SQL("SELECT * FROM {table} WHERE {col} = %s").format(
table=pg_sql.Identifier("adk_sessions"),
col=pg_sql.Identifier("user_id")
)
# BAD - SQL injection risk
table_name = "adk_sessions"
query = f"SELECT * FROM {table_name} WHERE user_id = %s" # DON'T!
JSONB Query Patterns¶
# Query JSONB fields
await cur.execute("""
SELECT id, state->>'theme' as theme
FROM adk_sessions
WHERE state @> %s::jsonb
""", (Jsonb({"preferences": {"theme": "dark"}}),))
# JSONB containment
await cur.execute("""
SELECT * FROM adk_sessions
WHERE state @> %s::jsonb
""", (Jsonb({"active": True}),))
# JSONB path queries
await cur.execute("""
SELECT * FROM adk_sessions
WHERE state #> '{preferences,theme}' = %s
""", ("dark",))
Connection Pool Sizing¶
# For web applications
config = PsycopgAsyncConfig(
pool_config={
"min_size": 10, # Match expected concurrent requests
"max_size": 50, # 2-3x min_size for burst traffic
"max_lifetime": 3600.0, # Recycle hourly
}
)
# For background workers
config = PsycopgAsyncConfig(
pool_config={
"min_size": 2,
"max_size": 10,
}
)
Use Cases¶
Async Web Application¶
from litestar import Litestar, get
from sqlspec.adapters.psycopg import PsycopgAsyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgAsyncADKStore
from sqlspec.extensions.adk import SQLSpecSessionService
config = PsycopgAsyncConfig(
pool_config={"conninfo": "postgresql://..."}
)
store = PsycopgAsyncADKStore(config)
service = SQLSpecSessionService(store)
@get("/sessions/{user_id:str}")
async def list_sessions(user_id: str) -> list:
sessions = await service.list_sessions("web_app", user_id)
return [s.to_dict() for s in sessions]
app = Litestar([list_sessions])
Sync Background Worker¶
from sqlspec.adapters.psycopg import PsycopgSyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgSyncADKStore
from sqlspec.extensions.adk import SQLSpecSessionService
config = PsycopgSyncConfig(
pool_config={"conninfo": "postgresql://..."}
)
store = PsycopgSyncADKStore(config)
service = SQLSpecSessionService(store)
def cleanup_old_sessions():
# Sync operation for scheduled job
all_sessions = store.list_sessions("my_app", "user_123")
for session in all_sessions:
if is_expired(session):
store.delete_session(session["id"])
Mixed Async/Sync Application¶
# Async config for web API
async_config = PsycopgAsyncConfig(
pool_config={"conninfo": "postgresql://..."}
)
async_store = PsycopgAsyncADKStore(async_config)
# Sync config for CLI tools (separate pool)
sync_config = PsycopgSyncConfig(
pool_config={"conninfo": "postgresql://..."}
)
sync_store = PsycopgSyncADKStore(sync_config)
Comparison to Other PostgreSQL Drivers¶
Psycopg3 vs AsyncPG¶
Feature |
Psycopg3 |
AsyncPG |
|---|---|---|
Async/Sync Support |
Both native |
Async only |
JSONB Handling |
Explicit |
Direct dict insertion |
Parameter Style |
|
|
SQL Composition |
|
Manual string composition |
Performance |
Very fast (binary protocol) |
Very fast (Cython-optimized) |
Type Safety |
Explicit, safer |
Implicit, convenient |
Cursor Model |
Context managers required |
Direct cursor usage |
Best For |
Dual async/sync, type safety |
Pure async, raw performance |
Psycopg3 vs Psqlpy¶
Feature |
Psycopg3 |
Psqlpy |
|---|---|---|
Implementation |
Python + C extensions |
Rust-based |
Maturity |
Stable, production-ready |
Evolving |
JSONB Handling |
|
Direct dict insertion |
Parameter Style |
|
|
Ecosystem |
Large, mature |
Growing |
Performance |
Very fast |
Extremely fast |
Best For |
General-purpose PostgreSQL |
Performance-critical workloads |
When to Choose Psycopg3¶
Choose Psycopg3 When:
Need both async AND sync database access
Want explicit type safety with JSONB operations
Require dual-mode async/sync adapter capabilities
Prefer PostgreSQL’s official SQL composition tools
Building applications with mixed sync/async components
Value ecosystem maturity and stability
Consider AsyncPG When:
Pure async application, no sync needed
Want simplest JSONB insertion (no wrapper required)
Want Cython-based performance characteristics
Prefer implicit type conversion
Consider Psqlpy When:
Want Rust-based performance characteristics
Building high-throughput data pipelines
Want Rust safety guarantees
Can work with an evolving ecosystem
Troubleshooting¶
Jsonb() Wrapper Errors¶
Error:
psycopg.errors.UndefinedFunction: operator does not exist: jsonb = record
Solution: Wrap dicts with Jsonb():
from psycopg.types.json import Jsonb
# WRONG
await cur.execute("INSERT INTO table (data) VALUES (%s)", ({"key": "value"},))
# CORRECT
await cur.execute("INSERT INTO table (data) VALUES (%s)", (Jsonb({"key": "value"}),))
SQL Composition Errors¶
Error:
psycopg.sql.Composable object is not iterable
Solution: Format SQL before execution:
from psycopg import sql as pg_sql
# WRONG - Missing .format()
query = pg_sql.SQL("SELECT * FROM {table}").format(table=pg_sql.Identifier("users"))
await cur.execute(query) # Already formatted!
# CORRECT
query = pg_sql.SQL("SELECT * FROM {table}").format(table=pg_sql.Identifier("users"))
await cur.execute(query, ()) # No need to format again
Parameter Style Confusion¶
Error: Using wrong parameter placeholders:
# WRONG - PostgreSQL numeric style (that's asyncpg!)
await cur.execute("SELECT * FROM users WHERE id = $1", (123,))
# CORRECT - Psycopg uses %s
await cur.execute("SELECT * FROM users WHERE id = %s", (123,))
Connection Pool Not Opening¶
Error:
pool is not open
Solution: Ensure async pool is opened:
# Pool is automatically opened by config
async with config.provide_connection() as conn:
# This works
pass
# Or manually if using pool directly
pool = AsyncConnectionPool(conninfo, open=False)
await pool.open()
Cursor Not Found¶
Error:
cursor does not exist
Solution: Use context managers for cursors:
# WRONG - Cursor closed prematurely
conn = await config.create_connection()
cur = await conn.cursor()
await cur.execute(query)
# cur is closed here
# CORRECT - Use context manager
async with config.provide_connection() as conn:
async with conn.cursor() as cur:
await cur.execute(query)
rows = await cur.fetchall()
Migration from Psycopg2¶
Key Differences¶
# Psycopg2 (old)
import psycopg2
conn = psycopg2.connect("dbname=test")
cur = conn.cursor()
cur.execute("SELECT * FROM table")
# Psycopg3 (new) - Async
import psycopg
async with await psycopg.AsyncConnection.connect("dbname=test") as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT * FROM table")
JSONB Handling Changes¶
# Psycopg2
import json
cur.execute("INSERT INTO table (data) VALUES (%s)", (json.dumps({"key": "value"}),))
# Psycopg3
from psycopg.types.json import Jsonb
await cur.execute("INSERT INTO table (data) VALUES (%s)", (Jsonb({"key": "value"}),))
Connection Pool Migration¶
# Psycopg2 (using psycopg2.pool)
from psycopg2.pool import ThreadedConnectionPool
pool = ThreadedConnectionPool(5, 20, dsn="...")
# Psycopg3 (using psycopg_pool)
from psycopg_pool import AsyncConnectionPool
pool = AsyncConnectionPool("...", min_size=5, max_size=20)
await pool.open()
API Reference¶
- class sqlspec.adapters.psycopg.adk.PsycopgAsyncADKStore[source]
Bases:
BaseAsyncADKStore[PsycopgAsyncConfig]PostgreSQL ADK store using Psycopg3 driver.
Implements session and event storage for Google Agent Development Kit using PostgreSQL via psycopg3 with native async/await support.
Provides: - Session state management with JSONB storage and merge operations - Event history tracking with BYTEA-serialized actions - Microsecond-precision timestamps with TIMESTAMPTZ - Foreign key constraints with cascade delete - Efficient upserts using ON CONFLICT - GIN indexes for JSONB queries - HOT updates with FILLFACTOR 80
- Parameters:
config¶ (
PsycopgAsyncConfig) – PsycopgAsyncConfig with extension_config[“adk”] settings.
Example
from sqlspec.adapters.psycopg import PsycopgAsyncConfig from sqlspec.adapters.psycopg.adk import PsycopgAsyncADKStore
- config = PsycopgAsyncConfig(
pool_config={“conninfo”: “postgresql://…”}, extension_config={
- “adk”: {
“session_table”: “my_sessions”, “events_table”: “my_events”, “owner_id_column”: “tenant_id INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE”
}
}
) store = PsycopgAsyncADKStore(config) await store.create_tables()
Notes
PostgreSQL JSONB type used for state (more efficient than JSON)
Psycopg requires wrapping dicts with Jsonb() for type safety
TIMESTAMPTZ provides timezone-aware microsecond precision
State merging uses state || $1::jsonb operator for efficiency
BYTEA for pre-serialized actions from Google ADK
GIN index on state for JSONB queries (partial index)
FILLFACTOR 80 leaves space for HOT updates
Parameter style: $1, $2, $3 (PostgreSQL numeric placeholders)
Configuration is read from config.extension_config[“adk”]
- __init__(config)[source]
Initialize Psycopg ADK store.
- Parameters:
config¶ (
PsycopgAsyncConfig) – PsycopgAsyncConfig instance.
Notes
Configuration is read from config.extension_config[“adk”]: - session_table: Sessions table name (default: “adk_sessions”) - events_table: Events table name (default: “adk_events”) - owner_id_column: Optional owner FK column DDL (default: None)
- async create_tables()[source]
Create both sessions and events tables if they don’t exist.
- Return type:
- async create_session(session_id, app_name, user_id, state, owner_id=None)[source]
Create a new session.
- Parameters:
- Return type:
- Returns:
Created session record.
Notes
Uses CURRENT_TIMESTAMP for create_time and update_time. State is wrapped with Jsonb() for PostgreSQL type safety. If owner_id_column is configured, owner_id value must be provided.
- async get_session(session_id)[source]
Get session by ID.
- Parameters:
- Return type:
- Returns:
Session record or None if not found.
Notes
PostgreSQL returns datetime objects for TIMESTAMPTZ columns. JSONB is automatically deserialized by psycopg to Python dict.
- async update_session_state(session_id, state)[source]
Update session state.
- Parameters:
- Return type:
Notes
This replaces the entire state dictionary. Uses CURRENT_TIMESTAMP for update_time. State is wrapped with Jsonb() for PostgreSQL type safety.
- async delete_session(session_id)[source]
Delete session and all associated events (cascade).
Notes
Foreign key constraint ensures events are cascade-deleted.
- async list_sessions(app_name, user_id=None)[source]
List sessions for an app, optionally filtered by user.
- Parameters:
- Return type:
- Returns:
List of session records ordered by update_time DESC.
Notes
Uses composite index on (app_name, user_id) when user_id is provided.
- async append_event(event_record)[source]
Append an event to a session.
- Parameters:
event_record¶ (
EventRecord) – Event record to store.- Return type:
Notes
Uses CURRENT_TIMESTAMP for timestamp if not provided. JSONB fields are wrapped with Jsonb() for PostgreSQL type safety.
- async get_events(session_id, after_timestamp=None, limit=None)[source]
Get events for a session.
- Parameters:
- Return type:
- Returns:
List of event records ordered by timestamp ASC.
Notes
Uses index on (session_id, timestamp ASC). JSONB fields are automatically deserialized by psycopg. BYTEA actions are converted to bytes.
- property config: ConfigT
Return the database configuration.
- property events_table: str
Return the events table name.
- property owner_id_column_ddl: str | None
Return the full owner ID column DDL (or None if not configured).
- property owner_id_column_name: str | None
Return the owner ID column name only (or None if not configured).
- property session_table: str
Return the sessions table name.
- class sqlspec.adapters.psycopg.adk.PsycopgSyncADKStore[source]
Bases:
BaseSyncADKStore[PsycopgSyncConfig]PostgreSQL synchronous ADK store using Psycopg3 driver.
Implements session and event storage for Google Agent Development Kit using PostgreSQL via psycopg3 with synchronous execution.
Provides: - Session state management with JSONB storage and merge operations - Event history tracking with BYTEA-serialized actions - Microsecond-precision timestamps with TIMESTAMPTZ - Foreign key constraints with cascade delete - Efficient upserts using ON CONFLICT - GIN indexes for JSONB queries - HOT updates with FILLFACTOR 80
- Parameters:
config¶ (
PsycopgSyncConfig) – PsycopgSyncConfig with extension_config[“adk”] settings.
Example
from sqlspec.adapters.psycopg import PsycopgSyncConfig from sqlspec.adapters.psycopg.adk import PsycopgSyncADKStore
- config = PsycopgSyncConfig(
pool_config={“conninfo”: “postgresql://…”}, extension_config={
- “adk”: {
“session_table”: “my_sessions”, “events_table”: “my_events”, “owner_id_column”: “tenant_id INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE”
}
}
) store = PsycopgSyncADKStore(config) store.create_tables()
Notes
PostgreSQL JSONB type used for state (more efficient than JSON)
Psycopg requires wrapping dicts with Jsonb() for type safety
TIMESTAMPTZ provides timezone-aware microsecond precision
State merging uses state || $1::jsonb operator for efficiency
BYTEA for pre-serialized actions from Google ADK
GIN index on state for JSONB queries (partial index)
FILLFACTOR 80 leaves space for HOT updates
Parameter style: $1, $2, $3 (PostgreSQL numeric placeholders)
Configuration is read from config.extension_config[“adk”]
- __init__(config)[source]
Initialize Psycopg synchronous ADK store.
- Parameters:
config¶ (
PsycopgSyncConfig) – PsycopgSyncConfig instance.
Notes
Configuration is read from config.extension_config[“adk”]: - session_table: Sessions table name (default: “adk_sessions”) - events_table: Events table name (default: “adk_events”) - owner_id_column: Optional owner FK column DDL (default: None)
- create_tables()[source]
Create both sessions and events tables if they don’t exist.
- Return type:
- create_session(session_id, app_name, user_id, state, owner_id=None)[source]
Create a new session.
- Parameters:
- Return type:
- Returns:
Created session record.
Notes
Uses CURRENT_TIMESTAMP for create_time and update_time. State is wrapped with Jsonb() for PostgreSQL type safety. If owner_id_column is configured, owner_id value must be provided.
- get_session(session_id)[source]
Get session by ID.
- Parameters:
- Return type:
- Returns:
Session record or None if not found.
Notes
PostgreSQL returns datetime objects for TIMESTAMPTZ columns. JSONB is automatically deserialized by psycopg to Python dict.
- update_session_state(session_id, state)[source]
Update session state.
- Parameters:
- Return type:
Notes
This replaces the entire state dictionary. Uses CURRENT_TIMESTAMP for update_time. State is wrapped with Jsonb() for PostgreSQL type safety.
- delete_session(session_id)[source]
Delete session and all associated events (cascade).
Notes
Foreign key constraint ensures events are cascade-deleted.
- list_sessions(app_name, user_id=None)[source]
List sessions for an app, optionally filtered by user.
- Parameters:
- Return type:
- Returns:
List of session records ordered by update_time DESC.
Notes
Uses composite index on (app_name, user_id) when user_id is provided.
- create_event(event_id, session_id, app_name, user_id, author=None, actions=None, content=None, **kwargs)[source]
Create a new event.
- Parameters:
author¶ (
str|None) – Event author (user/assistant/system).content¶ (
dict[str, typing.Any] |None) – Event content (JSONB).**kwargs¶ (
Any) – Additional optional fields (invocation_id, branch, timestamp, grounding_metadata, custom_metadata, partial, turn_complete, interrupted, error_code, error_message, long_running_tool_ids_json).
- Return type:
- Returns:
Created event record.
Notes
Uses CURRENT_TIMESTAMP for timestamp if not provided in kwargs. JSONB fields are wrapped with Jsonb() for PostgreSQL type safety.
- list_events(session_id)[source]
List events for a session ordered by timestamp.
- Parameters:
- Return type:
- Returns:
List of event records ordered by timestamp ASC.
Notes
Uses index on (session_id, timestamp ASC). JSONB fields are automatically deserialized by psycopg. BYTEA actions are converted to bytes.
- property config: ConfigT
Return the database configuration.
- property events_table: str
Return the events table name.
- property owner_id_column_ddl: str | None
Return the full owner ID column DDL (or None if not configured).
- property owner_id_column_name: str | None
Return the owner ID column name only (or None if not configured).
- property session_table: str
Return the sessions table name.
See Also¶
Quick Start - Quick start guide
Database Adapters - Adapter comparison
Schema Reference - Database schema details
/reference/adapters/psycopg - SQLSpec Psycopg adapter reference
Psycopg3 Documentation - Official documentation
Psycopg3 Basic Usage - Usage guide
PostgreSQL JSONB Functions - JSONB operations