Psycopg Backend

Overview

Psycopg3 is a redesigned PostgreSQL adapter that provides both synchronous and asynchronous database access with native support for PostgreSQL-specific features like JSONB, server-side cursors, and the COPY protocol.

Key Features:

  • Dual Mode: Native async/await AND synchronous execution in a single adapter

  • Type Safety: Explicit Jsonb() wrapper for type-safe JSONB operations

  • SQL Composition: Secure SQL building with pg_sql.SQL() and pg_sql.Identifier()

  • Binary Protocol: Optional binary data transfer (text format is the default)

  • Connection Pooling: Built-in psycopg_pool with async support

  • Server-Side Cursors: Memory-efficient processing of large result sets

  • Contemporary Design: Fully redesigned API for PostgreSQL

Ideal Use Cases:

  • Applications requiring both async and sync database access patterns

  • PostgreSQL-first applications leveraging JSONB features

  • Production systems needing robust connection pooling

  • Projects prioritizing type safety and explicit type handling

  • Async-capable adapter with dual sync/async support

Warning

CRITICAL: JSONB Type Safety

Unlike asyncpg or psqlpy, psycopg3 requires explicitly wrapping Python dicts with Jsonb() when inserting JSONB data. This provides stronger type safety but means you cannot pass raw dicts directly to JSONB columns.

from psycopg.types.json import Jsonb

# WRONG - psycopg cannot adapt a plain dict and raises ProgrammingError
await cur.execute("INSERT INTO my_table (data) VALUES (%s)", ({"key": "value"},))

# CORRECT - Wrap with Jsonb()
await cur.execute("INSERT INTO my_table (data) VALUES (%s)", (Jsonb({"key": "value"}),))

Installation

Install SQLSpec with Psycopg support:

# Binary distribution (recommended for development)
pip install sqlspec[psycopg] google-genai

# C extension (better performance for production)
pip install sqlspec[psycopg] psycopg[c] google-genai

# With connection pooling (recommended)
pip install sqlspec[psycopg] psycopg-pool google-genai

Tip

Performance Options:

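  • psycopg[binary] - Pre-compiled binary package, no compiler needed, easiest installation

  • psycopg[c] - C extension built locally against the system libpq; usually the fastest option, requires a compiler and libpq headers

  • psycopg-pool - Connection pooling, recommended for production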
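
To confirm which variant is actually loaded at runtime, psycopg exposes the active libpq wrapper as psycopg.pq.__impl__. The helper below is a small sketch; the function name is illustrative, and the import is guarded so the check also works on machines where psycopg is not installed.

```python
def psycopg_implementation() -> str:
    """Return the loaded libpq wrapper: 'python', 'c', 'binary', or 'not installed'."""
    try:
        import psycopg
    except ImportError:
        # psycopg is missing, or no usable libpq wrapper could be loaded.
        return "not installed"
    return psycopg.pq.__impl__


print(psycopg_implementation())
```

A value of "python" means the slower pure-Python wrapper is in use, which is usually worth fixing before deploying to production.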

Quick Start

Sync Usage

from sqlspec.adapters.psycopg import PsycopgSyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgSyncADKStore
from sqlspec.extensions.adk import SQLSpecSessionService

# Create sync config with connection pool
config = PsycopgSyncConfig(
    pool_config={
        "conninfo": "postgresql://user:pass@localhost/db",
        "min_size": 5,
        "max_size": 20,
    }
)

# Create sync store
store = PsycopgSyncADKStore(config)
store.create_tables()

# Create session service
service = SQLSpecSessionService(store)

# Create session
session = service.create_session(
    app_name="my_agent",
    user_id="user_123",
    state={"context": "active"}
)

Configuration

Basic Async Configuration

from sqlspec.adapters.psycopg import PsycopgAsyncConfig

config = PsycopgAsyncConfig(
    pool_config={
        "conninfo": "postgresql://user:pass@localhost:5432/dbname",
        "min_size": 5,           # Minimum pool connections
        "max_size": 20,          # Maximum pool connections
        "timeout": 30.0,         # Connection acquisition timeout
        "max_lifetime": 3600.0,  # Max connection lifetime (1 hour)
        "max_idle": 600.0,       # Max connection idle time (10 min)
    }
)

Basic Sync Configuration

from sqlspec.adapters.psycopg import PsycopgSyncConfig

config = PsycopgSyncConfig(
    pool_config={
        "conninfo": "postgresql://user:pass@localhost:5432/dbname",
        "min_size": 5,
        "max_size": 20,
    }
)

Advanced Configuration

config = PsycopgAsyncConfig(
    pool_config={
        # Connection string
        "conninfo": "postgresql://user:pass@localhost/db?sslmode=require",

        # OR individual parameters
        "host": "localhost",
        "port": 5432,
        "user": "myuser",
        "password": "mypass",
        "dbname": "mydb",

        # Pool settings
        "min_size": 5,
        "max_size": 20,
        "timeout": 30.0,
        "max_waiting": 0,        # Max queued connection requests
        "max_lifetime": 3600.0,  # Recycle connections hourly
        "max_idle": 600.0,       # Close idle connections after 10min
        "reconnect_timeout": 300.0,
        "num_workers": 3,        # Background worker threads

        # Connection settings
        "connect_timeout": 10,
        "application_name": "my_adk_agent",
        "sslmode": "require",
        "autocommit": False,
    }
)
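
When the connection string is assembled from individual values instead of being written by hand, special characters in the user name or password have to be percent-encoded, otherwise the URL is parsed incorrectly. The following is a minimal sketch using only the standard library; build_conninfo is a hypothetical helper, not part of SQLSpec or psycopg.

```python
from urllib.parse import quote, urlencode


def build_conninfo(user: str, password: str, host: str, dbname: str,
                   port: int = 5432, **options: str) -> str:
    """Build a postgresql:// URL, percent-encoding user, password, and dbname."""
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}"
    url = f"postgresql://{auth}@{host}:{port}/{quote(dbname, safe='')}"
    if options:
        # Extra libpq parameters (sslmode, application_name, ...) go in the query string.
        url += "?" + urlencode(options)
    return url


conninfo = build_conninfo("myuser", "p@ss/word", "localhost", "mydb", sslmode="require")
print(conninfo)
# postgresql://myuser:p%40ss%2Fword@localhost:5432/mydb?sslmode=require
```

The result can be passed as the "conninfo" value in pool_config.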

Custom Table Names

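# Table names are read from extension_config["adk"]
config = PsycopgAsyncConfig(
    pool_config={"conninfo": "postgresql://user:pass@localhost/db"},
    extension_config={
        "adk": {
            "session_table": "agent_sessions",
            "events_table": "agent_events",
        }
    },
)
store = PsycopgAsyncADKStore(config)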

Schema

Sessions Table

CREATE TABLE IF NOT EXISTS adk_sessions (
    id VARCHAR(128) PRIMARY KEY,
    app_name VARCHAR(128) NOT NULL,
    user_id VARCHAR(128) NOT NULL,
    state JSONB NOT NULL DEFAULT '{}'::jsonb,  -- PostgreSQL JSONB type
    create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
) WITH (fillfactor = 80);  -- HOT updates optimization

-- Composite index for listing sessions
CREATE INDEX IF NOT EXISTS idx_adk_sessions_app_user
    ON adk_sessions(app_name, user_id);

-- Index for recent sessions queries
CREATE INDEX IF NOT EXISTS idx_adk_sessions_update_time
    ON adk_sessions(update_time DESC);

-- Partial GIN index for JSONB queries (only non-empty state)
CREATE INDEX IF NOT EXISTS idx_adk_sessions_state
    ON adk_sessions USING GIN (state)
    WHERE state != '{}'::jsonb;

Events Table

CREATE TABLE IF NOT EXISTS adk_events (
    id VARCHAR(128) PRIMARY KEY,
    session_id VARCHAR(128) NOT NULL,
    app_name VARCHAR(128) NOT NULL,
    user_id VARCHAR(128) NOT NULL,
    invocation_id VARCHAR(256),
    author VARCHAR(256),
    actions BYTEA,                        -- Binary serialized actions
    long_running_tool_ids_json TEXT,
    branch VARCHAR(256),
    timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    content JSONB,                        -- Message content
    grounding_metadata JSONB,             -- Grounding information
    custom_metadata JSONB,                -- Custom application data
    partial BOOLEAN,
    turn_complete BOOLEAN,
    interrupted BOOLEAN,
    error_code VARCHAR(256),
    error_message VARCHAR(1024),
    FOREIGN KEY (session_id) REFERENCES adk_sessions(id) ON DELETE CASCADE
);

-- Composite index for event retrieval
CREATE INDEX IF NOT EXISTS idx_adk_events_session
    ON adk_events(session_id, timestamp ASC);

Note

PostgreSQL-Specific Features:

  • JSONB - Binary JSON type, more efficient than JSON text

  • TIMESTAMPTZ - Timezone-aware timestamps with microsecond precision

  • BYTEA - Binary data storage for pickled actions

  • FILLFACTOR 80 - Leaves space for HOT updates, reduces bloat

  • GIN Index - Efficient JSONB queries and containment operations

  • CASCADE DELETE - Automatic cleanup of events when session deleted

Usage Patterns

CRITICAL: Jsonb() Wrapper Requirement

Psycopg3 requires explicit type wrapping for JSONB data:

from psycopg.types.json import Jsonb

# Creating session with JSONB state
state = {"user": "alice", "preferences": {"theme": "dark"}}

# Store handles Jsonb() wrapping internally
session = await service.create_session(
    app_name="my_app",
    user_id="alice",
    state=state  # Automatically wrapped internally
)

# Manual cursor usage - MUST wrap yourself
async with config.provide_connection() as conn:
    async with conn.cursor() as cur:
        # WRONG - raises psycopg.ProgrammingError (cannot adapt type 'dict')
        await cur.execute(
            "INSERT INTO sessions (state) VALUES (%s)",
            ({"key": "value"},)
        )

        # CORRECT - Wrap with Jsonb()
        await cur.execute(
            "INSERT INTO sessions (state) VALUES (%s)",
            (Jsonb({"key": "value"}),)
        )
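
For hand-written queries with many parameters, wrapping each dict individually is easy to forget. One possible approach is a small helper that wraps every dict in a parameter sequence. This is a sketch: wrap_dict_params is a hypothetical name, and the wrapper is passed in as an argument so the helper itself has no hard dependency on psycopg. Lists are deliberately left alone because psycopg adapts them to PostgreSQL arrays.

```python
from typing import Any, Callable, Sequence


def wrap_dict_params(params: Sequence[Any], wrapper: Callable[[dict], Any]) -> tuple:
    """Wrap every dict in ``params`` with ``wrapper``; leave other values untouched."""
    return tuple(wrapper(p) if isinstance(p, dict) else p for p in params)


# With psycopg installed, pass the real wrapper:
#   from psycopg.types.json import Jsonb
#   await cur.execute(query, wrap_dict_params(params, Jsonb))
```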

SQL Composition with pg_sql

Psycopg3 provides safe SQL composition tools:

from psycopg import sql as pg_sql
from psycopg.types.json import Jsonb

# Safe dynamic table/column names
async with config.provide_connection() as conn:
    async with conn.cursor() as cur:
        # Compose SQL with identifiers (prevents SQL injection)
        query = pg_sql.SQL("""
            INSERT INTO {table} (id, state, update_time)
            VALUES (%s, %s, CURRENT_TIMESTAMP)
        """).format(table=pg_sql.Identifier("adk_sessions"))

        await cur.execute(query, (session_id, Jsonb(state)))

        # Multiple identifiers
        query = pg_sql.SQL("""
            SELECT {col1}, {col2} FROM {table} WHERE {col1} = %s
        """).format(
            col1=pg_sql.Identifier("user_id"),
            col2=pg_sql.Identifier("state"),
            table=pg_sql.Identifier("adk_sessions")
        )

        await cur.execute(query, ("user_123",))

Warning

Never use f-strings or str.format() for SQL construction!

Use pg_sql.SQL() and pg_sql.Identifier() to prevent SQL injection.
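
To see why identifier quoting defeats injection, it helps to look at the quoting rule itself: the name is wrapped in double quotes and any embedded double quote is doubled, so the whole value is always parsed as a single identifier. The function below is only an illustration of that rule (quote_ident is a hypothetical name); real code should keep using pg_sql.Identifier().

```python
def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier: wrap in double quotes, double embedded quotes."""
    if "\x00" in name:
        raise ValueError("identifiers cannot contain NUL bytes")
    return '"' + name.replace('"', '""') + '"'


print(quote_ident("adk_sessions"))
# "adk_sessions"
print(quote_ident('x"; DROP TABLE adk_sessions; --'))
# "x""; DROP TABLE adk_sessions; --"
```

The malicious input in the second call stays inside one quoted identifier instead of terminating it.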

Cursor Context Managers

Psycopg3 cursors are best used as context managers, which close the cursor automatically:

# Async cursor pattern
async with config.provide_connection() as conn:
    async with conn.cursor() as cur:
        await cur.execute("SELECT * FROM adk_sessions WHERE user_id = %s", ("alice",))
        rows = await cur.fetchall()

# Sync cursor pattern
with config.provide_connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT * FROM adk_sessions WHERE user_id = %s", ("alice",))
        rows = cur.fetchall()

Server-Side Cursors (Large Result Sets)

For processing large event histories:

async with config.provide_connection() as conn:
    # Named cursor creates server-side cursor
    async with conn.cursor(name="large_event_query") as cur:
        await cur.execute("""
            SELECT * FROM adk_events
            WHERE app_name = %s
            ORDER BY timestamp ASC
        """, ("my_app",))

        # Stream results without loading all into memory
        async for row in cur:
            process_event(row)

Transaction Management

# Async transaction with context manager
async with config.provide_connection() as conn:
    async with conn.transaction():
        async with conn.cursor() as cur:
            await cur.execute(sql1)
            await cur.execute(sql2)
            # Auto-commit on success, rollback on exception

# Sync transaction
with config.provide_connection() as conn:
    with conn.transaction():
        with conn.cursor() as cur:
            cur.execute(sql1)
            cur.execute(sql2)

# Manual transaction control
async with config.provide_connection() as conn:
    await conn.set_autocommit(False)
    async with conn.cursor() as cur:
        try:
            await cur.execute(sql1)
            await cur.execute(sql2)
            await conn.commit()
        except Exception:
            await conn.rollback()
            raise

Performance Considerations

JSONB with Jsonb() Wrapper

The explicit Jsonb() wrapper provides:

Advantages:

  • Type safety - Catch errors at insert time, not query time

  • Explicit conversion - Clear when JSONB type is intended

  • Performance - Binary protocol optimization for JSONB

Pattern:

from psycopg.types.json import Jsonb

# Session state
state = {"key": "value"}

# Event content
content = {"parts": [{"text": "Hello"}]}

# Metadata
metadata = {"source": "web", "version": "1.0"}

# All must be wrapped when inserting manually
await cur.execute(
    "INSERT INTO events (content, metadata) VALUES (%s, %s)",
    (Jsonb(content), Jsonb(metadata))
)

Connection Pooling

Psycopg3 has built-in connection pooling via psycopg_pool:

config = PsycopgAsyncConfig(
    pool_config={
        "conninfo": "postgresql://...",
        "min_size": 5,           # Pre-create 5 connections
        "max_size": 20,          # Allow up to 20 connections
        "max_lifetime": 3600.0,  # Recycle connections hourly
        "max_idle": 600.0,       # Close idle connections after 10min
        "num_workers": 3,        # Background maintenance workers
    }
)

Pool Benefits:

  • Connection reuse - Avoid expensive connection establishment

  • Resource limits - Prevent connection exhaustion

  • Auto-reconnect - Handle connection failures gracefully

  • Background maintenance - Periodic connection health checks

Binary Protocol

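Psycopg3 uses the text protocol by default and supports the binary protocol on request, for example with conn.cursor(binary=True) or cur.execute(query, params, binary=True):

  • Can be faster than the text protocol for large datasets, depending on the data types involved

  • Often more efficient for JSONB, BYTEA, arrays

  • Automatic type adaptation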

COPY Protocol (Bulk Operations)

For bulk event insertion:

async with config.provide_connection() as conn:
    async with conn.cursor() as cur:
        # COPY is much faster than executemany for bulk inserts
        async with cur.copy("COPY adk_events (id, session_id, ...) FROM STDIN") as copy:
            for event in large_event_list:
                await copy.write_row(event)
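
copy.write_row() expects each row as a sequence whose values follow the column order of the COPY statement. If events are held as dictionaries, they need to be projected into that order first. The helper below is a sketch under that assumption; rows_for_copy and the column subset are illustrative, and missing keys become None, which COPY writes as NULL.

```python
from typing import Any, Iterable, Iterator, Mapping, Sequence

# Illustrative subset of the adk_events columns, in COPY order.
COPY_COLUMNS = ("id", "session_id", "app_name", "user_id")


def rows_for_copy(events: Iterable[Mapping[str, Any]],
                  columns: Sequence[str]) -> Iterator[tuple]:
    """Project dict-shaped events into tuples ordered like ``columns``."""
    for event in events:
        # Keys not listed in ``columns`` are ignored; missing keys become None.
        yield tuple(event.get(col) for col in columns)


events = [
    {"id": "e1", "session_id": "s1", "app_name": "my_app", "user_id": "alice", "extra": 1},
    {"id": "e2", "session_id": "s1", "app_name": "my_app"},
]
rows = list(rows_for_copy(events, COPY_COLUMNS))
print(rows)
# [('e1', 's1', 'my_app', 'alice'), ('e2', 's1', 'my_app', None)]
```

Each yielded tuple can then be passed to `await copy.write_row(row)`.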

Prepared Statements

Psycopg3 automatically prepares a query once it has been executed more than prepare_threshold times on a connection (5 by default):

  • No manual preparation needed

  • Performance benefit for repeated queries

  • Automatic cache management

Best Practices

When to Use Async vs Sync

Use Async (PsycopgAsyncConfig) When:

  • Building async web applications (Litestar, FastAPI)

  • Need high concurrency with many simultaneous users

  • Integrating with async AI agent frameworks

  • Performance is critical for I/O-bound operations

Use Sync (PsycopgSyncConfig) When:

  • Simple scripts or batch processing jobs

  • Integration with sync-only frameworks

  • Development/testing with minimal complexity

  • Migration from psycopg2 codebase

SQL Composition Best Practices

from psycopg import sql as pg_sql

# GOOD - Safe identifier composition
query = pg_sql.SQL("SELECT * FROM {table} WHERE {col} = %s").format(
    table=pg_sql.Identifier("adk_sessions"),
    col=pg_sql.Identifier("user_id")
)

# BAD - SQL injection risk
table_name = "adk_sessions"
query = f"SELECT * FROM {table_name} WHERE user_id = %s"  # DON'T!

JSONB Query Patterns

# Query JSONB fields (-> returns jsonb, ->> returns text)
await cur.execute("""
    SELECT id, state->'preferences'->>'theme' AS theme
    FROM adk_sessions
    WHERE state @> %s::jsonb
""", (Jsonb({"preferences": {"theme": "dark"}}),))

# JSONB containment
await cur.execute("""
    SELECT * FROM adk_sessions
    WHERE state @> %s::jsonb
""", (Jsonb({"active": True}),))

# JSONB path queries (#>> returns text, so it can be compared with a string)
await cur.execute("""
    SELECT * FROM adk_sessions
    WHERE state #>> '{preferences,theme}' = %s
""", ("dark",))

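The @> operator used above tests containment: the right-hand document must be structurally included in the left-hand one. The pure-Python function below is a simplified model of that rule, useful for reasoning about which filters will match. It is a sketch, not PostgreSQL's implementation, and it ignores the special case where a top-level array contains a bare primitive value.

```python
from typing import Any


def jsonb_contains(left: Any, right: Any) -> bool:
    """Simplified model of PostgreSQL's ``left @> right`` for JSON-like values."""
    if isinstance(left, dict) and isinstance(right, dict):
        # Every key in ``right`` must exist in ``left`` with a contained value.
        return all(k in left and jsonb_contains(left[k], v) for k, v in right.items())
    if isinstance(left, list) and isinstance(right, list):
        # Every element of ``right`` must be contained in some element of ``left``.
        return all(any(jsonb_contains(item, wanted) for item in left) for wanted in right)
    if isinstance(left, (dict, list)) or isinstance(right, (dict, list)):
        return False
    # Scalars: JSON booleans are not numbers, so keep bool and int apart.
    if isinstance(left, bool) != isinstance(right, bool):
        return False
    return left == right


state = {"active": True, "preferences": {"theme": "dark", "lang": "en"}, "tags": ["a", "b"]}
print(jsonb_contains(state, {"preferences": {"theme": "dark"}}))  # True
print(jsonb_contains(state, {"theme": "dark"}))                   # False
```

The second call returns False because containment matches structure from the top level down: "theme" is nested under "preferences", not a top-level key.
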
Connection Pool Sizing

# For web applications
config = PsycopgAsyncConfig(
    pool_config={
        "min_size": 10,          # Match expected concurrent requests
        "max_size": 50,          # Headroom for burst traffic (here 5x min_size)
        "max_lifetime": 3600.0,  # Recycle hourly
    }
)

# For background workers
config = PsycopgAsyncConfig(
    pool_config={
        "min_size": 2,
        "max_size": 10,
    }
)
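
Pool limits also have to fit within the server's connection budget, which is shared by every application instance. The function below is a rough heuristic, not an official recommendation: suggest_pool_sizes, the burst factor, and the number of reserved connections are all illustrative assumptions. The default of 100 matches PostgreSQL's default max_connections setting.

```python
def suggest_pool_sizes(expected_concurrency: int,
                       burst_factor: float = 2.5,
                       server_max_connections: int = 100,
                       app_instances: int = 1,
                       reserved: int = 10) -> dict:
    """Suggest min_size/max_size so that all instances stay under the server limit."""
    # Connections available to one instance after reserving some for admin tools.
    budget = max(1, (server_max_connections - reserved) // app_instances)
    max_size = min(budget, max(1, round(expected_concurrency * burst_factor)))
    min_size = max(1, min(expected_concurrency, max_size))
    return {"min_size": min_size, "max_size": max_size}


print(suggest_pool_sizes(10))                   # {'min_size': 10, 'max_size': 25}
print(suggest_pool_sizes(10, app_instances=4))  # {'min_size': 10, 'max_size': 22}
```

The returned dict can be merged into pool_config alongside "conninfo".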

Use Cases

Async Web Application

from litestar import Litestar, get
from sqlspec.adapters.psycopg import PsycopgAsyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgAsyncADKStore
from sqlspec.extensions.adk import SQLSpecSessionService

config = PsycopgAsyncConfig(
    pool_config={"conninfo": "postgresql://..."}
)
store = PsycopgAsyncADKStore(config)
service = SQLSpecSessionService(store)

@get("/sessions/{user_id:str}")
async def list_sessions(user_id: str) -> list:
    sessions = await service.list_sessions("web_app", user_id)
    return [s.to_dict() for s in sessions]

app = Litestar([list_sessions])

Sync Background Worker

from sqlspec.adapters.psycopg import PsycopgSyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgSyncADKStore
from sqlspec.extensions.adk import SQLSpecSessionService

config = PsycopgSyncConfig(
    pool_config={"conninfo": "postgresql://..."}
)
store = PsycopgSyncADKStore(config)
service = SQLSpecSessionService(store)

def cleanup_old_sessions():
    # Sync operation for scheduled job
    all_sessions = store.list_sessions("my_app", "user_123")
    for session in all_sessions:
        if is_expired(session):
            store.delete_session(session["id"])
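
The is_expired() function used above is left to the application. A minimal sketch follows, assuming each session record exposes a timezone-aware update_time (as the TIMESTAMPTZ column in the schema suggests) and that sessions idle for more than 30 days should be removed. Both the retention window and the optional now parameter are illustrative choices.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Mapping, Optional

MAX_IDLE = timedelta(days=30)  # hypothetical retention window


def is_expired(session: Mapping[str, Any],
               now: Optional[datetime] = None,
               max_idle: timedelta = MAX_IDLE) -> bool:
    """Return True when the session has not been updated within ``max_idle``."""
    now = now or datetime.now(timezone.utc)
    return now - session["update_time"] > max_idle


now = datetime(2025, 1, 31, tzinfo=timezone.utc)
fresh = {"id": "s1", "update_time": datetime(2025, 1, 30, tzinfo=timezone.utc)}
stale = {"id": "s2", "update_time": datetime(2024, 12, 1, tzinfo=timezone.utc)}
print(is_expired(fresh, now), is_expired(stale, now))  # False True
```

Passing now explicitly keeps the function deterministic in tests; in the scheduled job it can be omitted.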

Mixed Async/Sync Application

# Async config for web API
async_config = PsycopgAsyncConfig(
    pool_config={"conninfo": "postgresql://..."}
)
async_store = PsycopgAsyncADKStore(async_config)

# Sync config for CLI tools (separate pool)
sync_config = PsycopgSyncConfig(
    pool_config={"conninfo": "postgresql://..."}
)
sync_store = PsycopgSyncADKStore(sync_config)

Comparison to Other PostgreSQL Drivers

Psycopg3 vs AsyncPG

Feature             Psycopg3                      AsyncPG
------------------  ----------------------------  ----------------------------
Async/Sync Support  Both native                   Async only
JSONB Handling      Explicit Jsonb() wrapper      Direct dict insertion
Parameter Style     %s (pyformat)                 $1, $2 (numeric)
SQL Composition     pg_sql.SQL()                  Manual string composition
Performance         Very fast (binary protocol)   Very fast (Cython-optimized)
Type Safety         Explicit, safer               Implicit, convenient
Cursor Model        Context managers required     Direct cursor usage
Best For            Dual async/sync, type safety  Pure async, raw performance

Psycopg3 vs Psqlpy

Feature          Psycopg3                    Psqlpy
---------------  --------------------------  ------------------------------
Implementation   Python + C extensions       Rust-based
Maturity         Stable, production-ready    Evolving
JSONB Handling   Jsonb() wrapper             Direct dict insertion
Parameter Style  %s (pyformat)               $1, $2 (numeric)
Ecosystem        Large, mature               Growing
Performance      Very fast                   Extremely fast
Best For         General-purpose PostgreSQL  Performance-critical workloads

When to Choose Psycopg3

Choose Psycopg3 When:

  • Need both async AND sync database access

  • Want explicit type safety with JSONB operations

  • Require dual-mode async/sync adapter capabilities

  • Prefer psycopg's built-in SQL composition tools (psycopg.sql)

  • Building applications with mixed sync/async components

  • Value ecosystem maturity and stability

Consider AsyncPG When:

  • Pure async application, no sync needed

  • Want simplest JSONB insertion (no wrapper required)

  • Want Cython-based performance characteristics

  • Prefer implicit type conversion

Consider Psqlpy When:

  • Want Rust-based performance characteristics

  • Building high-throughput data pipelines

  • Want Rust safety guarantees

  • Can work with an evolving ecosystem

Troubleshooting

Jsonb() Wrapper Errors

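Error (typical when a raw dict is passed as a query parameter):

psycopg.ProgrammingError: cannot adapt type 'dict' using placeholder '%s' (format: AUTO)

A related server-side error can appear when an unwrapped Python value is compared with a JSONB column:

psycopg.errors.UndefinedFunction: operator does not exist: jsonb = record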

Solution: Wrap dicts with Jsonb():

from psycopg.types.json import Jsonb

# WRONG
await cur.execute("INSERT INTO my_table (data) VALUES (%s)", ({"key": "value"},))

# CORRECT
await cur.execute("INSERT INTO my_table (data) VALUES (%s)", (Jsonb({"key": "value"}),))

SQL Composition Errors

Error:

psycopg.sql.Composable object is not iterable

Solution: Format SQL before execution:

from psycopg import sql as pg_sql

# WRONG - Missing .format(), the {table} placeholder is never filled in
query = pg_sql.SQL("SELECT * FROM {table}")
await cur.execute(query)

# CORRECT - Call .format() once, then execute the composed query
query = pg_sql.SQL("SELECT * FROM {table}").format(table=pg_sql.Identifier("users"))
await cur.execute(query)

Parameter Style Confusion

Error: Using wrong parameter placeholders:

# WRONG - PostgreSQL numeric style (that's asyncpg!)
await cur.execute("SELECT * FROM users WHERE id = $1", (123,))

# CORRECT - Psycopg uses %s
await cur.execute("SELECT * FROM users WHERE id = %s", (123,))
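
When porting queries written for a driver that uses numeric placeholders, the conversion can be automated for simple cases. The sketch below rewrites $n markers into psycopg's named %(pn)s placeholders, which preserves parameter reuse and ordering. numeric_to_pyformat is a hypothetical helper, and it is deliberately naive: it does not skip $n sequences that appear inside string literals or dollar-quoted blocks.

```python
import re
from typing import Any, Dict, Sequence, Tuple

_NUMERIC = re.compile(r"\$(\d+)")


def numeric_to_pyformat(query: str, params: Sequence[Any]) -> Tuple[str, Dict[str, Any]]:
    """Convert $1-style placeholders to %(p1)s-style and build the matching mapping."""
    # Literal percent signs must be doubled once parameters are passed to psycopg.
    escaped = query.replace("%", "%%")
    converted = _NUMERIC.sub(lambda m: f"%(p{m.group(1)})s", escaped)
    named = {f"p{i}": value for i, value in enumerate(params, start=1)}
    return converted, named


query, named = numeric_to_pyformat(
    "SELECT * FROM users WHERE id = $1 AND name LIKE $2 OR parent = $1",
    (123, "a%"),
)
print(query)
# SELECT * FROM users WHERE id = %(p1)s AND name LIKE %(p2)s OR parent = %(p1)s
print(named)
# {'p1': 123, 'p2': 'a%'}
```

The converted pair can be passed directly as `await cur.execute(query, named)`.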

Connection Pool Not Opening

Error:

pool is not open

Solution: Ensure async pool is opened:

# Pool is automatically opened by config
async with config.provide_connection() as conn:
    # This works
    pass

# Or manually if using pool directly
from psycopg_pool import AsyncConnectionPool

pool = AsyncConnectionPool(conninfo, open=False)
await pool.open()

Cursor Not Found

Error:

cursor does not exist

Solution: Use context managers for cursors:

# WRONG - Cursor lifetime is not managed
conn = await config.create_connection()
cur = conn.cursor()  # cursor() is not a coroutine, so it is not awaited
await cur.execute(query)
# nothing guarantees that cur and conn are closed in the right order

# CORRECT - Use context manager
async with config.provide_connection() as conn:
    async with conn.cursor() as cur:
        await cur.execute(query)
        rows = await cur.fetchall()

Migration from Psycopg2

Key Differences

# Psycopg2 (old)
import psycopg2
conn = psycopg2.connect("dbname=test")
cur = conn.cursor()
cur.execute("SELECT * FROM my_table")

# Psycopg3 (new) - Async
import psycopg
async with await psycopg.AsyncConnection.connect("dbname=test") as conn:
    async with conn.cursor() as cur:
        await cur.execute("SELECT * FROM my_table")

JSONB Handling Changes

# Psycopg2
import json
cur.execute("INSERT INTO my_table (data) VALUES (%s)", (json.dumps({"key": "value"}),))

# Psycopg3
from psycopg.types.json import Jsonb
await cur.execute("INSERT INTO my_table (data) VALUES (%s)", (Jsonb({"key": "value"}),))

Connection Pool Migration

# Psycopg2 (using psycopg2.pool)
from psycopg2.pool import ThreadedConnectionPool
pool = ThreadedConnectionPool(5, 20, dsn="...")

# Psycopg3 (using psycopg_pool)
from psycopg_pool import AsyncConnectionPool
pool = AsyncConnectionPool("...", min_size=5, max_size=20, open=False)
await pool.open()  # open explicitly instead of in the constructor

API Reference

class sqlspec.adapters.psycopg.adk.PsycopgAsyncADKStore

Bases: BaseAsyncADKStore[PsycopgAsyncConfig]

PostgreSQL ADK store using Psycopg3 driver.

Implements session and event storage for Google Agent Development Kit using PostgreSQL via psycopg3 with native async/await support.

Provides:

  • Session state management with JSONB storage and merge operations

  • Event history tracking with BYTEA-serialized actions

  • Microsecond-precision timestamps with TIMESTAMPTZ

  • Foreign key constraints with cascade delete

  • Efficient upserts using ON CONFLICT

  • GIN indexes for JSONB queries

  • HOT updates with FILLFACTOR 80

Parameters:

config (PsycopgAsyncConfig) – PsycopgAsyncConfig with extension_config["adk"] settings.

Example

from sqlspec.adapters.psycopg import PsycopgAsyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgAsyncADKStore

config = PsycopgAsyncConfig(
    pool_config={"conninfo": "postgresql://..."},
    extension_config={
        "adk": {
            "session_table": "my_sessions",
            "events_table": "my_events",
            "owner_id_column": "tenant_id INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE",
        }
    },
)
store = PsycopgAsyncADKStore(config)
await store.create_tables()

Notes

  • PostgreSQL JSONB type used for state (more efficient than JSON)

  • Psycopg requires wrapping dicts with Jsonb() for type safety

  • TIMESTAMPTZ provides timezone-aware microsecond precision

  • State merging uses state || %s::jsonb operator for efficiency

  • BYTEA for pre-serialized actions from Google ADK

  • GIN index on state for JSONB queries (partial index)

  • FILLFACTOR 80 leaves space for HOT updates

  • Parameter style: %s (pyformat placeholders)

  • Configuration is read from config.extension_config["adk"]

__init__(config)

Initialize Psycopg ADK store.

Parameters:

config (PsycopgAsyncConfig) – PsycopgAsyncConfig instance.

Notes

Configuration is read from config.extension_config["adk"]:

  • session_table: Sessions table name (default: "adk_sessions")

  • events_table: Events table name (default: "adk_events")

  • owner_id_column: Optional owner FK column DDL (default: None)

async create_tables()

Create both sessions and events tables if they don’t exist.

Return type:

None

async create_session(session_id, app_name, user_id, state, owner_id=None)

Create a new session.

Parameters:
  • session_id (str) – Unique session identifier.

  • app_name (str) – Application name.

  • user_id (str) – User identifier.

  • state (dict[str, typing.Any]) – Initial session state.

  • owner_id (Optional[typing.Any]) – Optional owner ID value for owner_id_column (if configured).

Return type:

SessionRecord

Returns:

Created session record.

Notes

Uses CURRENT_TIMESTAMP for create_time and update_time. State is wrapped with Jsonb() for PostgreSQL type safety. If owner_id_column is configured, owner_id value must be provided.

async get_session(session_id)

Get session by ID.

Parameters:

session_id (str) – Session identifier.

Return type:

SessionRecord | None

Returns:

Session record or None if not found.

Notes

PostgreSQL returns datetime objects for TIMESTAMPTZ columns. JSONB is automatically deserialized by psycopg to Python dict.

async update_session_state(session_id, state)

Update session state.

Parameters:
  • session_id (str) – Session identifier.

  • state (dict[str, typing.Any]) – New state dictionary (replaces existing state).

Return type:

None

Notes

This replaces the entire state dictionary. Uses CURRENT_TIMESTAMP for update_time. State is wrapped with Jsonb() for PostgreSQL type safety.

async delete_session(session_id)

Delete session and all associated events (cascade).

Parameters:

session_id (str) – Session identifier.

Return type:

None

Notes

Foreign key constraint ensures events are cascade-deleted.

async list_sessions(app_name, user_id=None)

List sessions for an app, optionally filtered by user.

Parameters:
  • app_name (str) – Application name.

  • user_id (str | None) – User identifier. If None, lists all sessions for the app.

Return type:

list[SessionRecord]

Returns:

List of session records ordered by update_time DESC.

Notes

Uses composite index on (app_name, user_id) when user_id is provided.

async append_event(event_record)

Append an event to a session.

Parameters:

event_record (EventRecord) – Event record to store.

Return type:

None

Notes

Uses CURRENT_TIMESTAMP for timestamp if not provided. JSONB fields are wrapped with Jsonb() for PostgreSQL type safety.

async get_events(session_id, after_timestamp=None, limit=None)

Get events for a session.

Parameters:
  • session_id (str) – Session identifier.

  • after_timestamp (datetime | None) – Only return events after this time.

  • limit (int | None) – Maximum number of events to return.

Return type:

list[EventRecord]

Returns:

List of event records ordered by timestamp ASC.

Notes

Uses index on (session_id, timestamp ASC). JSONB fields are automatically deserialized by psycopg. BYTEA actions are converted to bytes.

property config: ConfigT

Return the database configuration.

property events_table: str

Return the events table name.

property owner_id_column_ddl: str | None

Return the full owner ID column DDL (or None if not configured).

property owner_id_column_name: str | None

Return the owner ID column name only (or None if not configured).

property session_table: str

Return the sessions table name.

class sqlspec.adapters.psycopg.adk.PsycopgSyncADKStore

Bases: BaseSyncADKStore[PsycopgSyncConfig]

PostgreSQL synchronous ADK store using Psycopg3 driver.

Implements session and event storage for Google Agent Development Kit using PostgreSQL via psycopg3 with synchronous execution.

Provides:

  • Session state management with JSONB storage and merge operations

  • Event history tracking with BYTEA-serialized actions

  • Microsecond-precision timestamps with TIMESTAMPTZ

  • Foreign key constraints with cascade delete

  • Efficient upserts using ON CONFLICT

  • GIN indexes for JSONB queries

  • HOT updates with FILLFACTOR 80

Parameters:

config (PsycopgSyncConfig) – PsycopgSyncConfig with extension_config["adk"] settings.

Example

from sqlspec.adapters.psycopg import PsycopgSyncConfig
from sqlspec.adapters.psycopg.adk import PsycopgSyncADKStore

config = PsycopgSyncConfig(
    pool_config={"conninfo": "postgresql://..."},
    extension_config={
        "adk": {
            "session_table": "my_sessions",
            "events_table": "my_events",
            "owner_id_column": "tenant_id INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE",
        }
    },
)
store = PsycopgSyncADKStore(config)
store.create_tables()

Notes

  • PostgreSQL JSONB type used for state (more efficient than JSON)

  • Psycopg requires wrapping dicts with Jsonb() for type safety

  • TIMESTAMPTZ provides timezone-aware microsecond precision

  • State merging uses state || %s::jsonb operator for efficiency

  • BYTEA for pre-serialized actions from Google ADK

  • GIN index on state for JSONB queries (partial index)

  • FILLFACTOR 80 leaves space for HOT updates

  • Parameter style: %s (pyformat placeholders)

  • Configuration is read from config.extension_config["adk"]

__init__(config)

Initialize Psycopg synchronous ADK store.

Parameters:

config (PsycopgSyncConfig) – PsycopgSyncConfig instance.

Notes

Configuration is read from config.extension_config["adk"]:

  • session_table: Sessions table name (default: "adk_sessions")

  • events_table: Events table name (default: "adk_events")

  • owner_id_column: Optional owner FK column DDL (default: None)

create_tables()

Create both sessions and events tables if they don’t exist.

Return type:

None

create_session(session_id, app_name, user_id, state, owner_id=None)

Create a new session.

Parameters:
  • session_id (str) – Unique session identifier.

  • app_name (str) – Application name.

  • user_id (str) – User identifier.

  • state (dict[str, typing.Any]) – Initial session state.

  • owner_id (Optional[typing.Any]) – Optional owner ID value for owner_id_column (if configured).

Return type:

SessionRecord

Returns:

Created session record.

Notes

Uses CURRENT_TIMESTAMP for create_time and update_time. State is wrapped with Jsonb() for PostgreSQL type safety. If owner_id_column is configured, owner_id value must be provided.

get_session(session_id)

Get session by ID.

Parameters:

session_id (str) – Session identifier.

Return type:

SessionRecord | None

Returns:

Session record or None if not found.

Notes

PostgreSQL returns datetime objects for TIMESTAMPTZ columns. JSONB is automatically deserialized by psycopg to Python dict.

update_session_state(session_id, state)

Update session state.

Parameters:
  • session_id (str) – Session identifier.

  • state (dict[str, typing.Any]) – New state dictionary (replaces existing state).

Return type:

None

Notes

This replaces the entire state dictionary. Uses CURRENT_TIMESTAMP for update_time. State is wrapped with Jsonb() for PostgreSQL type safety.

delete_session(session_id)

Delete session and all associated events (cascade).

Parameters:

session_id (str) – Session identifier.

Return type:

None

Notes

Foreign key constraint ensures events are cascade-deleted.

list_sessions(app_name, user_id=None)

List sessions for an app, optionally filtered by user.

Parameters:
  • app_name (str) – Application name.

  • user_id (str | None) – User identifier. If None, lists all sessions for the app.

Return type:

list[SessionRecord]

Returns:

List of session records ordered by update_time DESC.

Notes

Uses composite index on (app_name, user_id) when user_id is provided.

create_event(event_id, session_id, app_name, user_id, author=None, actions=None, content=None, **kwargs)

Create a new event.

Parameters:
  • event_id (str) – Unique event identifier.

  • session_id (str) – Session identifier.

  • app_name (str) – Application name.

  • user_id (str) – User identifier.

  • author (str | None) – Event author (user/assistant/system).

  • actions (bytes | None) – Pickled actions object.

  • content (dict[str, typing.Any] | None) – Event content (JSONB).

  • **kwargs (Any) – Additional optional fields (invocation_id, branch, timestamp, grounding_metadata, custom_metadata, partial, turn_complete, interrupted, error_code, error_message, long_running_tool_ids_json).

Return type:

EventRecord

Returns:

Created event record.

Notes

Uses CURRENT_TIMESTAMP for timestamp if not provided in kwargs. JSONB fields are wrapped with Jsonb() for PostgreSQL type safety.

list_events(session_id)

List events for a session ordered by timestamp.

Parameters:

session_id (str) – Session identifier.

Return type:

list[EventRecord]

Returns:

List of event records ordered by timestamp ASC.

Notes

Uses index on (session_id, timestamp ASC). JSONB fields are automatically deserialized by psycopg. BYTEA actions are converted to bytes.

property config: ConfigT

Return the database configuration.

property events_table: str

Return the events table name.

property owner_id_column_ddl: str | None

Return the full owner ID column DDL (or None if not configured).

property owner_id_column_name: str | None

Return the owner ID column name only (or None if not configured).

property session_table: str

Return the sessions table name.

See Also