AsyncPG Backend

Overview

AsyncPG is an async-native PostgreSQL driver for Python, written in Cython. It is a recommended choice for production async AI agent deployments.

Key Features:

  • Performance: Written in Cython for improved execution speed

  • Native Async: Pure asyncio implementation, no thread pool overhead

  • Connection Pooling: Built-in sophisticated connection pool management

  • Native JSONB: Direct dict to/from JSONB conversion without manual serialization

  • Prepared Statements: Automatic statement preparation and caching

  • Microsecond Precision: TIMESTAMPTZ with microsecond-level accuracy

  • Type Safety: Rich PostgreSQL type support (arrays, composite types, UUIDs)

Ideal Use Cases:

  • Production AI agents with high-concurrency async workloads

  • Real-time conversational AI requiring fast response times

  • Multi-user agent platforms with high concurrency requirements

  • Applications with demanding performance requirements

  • Async web frameworks (Litestar, FastAPI, Starlette)

Tip

AsyncPG is designed for high-concurrency workloads, making it suitable for production AI agent applications where response time is critical.

Installation

Install SQLSpec with AsyncPG support:

pip install "sqlspec[asyncpg]" google-genai
# or
uv pip install "sqlspec[asyncpg]" google-genai

PostgreSQL Server Setup

AsyncPG requires a PostgreSQL server (version 10+):

Docker (Development):

docker run --name postgres-adk \
  -e POSTGRES_PASSWORD=secret \
  -e POSTGRES_DB=agentdb \
  -p 5432:5432 \
  -d postgres:16

Production Setup:

  • Managed Services: AWS RDS, Google Cloud SQL, Azure Database for PostgreSQL

  • Self-Hosted: PostgreSQL 14+ with connection pooling (PgBouncer recommended)

  • Configuration: Tune max_connections, shared_buffers, work_mem for workload

Quick Start

Basic Configuration

import asyncio
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.asyncpg.adk import AsyncpgADKStore
from sqlspec.extensions.adk import SQLSpecSessionService

async def main():
    # Create configuration with connection pool
    config = AsyncpgConfig(
        pool_config={
            "dsn": "postgresql://user:password@localhost:5432/agentdb",
            "min_size": 5,
            "max_size": 20,
            "command_timeout": 60.0,
        }
    )

    # Initialize store and create tables
    store = AsyncpgADKStore(config)
    await store.create_tables()

    # Create service for session management
    service = SQLSpecSessionService(store)

    # Create session
    session = await service.create_session(
        app_name="assistant_bot",
        user_id="user_123",
        state={"conversation_context": "greeting", "language": "en"}
    )
    print(f"Created session: {session.id}")

asyncio.run(main())

Connection String Formats

AsyncPG supports multiple connection string formats:

# Full DSN
config = AsyncpgConfig(pool_config={
    "dsn": "postgresql://user:password@host:5432/database"
})

# Individual parameters
config = AsyncpgConfig(pool_config={
    "host": "localhost",
    "port": 5432,
    "user": "agent_user",
    "password": "secure_password",
    "database": "agentdb"
})

# With SSL
config = AsyncpgConfig(pool_config={
    "dsn": "postgresql://user:pass@host:5432/db?sslmode=require"
})
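
Passwords that contain characters such as @ or / must be percent-encoded before they are placed in a DSN. The helper below is a minimal sketch using only the standard library; build_dsn is a hypothetical name and is not part of SQLSpec or AsyncPG.

```python
from urllib.parse import quote


def build_dsn(user, password, host, port, database, sslmode=None):
    """Assemble a postgresql:// DSN, percent-encoding the credentials.

    Hypothetical helper for illustration; not part of SQLSpec or AsyncPG.
    """
    credentials = f"{quote(user, safe='')}:{quote(password, safe='')}"
    dsn = f"postgresql://{credentials}@{host}:{port}/{quote(database, safe='')}"
    if sslmode is not None:
        dsn += f"?sslmode={sslmode}"
    return dsn


if __name__ == "__main__":
    print(build_dsn("agent_user", "p@ss/word", "localhost", 5432, "agentdb"))
```

The resulting string can be used as the "dsn" value in pool_config. Passing individual parameters, as in the second format above, avoids the encoding step entirely.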

Configuration

Connection Pool Configuration

AsyncPG’s built-in connection pool is highly configurable:

from sqlspec.adapters.asyncpg import AsyncpgConfig

config = AsyncpgConfig(
    pool_config={
        # Connection parameters
        "dsn": "postgresql://localhost/agentdb",
        "user": "agent_user",
        "password": "secure_password",

        # Pool sizing
        "min_size": 5,              # Minimum connections (default: 10)
        "max_size": 20,             # Maximum connections (default: 10)

        # Connection lifecycle
        "max_queries": 50000,       # Reconnect after N queries (default: 50000)
        "max_inactive_connection_lifetime": 300.0,  # Close idle after 5min

        # Timeouts
        "command_timeout": 60.0,    # Query timeout in seconds
        "connect_timeout": 10.0,    # Connection timeout

        # Statement caching
        "statement_cache_size": 100,             # LRU cache size (default: 100)
        "max_cached_statement_lifetime": 300,    # Cache lifetime in seconds
        "max_cacheable_statement_size": 1024*15, # Max statement size to cache

        # SSL configuration
        "ssl": "require",           # or ssl.SSLContext object

        # Server settings
        "server_settings": {
            "jit": "off",           # Disable JIT compilation if needed
            "application_name": "ai_agent"
        }
    }
)

Pool Sizing Guidelines

Choose pool size based on your workload:

Workload Type                 Pool Size   Notes
Development/Testing           2-5         Minimal overhead, fast startup
Low-Concurrency Production    10-20       Typical web application
High-Concurrency Production   20-50       Thousands of concurrent users
Extreme Scale                 50-100      Consider PgBouncer for connection pooling

Warning

Pool Exhaustion: If you see “pool exhausted” errors, either increase max_size or reduce query duration. Monitor with pool.get_size() and pool.get_idle_size().

Custom Table Names

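config = AsyncpgConfig(
    pool_config={"dsn": "postgresql://..."},
    # Table names are read from extension_config["adk"]
    extension_config={"adk": {
        "session_table": "production_sessions",
        "events_table": "production_events",
    }},
)
store = AsyncpgADKStore(config)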

Schema

Sessions Table

CREATE TABLE IF NOT EXISTS adk_sessions (
    id VARCHAR(128) PRIMARY KEY,
    app_name VARCHAR(128) NOT NULL,
    user_id VARCHAR(128) NOT NULL,
    state JSONB NOT NULL DEFAULT '{}'::jsonb,
    create_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
) WITH (fillfactor = 80);

CREATE INDEX IF NOT EXISTS idx_adk_sessions_app_user
    ON adk_sessions(app_name, user_id);

CREATE INDEX IF NOT EXISTS idx_adk_sessions_update_time
    ON adk_sessions(update_time DESC);

CREATE INDEX IF NOT EXISTS idx_adk_sessions_state
    ON adk_sessions USING GIN (state)
    WHERE state != '{}'::jsonb;

Schema Design Notes:

  • VARCHAR(128): Sufficient for UUIDs and application names

  • JSONB: Native PostgreSQL binary JSON format (faster than JSON)

  • TIMESTAMPTZ: Timezone-aware timestamps with microsecond precision

  • FILLFACTOR 80: Leaves 20% free space for HOT updates (reduces table bloat)

  • Composite Index: (app_name, user_id) for efficient session listing

  • Temporal Index: update_time DESC for recent session queries

  • Partial GIN Index: Only indexes non-empty JSONB state (saves space)

Events Table

CREATE TABLE IF NOT EXISTS adk_events (
    id VARCHAR(128) PRIMARY KEY,
    session_id VARCHAR(128) NOT NULL,
    app_name VARCHAR(128) NOT NULL,
    user_id VARCHAR(128) NOT NULL,
    invocation_id VARCHAR(256),
    author VARCHAR(256),
    actions BYTEA,
    long_running_tool_ids_json TEXT,
    branch VARCHAR(256),
    timestamp TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    content JSONB,
    grounding_metadata JSONB,
    custom_metadata JSONB,
    partial BOOLEAN,
    turn_complete BOOLEAN,
    interrupted BOOLEAN,
    error_code VARCHAR(256),
    error_message VARCHAR(1024),
    FOREIGN KEY (session_id) REFERENCES adk_sessions(id) ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS idx_adk_events_session
    ON adk_events(session_id, timestamp ASC);

Schema Design Notes:

  • VARCHAR Sizes: Optimized for typical Google ADK data

  • BYTEA: Binary storage for pre-serialized actions (no double-pickling)

  • JSONB: Direct dict conversion for content, grounding, and custom metadata

  • BOOLEAN: Native boolean type (more efficient than integers)

  • CASCADE DELETE: Automatically removes events when session deleted

  • Composite Index: (session_id, timestamp ASC) for chronological event retrieval

Usage Patterns

Session Management

import asyncio
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.asyncpg.adk import AsyncpgADKStore

async def session_example():
    config = AsyncpgConfig(pool_config={"dsn": "postgresql://..."})
    store = AsyncpgADKStore(config)
    await store.create_tables()

    # Create session with initial state
    session = await store.create_session(
        session_id="sess_abc123",
        app_name="chatbot",
        user_id="user_789",
        state={
            "conversation_context": "product_inquiry",
            "user_preferences": {"language": "en", "theme": "dark"},
            "cart_items": []
        }
    )

    # Get session by ID
    retrieved = await store.get_session("sess_abc123")
    if retrieved:
        print(f"State: {retrieved['state']}")

    # Update session state (full replacement)
    await store.update_session_state("sess_abc123", {
        "conversation_context": "checkout",
        "user_preferences": {"language": "en", "theme": "dark"},
        "cart_items": ["item1", "item2"]
    })

    # List all sessions for user
    sessions = await store.list_sessions("chatbot", "user_789")
    for session in sessions:
        print(f"Session {session['id']}: {session['update_time']}")

    # Delete session (cascade deletes events)
    await store.delete_session("sess_abc123")

asyncio.run(session_example())
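
Because update_session_state replaces the whole state, changing a single key means reading the current state, merging the change, and writing the result back. The merge step might look like the sketch below, assuming a shallow merge is what you want; merged_state is a hypothetical helper, not a store method.

```python
def merged_state(current, changes):
    """Return a new dict with top-level keys from `changes` overriding `current`.

    Shallow merge only: a nested dict in `changes` replaces the nested dict
    in `current` rather than being combined with it.
    """
    return {**current, **changes}


current = {
    "conversation_context": "product_inquiry",
    "user_preferences": {"language": "en", "theme": "dark"},
    "cart_items": [],
}
new_state = merged_state(
    current,
    {"conversation_context": "checkout", "cart_items": ["item1", "item2"]},
)
# new_state can then be passed to store.update_session_state(session_id, new_state)
```

A read-merge-write sequence like this is not atomic, so two concurrent writers can overwrite each other's changes.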

Event Management

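import asyncio
from datetime import datetime, timezone
from google.adk.events.event import Event
from google.genai import types
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.asyncpg.adk import AsyncpgADKStore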

async def event_example():
    config = AsyncpgConfig(pool_config={"dsn": "postgresql://..."})
    store = AsyncpgADKStore(config)

    # Create session first
    session = await store.create_session(
        session_id="sess_xyz",
        app_name="assistant",
        user_id="user_456",
        state={}
    )

    # Append user event
    user_event = Event(
        id="evt_user_1",
        invocation_id="inv_123",
        author="user",
        branch="main",
        actions=[],
        timestamp=datetime.now(timezone.utc).timestamp(),
        content=types.Content(parts=[types.Part(text="Hello!")]),
        partial=False,
        turn_complete=True
    )
    await store.append_event(user_event)

    # Append assistant event with metadata
    assistant_event = Event(
        id="evt_asst_1",
        invocation_id="inv_123",
        author="assistant",
        branch="main",
        actions=[],
        timestamp=datetime.now(timezone.utc).timestamp(),
        content=types.Content(parts=[types.Part(text="Hi! How can I help?")]),
        grounding_metadata={"sources": ["knowledge_base_v2"]},
        custom_metadata={"confidence": 0.95, "model": "gemini-pro"},
        partial=False,
        turn_complete=True
    )
    await store.append_event(assistant_event)

    # Get all events for session (chronological order)
    events = await store.get_events("sess_xyz")
    for event in events:
        print(f"{event['author']}: {event['content']}")

    # Get recent events (since timestamp)
    from datetime import timedelta
    recent_time = datetime.now(timezone.utc) - timedelta(hours=1)
    recent_events = await store.get_events(
        "sess_xyz",
        after_timestamp=recent_time
    )

    # Limit number of events
    latest_10 = await store.get_events("sess_xyz", limit=10)

asyncio.run(event_example())
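
The Event objects above carry their timestamp as a float of seconds since the epoch, while the events table stores TIMESTAMPTZ and get_events takes a datetime for after_timestamp. The sketch below shows the round trip with the standard library only. How the store converts internally is not shown in this document, so epoch_to_utc and utc_to_epoch are hypothetical helper names.

```python
from datetime import datetime, timezone


def epoch_to_utc(seconds):
    """Convert epoch seconds (float) to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def utc_to_epoch(moment):
    """Convert a timezone-aware datetime back to epoch seconds."""
    if moment.tzinfo is None:
        # Naive datetimes would be interpreted in local time, which is
        # rarely what is wanted for stored timestamps.
        raise ValueError("expected a timezone-aware datetime")
    return moment.timestamp()


if __name__ == "__main__":
    moment = epoch_to_utc(1700000000.5)
    print(moment.isoformat(), utc_to_epoch(moment))
```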

Integration with SQLSpecSessionService

from sqlspec.extensions.adk import SQLSpecSessionService

async def service_example():
    config = AsyncpgConfig(pool_config={"dsn": "postgresql://..."})
    store = AsyncpgADKStore(config)
    await store.create_tables()

    # Create high-level service
    service = SQLSpecSessionService(store)

    # Create session via service
    session = await service.create_session(
        app_name="support_bot",
        user_id="user_123",
        state={"ticket_id": "TKT-456"}
    )

    # Add events via service
    user_event = Event(...)
    await service.append_event(session, user_event)

    # Get session with full event history
    full_session = await service.get_session(
        app_name="support_bot",
        user_id="user_123",
        session_id=session.id
    )
    print(f"Session has {len(full_session.events)} events")

asyncio.run(service_example())

Performance Considerations

JSONB Optimization

AsyncPG automatically converts Python dicts to/from JSONB without manual serialization:

# AsyncPG handles this automatically - no json.dumps() needed!
await store.update_session_state("sess_id", {
    "complex": {"nested": {"data": [1, 2, 3]}},
    "arrays": [{"id": 1}, {"id": 2}],
    "nulls": None
})

JSONB Query Performance:

-- Fast: Uses GIN index on state
SELECT * FROM adk_sessions WHERE state @> '{"user_preferences": {"language": "en"}}';

-- Fast: JSON path extraction
SELECT state->'conversation_context' FROM adk_sessions WHERE id = $1;

-- Fast: Array operations
SELECT * FROM adk_sessions WHERE state->'cart_items' @> '["item1"]';
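
To reason about which rows a containment query matches, it can help to model the @> operator in plain Python. The function below is an approximation written for illustration: it works on already-decoded JSON values and does not reproduce PostgreSQL's special case where an array may contain a bare scalar. jsonb_contains is a hypothetical name.

```python
def jsonb_contains(left, right):
    """Approximate PostgreSQL `left @> right` for decoded JSON values."""
    if isinstance(right, dict):
        # Every key/value pair on the right must be contained on the left.
        return isinstance(left, dict) and all(
            key in left and jsonb_contains(left[key], value)
            for key, value in right.items()
        )
    if isinstance(right, list):
        # Every right-hand element must be contained in some left-hand element.
        return isinstance(left, list) and all(
            any(jsonb_contains(candidate, item) for candidate in left)
            for item in right
        )
    if isinstance(left, bool) or isinstance(right, bool):
        # Keep JSON true/false distinct from the numbers 1/0.
        return left is right
    return left == right


state = {
    "user_preferences": {"language": "en", "theme": "dark"},
    "cart_items": ["item1", "item2"],
}

if __name__ == "__main__":
    print(jsonb_contains(state, {"user_preferences": {"language": "en"}}))
    print(jsonb_contains(state["cart_items"], ["item1"]))
```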

Connection Pooling Best Practices

Recommended Pattern:

# Create config and pool once at application startup
config = AsyncpgConfig(pool_config={
    "dsn": "postgresql://...",
    "min_size": 10,
    "max_size": 20
})

# Reuse config across requests
store = AsyncpgADKStore(config)
await store.create_tables()

# Pool is automatically managed
async def handle_request():
    # Each operation acquires/releases from pool
    session = await store.get_session(session_id)

Anti-Pattern (Avoid):

# BAD: Creating new config per request
async def handle_request():
    config = AsyncpgConfig(...)  # Don't do this!
    store = AsyncpgADKStore(config)

HOT Updates

PostgreSQL Heap-Only Tuple (HOT) updates reduce table bloat:

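# HOT updates are possible when:
# 1. No indexed column is modified by the UPDATE
# 2. The new row version fits in the same page (fillfactor = 80 reserves space)

# Note: in the default schema both state (GIN) and update_time (B-tree) are
# indexed, so an update that changes them may not qualify as HOT
await store.update_session_state(session_id, new_state)

# Monitor the HOT update ratio and dead tuples
# SELECT n_tup_upd, n_tup_hot_upd, n_dead_tup
#   FROM pg_stat_user_tables WHERE relname = 'adk_sessions';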

Index Strategy

Composite Index Performance:

-- Fast: Uses idx_adk_sessions_app_user
SELECT * FROM adk_sessions WHERE app_name = $1 AND user_id = $2;

-- Fast: Index scan on idx_adk_sessions_update_time
SELECT * FROM adk_sessions ORDER BY update_time DESC LIMIT 10;

-- Fast: Uses idx_adk_events_session
SELECT * FROM adk_events WHERE session_id = $1 ORDER BY timestamp ASC;

JSONB GIN Index:

-- Fast: Partial GIN index on non-empty state
SELECT * FROM adk_sessions WHERE state ? 'conversation_context';

Prepared Statements

AsyncPG automatically prepares frequently-used statements:

# AsyncPG caches prepared statements (LRU cache, default 100)
# Repeated queries use cached prepared statement (faster)

for i in range(1000):
    await store.get_session(f"sess_{i}")  # Same SQL, different param

# Statement cache is per-connection
# Pool provides multiple connections, each with own cache
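
The effect of a per-connection LRU statement cache can be illustrated with a toy model. This is not asyncpg's implementation: StatementCache is a hypothetical class that only tracks hits, misses, and evictions for SQL text.

```python
from collections import OrderedDict


class StatementCache:
    """Toy LRU cache keyed by SQL text (illustrative; not asyncpg's code)."""

    def __init__(self, max_size=100):
        self.max_size = max_size
        self._entries = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get_or_prepare(self, sql):
        if sql in self._entries:
            self._entries.move_to_end(sql)  # mark as most recently used
            self.hits += 1
            return self._entries[sql]
        self.misses += 1
        prepared = f"stmt_{self.misses}"  # stand-in for a server-side handle
        self._entries[sql] = prepared
        if len(self._entries) > self.max_size:
            self._entries.popitem(last=False)  # evict least recently used
        return prepared


cache = StatementCache(max_size=100)
for _ in range(1000):
    # Same SQL text each time; only the bound parameter would differ.
    cache.get_or_prepare("SELECT * FROM adk_sessions WHERE id = $1")
```

In this model the loop produces one miss followed by 999 hits, which is why parameterized queries with identical SQL text benefit from the cache while queries built by string interpolation would not.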

Best Practices

Schema Design

DO:

  • Use JSONB for flexible state storage

  • Create composite indexes for common query patterns

  • Set FILLFACTOR 80 for frequently-updated tables

  • Use partial indexes to save space

  • Enable CASCADE deletes for referential integrity

DON’T:

  • Store large binary data in JSONB (use BYTEA)

  • Create indexes on rarely-queried columns

  • Use TEXT for JSON (use JSONB instead)

  • Forget to set update_time on state changes

Query Patterns

DO:

# Good: Leverages composite index
sessions = await store.list_sessions("app", "user")

# Good: Ordered by indexed column
events = await store.get_events("session_id", limit=100)

# Good: Uses GIN index
# SELECT * FROM adk_sessions WHERE state @> '{"key": "value"}'

DON’T:

# Bad: Sequential scan
# SELECT * FROM adk_sessions WHERE state::text LIKE '%value%'

# Bad: No limit on large result sets
events = await store.get_events("session_id")  # Could be millions!
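
One way to avoid unbounded reads is to page through events with limit and after_timestamp. The sketch below assumes after_timestamp is exclusive and that records are dicts with a "timestamp" key; InMemoryEvents is a stand-in for the real store and iter_event_pages is a hypothetical helper. Events that share an identical timestamp across a page boundary could be skipped by this approach.

```python
import asyncio
from datetime import datetime, timedelta, timezone


class InMemoryEvents:
    """Stand-in exposing the same get_events signature as the store."""

    def __init__(self, events):
        self._events = sorted(events, key=lambda e: e["timestamp"])

    async def get_events(self, session_id, after_timestamp=None, limit=None):
        # session_id is ignored here; a real store filters on it.
        rows = [
            e for e in self._events
            if after_timestamp is None or e["timestamp"] > after_timestamp
        ]
        return rows if limit is None else rows[:limit]


async def iter_event_pages(store, session_id, page_size=100):
    """Yield events in pages, using the last timestamp as the cursor."""
    cursor = None
    while True:
        page = await store.get_events(
            session_id, after_timestamp=cursor, limit=page_size
        )
        if not page:
            return
        yield page
        cursor = page[-1]["timestamp"]


async def demo():
    base = datetime(2024, 1, 1, tzinfo=timezone.utc)
    events = [
        {"id": f"evt_{i}", "timestamp": base + timedelta(seconds=i)}
        for i in range(250)
    ]
    store = InMemoryEvents(events)
    return [len(page) async for page in iter_event_pages(store, "sess_xyz")]
```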

Connection Management

DO:

# Good: Reuse config and pool
config = AsyncpgConfig(...)
store = AsyncpgADKStore(config)

async def many_queries():
    for i in range(1000):
        await store.get_session(f"sess_{i}")

DON’T:

# Bad: New pool per query
async def bad_pattern():
    config = AsyncpgConfig(...)  # Creates new pool!
    store = AsyncpgADKStore(config)
    await store.get_session("sess_id")

Monitoring

Monitor AsyncPG pool health:

async def monitor_pool():
    pool = await config.provide_pool()

    # Check pool statistics
    print(f"Pool size: {pool.get_size()}")
    print(f"Idle connections: {pool.get_idle_size()}")
    print(f"Min size: {pool.get_min_size()}")
    print(f"Max size: {pool.get_max_size()}")

    # Log slow queries
    async with config.provide_connection() as conn:
        await conn.execute("SET log_min_duration_statement = 1000;")

Use Cases

Production Async Web Applications

AsyncPG is ideal for async web frameworks:

from litestar import Litestar, get
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.asyncpg.adk import AsyncpgADKStore

# Initialize at app startup
config = AsyncpgConfig(pool_config={"dsn": "postgresql://..."})
store = AsyncpgADKStore(config)

@get("/session/{session_id:str}")
async def get_session(session_id: str) -> dict:
    session = await store.get_session(session_id)
    return session or {"error": "not found"}

app = Litestar(
    route_handlers=[get_session],
    on_startup=[store.create_tables]  # pass the coroutine function so it is awaited
)

High-Concurrency AI Agents

Handle thousands of concurrent users:

config = AsyncpgConfig(pool_config={
    "dsn": "postgresql://...",
    "min_size": 20,
    "max_size": 50,
    "command_timeout": 60.0
})

store = AsyncpgADKStore(config)
service = SQLSpecSessionService(store)

async def handle_concurrent_users():
    tasks = []
    for user_id in range(10000):
        task = service.create_session(
            app_name="assistant",
            user_id=f"user_{user_id}",
            state={}
        )
        tasks.append(task)

    # AsyncPG efficiently handles concurrent operations
    sessions = await asyncio.gather(*tasks)
    print(f"Created {len(sessions)} sessions")
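
Starting every task at once leaves most of them waiting for a pool connection. A common alternative is to cap the number of operations in flight with asyncio.Semaphore. The sketch below uses a fake coroutine in place of service.create_session so it runs without a database; run_bounded is a hypothetical helper and the limit of 50 simply mirrors max_size above.

```python
import asyncio


async def run_bounded(coroutine_factories, limit):
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(*(guarded(f) for f in coroutine_factories))


async def demo():
    in_flight = 0
    peak = 0

    async def fake_create_session(user_id):
        # Stand-in for service.create_session(...); yields to the event
        # loop instead of talking to a database.
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0)
        in_flight -= 1
        return f"user_{user_id}"

    factories = [lambda uid=uid: fake_create_session(uid) for uid in range(1000)]
    results = await run_bounded(factories, limit=50)
    return len(results), peak
```

Factories are used instead of ready-made coroutines so that no work starts until a semaphore slot is free.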

Real-Time Conversational AI

Minimize latency with AsyncPG’s speed:

import time

async def measure_latency():
    start = time.perf_counter()

    # Create session
    session = await store.create_session(
        session_id="sess_timing",
        app_name="realtime_chat",
        user_id="user_456",
        state={}
    )

    # Add event
    event = Event(...)
    await store.append_event(event)

    # Fetch the events recorded for the session
    events = await store.get_events("sess_timing")

    elapsed_ms = (time.perf_counter() - start) * 1000
    print(f"Total latency: {elapsed_ms:.2f}ms")  # Typically < 10ms

When to Choose AsyncPG

Use AsyncPG When:

✅ Building production async AI agents

✅ Need strong PostgreSQL performance characteristics

✅ Using async web frameworks (Litestar, FastAPI, Starlette)

✅ Need connection pooling for high concurrency

✅ Working with JSONB data extensively

✅ Require microsecond timestamp precision

✅ Want automatic prepared statement caching

Consider Alternatives When:

❌ Psycopg3: Need sync AND async in same codebase (psycopg supports both)

❌ Psqlpy: Want Rust-based performance characteristics (experimental)

❌ ADBC: Need cross-database portability with Arrow format

❌ SQLite: Development/testing without PostgreSQL server

❌ DuckDB: Analytical workloads, not transactional

Comparison: AsyncPG vs Other PostgreSQL Drivers

Feature           AsyncPG       Psycopg3           Psqlpy         ADBC
Performance       ⭐⭐⭐⭐⭐         ⭐⭐⭐⭐               ⭐⭐⭐⭐⭐          ⭐⭐⭐
Async Support     Native        Native             Native         Yes
Sync Support      No            Yes                No             Yes
Connection Pool   Built-in      Via psycopg_pool   Built-in       No
JSONB Handling    Automatic     Manual             Automatic      Manual
Prepared Stmts    Automatic     Manual             Automatic      N/A
Maturity          Stable        Stable             Experimental   Stable
Best For          Async prod    Sync+async         Max speed      Portability

Note

Recommendation: Use AsyncPG for production async workloads. If you need both sync and async in the same application, use Psycopg3. If you want Rust-based performance characteristics and can accept a less mature driver, try Psqlpy.

Troubleshooting

Connection Pool Exhausted

Error:

asyncpg.exceptions.TooManyConnectionsError: pool exhausted

Solution:

# Increase pool size
config = AsyncpgConfig(pool_config={
    "max_size": 50,  # Increase from default 10
    "command_timeout": 30.0  # Prevent hung connections
})

# Or use a transaction timeout
async with config.provide_connection() as conn:
    async with conn.transaction():
        await conn.execute("SET LOCAL statement_timeout = '30s'")

Connection Refused

Error:

asyncpg.exceptions.ConnectionDoesNotExistError: connection refused

Solution:

# Verify PostgreSQL is running
psql -h localhost -U postgres -d agentdb

# Check connection parameters
config = AsyncpgConfig(pool_config={
    "host": "localhost",  # Correct host
    "port": 5432,         # Correct port
    "user": "postgres",   # Correct user
    "database": "agentdb" # Correct database
})

Slow Queries

Symptom: Queries taking longer than expected

Solution:

# Enable query logging
async with config.provide_connection() as conn:
    # Log statements slower than 100 ms (requires sufficient privileges)
    await conn.execute("SET log_min_duration_statement = 100;")

    # Check query plan
    plan = await conn.fetch("EXPLAIN ANALYZE SELECT * FROM adk_sessions ...")

    # Ensure indexes exist (fetch returns the rows; execute would discard them)
    indexes = await conn.fetch("""
        SELECT schemaname, tablename, indexname
        FROM pg_indexes
        WHERE tablename IN ('adk_sessions', 'adk_events')
    """)

SSL Connection Issues

Error:

asyncpg.exceptions.InvalidAuthorizationSpecificationError: SSL required

Solution:

import ssl

# Require SSL
config = AsyncpgConfig(pool_config={
    "dsn": "postgresql://...",
    "ssl": "require"
})

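# Or use custom SSL context
# WARNING: the two settings below disable certificate verification;
# use them only for local testing, never in production
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE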

config = AsyncpgConfig(pool_config={
    "dsn": "postgresql://...",
    "ssl": ssl_context
})

JSONB Type Codec Errors

Error:

TypeError: Object of type X is not JSON serializable

Solution:

# Custom JSON serializer
import json
from datetime import datetime

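def _json_default(obj):
    # Called by json.dumps only for objects it cannot serialize natively
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

def custom_json_serializer(obj):
    return json.dumps(obj, default=_json_default)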

config = AsyncpgConfig(
    pool_config={"dsn": "postgresql://..."},
    driver_features={
        "json_serializer": custom_json_serializer
    }
)

Migration from Other Databases

From SQLite to AsyncPG

import asyncio

from sqlspec.adapters.sqlite import SqliteConfig
from sqlspec.adapters.sqlite.adk import SqliteADKStore
from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.asyncpg.adk import AsyncpgADKStore

async def migrate():
    # Export from SQLite
    sqlite_config = SqliteConfig(database="./agent.db")
    sqlite_store = SqliteADKStore(sqlite_config)

    sessions = sqlite_store.list_sessions("app", "user")

    # Import to AsyncPG
    pg_config = AsyncpgConfig(pool_config={"dsn": "postgresql://..."})
    pg_store = AsyncpgADKStore(pg_config)
    await pg_store.create_tables()

    # Copies session rows only; events are not migrated by this loop
    for session in sessions:
        await pg_store.create_session(
            session_id=session["id"],
            app_name=session["app_name"],
            user_id=session["user_id"],
            state=session["state"]
        )

asyncio.run(migrate())

From Psycopg to AsyncPG

Both use the same SQL schema, so migration is straightforward:

# Old Psycopg config

# New AsyncPG config (same connection params)
from sqlspec.adapters.asyncpg import AsyncpgConfig

# Just change the config class - SQL is identical
config = AsyncpgConfig(pool_config={
    "dsn": "postgresql://..."  # Same connection string
})

API Reference

class sqlspec.adapters.asyncpg.adk.AsyncpgADKStore

Bases: BaseAsyncADKStore[AsyncConfigT]

PostgreSQL ADK store base class for all PostgreSQL drivers.

Implements session and event storage for Google Agent Development Kit using PostgreSQL via any PostgreSQL driver (AsyncPG, Psycopg, Psqlpy). All drivers share the same SQL dialect and parameter style ($1, $2, etc).

Provides:

  • Session state management with JSONB storage and merge operations

  • Event history tracking with BYTEA-serialized actions

  • Microsecond-precision timestamps with TIMESTAMPTZ

  • Foreign key constraints with cascade delete

  • Efficient upserts using ON CONFLICT

  • GIN indexes for JSONB queries

  • HOT updates with FILLFACTOR 80

  • Optional user FK column for multi-tenancy

Parameters:

config (TypeVar(AsyncConfigT, bound= AsyncDatabaseConfig[Any, Any, Any] | NoPoolAsyncConfig[Any, Any])) – PostgreSQL database config with extension_config["adk"] settings.

Example

from sqlspec.adapters.asyncpg import AsyncpgConfig
from sqlspec.adapters.asyncpg.adk import AsyncpgADKStore

config = AsyncpgConfig(
    pool_config={"dsn": "postgresql://..."},
    extension_config={
        "adk": {
            "session_table": "my_sessions",
            "events_table": "my_events",
            "owner_id_column": "tenant_id INTEGER NOT NULL REFERENCES tenants(id) ON DELETE CASCADE"
        }
    }
)
store = AsyncpgADKStore(config)
await store.create_tables()

Notes

  • PostgreSQL JSONB type used for state (more efficient than JSON)

  • AsyncPG automatically converts Python dicts to/from JSONB (no manual serialization)

  • TIMESTAMPTZ provides timezone-aware microsecond precision

  • State merging uses state || $1::jsonb operator for efficiency

  • BYTEA for pre-serialized actions from Google ADK (not pickled here)

  • GIN index on state for JSONB queries (partial index)

  • FILLFACTOR 80 leaves space for HOT updates

  • Generic over PostgresConfigT to support all PostgreSQL drivers

  • Owner ID column enables multi-tenant isolation with referential integrity

  • Configuration is read from config.extension_config["adk"]

__init__(config)

Initialize AsyncPG ADK store.

Parameters:

config (TypeVar(AsyncConfigT, bound= AsyncDatabaseConfig[Any, Any, Any] | NoPoolAsyncConfig[Any, Any])) – PostgreSQL database config.

Notes

Configuration is read from config.extension_config["adk"]:

  • session_table: Sessions table name (default: "adk_sessions")

  • events_table: Events table name (default: "adk_events")

  • owner_id_column: Optional owner FK column DDL (default: None)

async create_tables()

Create both sessions and events tables if they don’t exist.

Return type:

None

async create_session(session_id, app_name, user_id, state, owner_id=None)

Create a new session.

Parameters:
  • session_id (str) – Unique session identifier.

  • app_name (str) – Application name.

  • user_id (str) – User identifier.

  • state (dict[str, typing.Any]) – Initial session state.

  • owner_id (Optional[typing.Any]) – Optional owner ID value for owner_id_column (if configured).

Return type:

SessionRecord

Returns:

Created session record.

Notes

Uses CURRENT_TIMESTAMP for create_time and update_time. State is passed as dict and asyncpg converts to JSONB automatically. If owner_id_column is configured, owner_id value must be provided.

async get_session(session_id)

Get session by ID.

Parameters:

session_id (str) – Session identifier.

Return type:

SessionRecord | None

Returns:

Session record or None if not found.

Notes

PostgreSQL returns datetime objects for TIMESTAMPTZ columns. JSONB is automatically parsed by asyncpg.

async update_session_state(session_id, state)

Update session state.

Parameters:
  • session_id (str) – Session identifier.

  • state (dict[str, typing.Any]) – New state dictionary (replaces existing state).

Return type:

None

Notes

This replaces the entire state dictionary. Uses CURRENT_TIMESTAMP for update_time.

async delete_session(session_id)

Delete session and all associated events (cascade).

Parameters:

session_id (str) – Session identifier.

Return type:

None

Notes

Foreign key constraint ensures events are cascade-deleted.

async list_sessions(app_name, user_id=None)

List sessions for an app, optionally filtered by user.

Parameters:
  • app_name (str) – Application name.

  • user_id (str | None) – User identifier. If None, lists all sessions for the app.

Return type:

list[SessionRecord]

Returns:

List of session records ordered by update_time DESC.

Notes

Uses composite index on (app_name, user_id) when user_id is provided.

async append_event(event_record)

Append an event to a session.

Parameters:

event_record (EventRecord) – Event record to store.

Return type:

None

Notes

Uses CURRENT_TIMESTAMP for timestamp if not provided. JSONB fields are passed as dicts and asyncpg converts automatically.

async get_events(session_id, after_timestamp=None, limit=None)

Get events for a session.

Parameters:
  • session_id (str) – Session identifier.

  • after_timestamp (datetime | None) – Only return events after this time.

  • limit (int | None) – Maximum number of events to return.

Return type:

list[EventRecord]

Returns:

List of event records ordered by timestamp ASC.

Notes

Uses index on (session_id, timestamp ASC). Parses JSONB fields and converts BYTEA actions to bytes.

property config: ConfigT

Return the database configuration.

property events_table: str

Return the events table name.

property owner_id_column_ddl: str | None

Return the full owner ID column DDL (or None if not configured).

property owner_id_column_name: str | None

Return the owner ID column name only (or None if not configured).

property session_table: str

Return the sessions table name.

class sqlspec.adapters.asyncpg.AsyncpgConfig

Bases: AsyncDatabaseConfig[PoolConnectionProxy, Pool[Record], AsyncpgDriver]

Configuration for AsyncPG database connections using TypedDict.

driver_type

alias of AsyncpgDriver

connection_type

alias of PoolConnectionProxy

__init__(*, pool_config=None, pool_instance=None, migration_config=None, statement_config=None, driver_features=None, bind_key=None, extension_config=None, observability_config=None)

Initialize AsyncPG configuration.

Parameters:
  • pool_config – Pool configuration parameters (TypedDict or dict)

  • pool_instance – Existing pool instance to use

  • migration_config – Migration configuration

  • statement_config – Statement configuration override

  • driver_features – Driver features configuration (TypedDict or dict)

  • bind_key – Optional unique identifier for this configuration

  • extension_config – Extension-specific configuration (e.g., Litestar plugin settings)

  • observability_config – Adapter-level observability overrides for lifecycle hooks and observers

async close_pool()

Close the connection pool.

Return type:

None

async create_connection()

Create a single async connection from the pool.

Return type:

PoolConnectionProxy

Returns:

An AsyncPG connection instance.

provide_connection(*args, **kwargs)

Provide an async connection context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

An AsyncPG connection instance.

Return type:

AsyncGenerator[PoolConnectionProxy, None]

provide_session(*args, statement_config=None, **kwargs)

Provide an async driver session context manager.

Parameters:
  • *args (Any) – Additional arguments.

  • statement_config (StatementConfig | None) – Optional statement configuration override.

  • **kwargs (Any) – Additional keyword arguments.

Yields:

An AsyncpgDriver instance.

Return type:

AsyncGenerator[AsyncpgDriver, None]

async provide_pool(*args, **kwargs)

Provide async pool instance.

Return type:

Pool[Record]

Returns:

The async connection pool.

get_signature_namespace()

Get the signature namespace for AsyncPG types.

This provides all AsyncPG-specific types that Litestar needs to recognize to avoid serialization attempts.

Return type:

dict[str, typing.Any]

Returns:

Dictionary mapping type names to types.

See Also