Source code for sqlspec.extensions.events._payload

"""Shared payload encoding/decoding utilities for event backends."""

import contextlib
from datetime import datetime, timezone
from typing import Any

from sqlspec.exceptions import EventChannelError
from sqlspec.extensions.events._models import EventMessage
from sqlspec.utils.serializers import from_json, to_json
from sqlspec.utils.uuids import uuid4

__all__ = ("decode_notify_payload", "encode_notify_payload", "parse_event_timestamp")

MAX_NOTIFY_BYTES = 8000


[docs] def encode_notify_payload(event_id: str, payload: "dict[str, Any]", metadata: "dict[str, Any] | None") -> str: """Encode event data as JSON for NOTIFY payload. Raises: EventChannelError: If the encoded payload exceeds PostgreSQL's 8KB limit. """ encoded = to_json( { "event_id": event_id, "payload": payload, "metadata": metadata, "published_at": datetime.now(timezone.utc).isoformat(), }, as_bytes=True, ) if len(encoded) > MAX_NOTIFY_BYTES: msg = "PostgreSQL NOTIFY payload exceeds 8 KB limit" raise EventChannelError(msg) return encoded.decode("utf-8")
[docs] def decode_notify_payload(channel: str, payload: str) -> "EventMessage": """Decode JSON payload from NOTIFY into an EventMessage.""" raw = from_json(payload) data = raw if isinstance(raw, dict) else {"payload": raw} payload_val = data.get("payload") metadata_val = data.get("metadata") timestamp = parse_event_timestamp(data.get("published_at")) return EventMessage( event_id=data.get("event_id", uuid4().hex), channel=channel, payload=payload_val if isinstance(payload_val, dict) else {"value": payload_val}, metadata=metadata_val if metadata_val is None or isinstance(metadata_val, dict) else {"value": metadata_val}, attempts=0, available_at=timestamp, lease_expires_at=None, created_at=timestamp, )
[docs] def parse_event_timestamp(value: Any) -> "datetime": """Parse a timestamp value into a timezone-aware datetime. Handles ISO format strings, datetime objects, and falls back to current UTC time for invalid or missing values. """ if isinstance(value, datetime): return value if value.tzinfo else value.replace(tzinfo=timezone.utc) if isinstance(value, str): with contextlib.suppress(ValueError): parsed = datetime.fromisoformat(value) return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc) return datetime.now(timezone.utc)