Source code for sqlspec.extensions.events._protocols

"""Protocols for EventChannel handlers and backends."""

from typing import TYPE_CHECKING, Any, ClassVar, Protocol, runtime_checkable

if TYPE_CHECKING:
    from sqlspec.extensions.events._models import EventMessage

__all__ = ("AsyncEventBackendProtocol", "AsyncEventHandler", "SyncEventBackendProtocol", "SyncEventHandler")


class AsyncEventHandler(Protocol):
    """Structural type for asynchronous event handler callables.

    An object conforms when its ``__call__`` is a coroutine function that
    accepts a single ``EventMessage``.
    """

    async def __call__(self, message: "EventMessage") -> Any:  # pragma: no cover - typing only
        """Handle one queued event message asynchronously.

        Args:
            message: The event message to process.

        Returns:
            Whatever the handler chooses to return; the protocol places no
            constraint on it.
        """
        ...
class SyncEventHandler(Protocol):
    """Structural type for synchronous event handler callables.

    An object conforms when it can be called with a single ``EventMessage``.
    """

    def __call__(self, message: "EventMessage") -> Any:  # pragma: no cover - typing only
        """Handle one queued event message synchronously.

        Args:
            message: The event message to process.

        Returns:
            Whatever the handler chooses to return; the protocol places no
            constraint on it.
        """
        ...
@runtime_checkable
class AsyncEventBackendProtocol(Protocol):
    """Contract that every async event backend has to satisfy.

    Native and queue-based async backends alike must provide all of the
    members declared here.
    """

    # Class-level markers; declared without values, so implementations
    # are expected to supply them.
    supports_async: ClassVar[bool]
    backend_name: ClassVar[str]

    async def publish(
        self,
        channel: str,
        payload: "dict[str, Any]",
        metadata: "dict[str, Any] | None" = None,
    ) -> str:
        """Send an event to a channel.

        Args:
            channel: Name of the target channel.
            payload: Event payload (must be JSON-serializable).
            metadata: Optional metadata dict.

        Returns:
            The event ID.
        """
        ...

    async def dequeue(self, channel: str, poll_interval: float) -> "EventMessage | None":
        """Take the next event from a channel.

        Args:
            channel: Name of the channel to listen on.
            poll_interval: Timeout in seconds to wait for a notification.

        Returns:
            An EventMessage when a notification was received, otherwise None.
        """
        ...

    async def ack(self, event_id: str) -> None:
        """Acknowledge an event.

        Args:
            event_id: ID of the event being acknowledged.
        """
        ...

    async def nack(self, event_id: str) -> None:
        """Hand an event back to the queue so it can be redelivered.

        Args:
            event_id: ID of the event being returned.
        """
        ...

    async def shutdown(self) -> None:
        """Shut the backend down and release its resources."""
        ...
@runtime_checkable
class SyncEventBackendProtocol(Protocol):
    """Contract that every sync event backend has to satisfy.

    Native and queue-based sync backends alike must provide all of the
    members declared here.
    """

    # Class-level markers; declared without values, so implementations
    # are expected to supply them.
    supports_sync: ClassVar[bool]
    backend_name: ClassVar[str]

    def publish(
        self,
        channel: str,
        payload: "dict[str, Any]",
        metadata: "dict[str, Any] | None" = None,
    ) -> str:
        """Send an event to a channel.

        Args:
            channel: Name of the target channel.
            payload: Event payload (must be JSON-serializable).
            metadata: Optional metadata dict.

        Returns:
            The event ID.
        """
        ...

    def dequeue(self, channel: str, poll_interval: float) -> "EventMessage | None":
        """Take the next event from a channel.

        Args:
            channel: Name of the channel to listen on.
            poll_interval: Timeout in seconds to wait for a notification.

        Returns:
            An EventMessage when a notification was received, otherwise None.
        """
        ...

    def ack(self, event_id: str) -> None:
        """Acknowledge an event.

        Args:
            event_id: ID of the event being acknowledged.
        """
        ...

    def nack(self, event_id: str) -> None:
        """Hand an event back to the queue so it can be redelivered.

        Args:
            event_id: ID of the event being returned.
        """
        ...

    def shutdown(self) -> None:
        """Shut the backend down and release its resources."""
        ...