Skip to content

prefect.server.utilities.messaging

Consumer

Bases: ABC

Abstract base class for consumers that receive messages from a message broker and call a handler function for each message received.

Source code in src/prefect/server/utilities/messaging/__init__.py
117
118
119
120
121
122
123
124
125
126
class Consumer(abc.ABC):
    """
    Base interface for message consumers: a consumer receives messages from a
    message broker and invokes a handler function once per message received.

    Subclasses must implement `run`.
    """

    @abc.abstractmethod
    async def run(self, handler: MessageHandler) -> None:
        """
        Runs the consumer (indefinitely).

        Args:
            handler: the function called for each message received
        """

run(handler) abstractmethod async

Runs the consumer (indefinitely)

Source code in src/prefect/server/utilities/messaging/__init__.py
123
124
125
126
@abc.abstractmethod
async def run(self, handler: MessageHandler) -> None:
    """
    Runs the consumer (indefinitely).

    Args:
        handler: the handler passed in by the caller
    """

Message

Bases: Protocol

A protocol representing a message sent to a message broker.

Source code in src/prefect/server/utilities/messaging/__init__.py
28
29
30
31
32
33
34
class Message(Protocol):
    """
    A protocol representing a message sent to a message broker.

    Being a `Protocol`, this is a structural type: any object exposing the two
    attributes below satisfies it for type-checking purposes.
    """

    # Either raw bytes or text; presumably the message payload (confirm
    # against the broker implementations).
    data: Union[bytes, str]
    # String-keyed mapping of arbitrary values; presumably metadata that
    # accompanies the payload (confirm against the broker implementations).
    attributes: Dict[str, Any]

StopConsumer

Bases: Exception

Exception to raise to stop a consumer.

Source code in src/prefect/server/utilities/messaging/__init__.py
108
109
110
111
112
113
114
class StopConsumer(Exception):
    """
    Raised to make a consumer stop.

    Attributes:
        ack: flag stored on the exception, False unless given. Presumably
            tells the consumer whether the current message should still be
            acknowledged before stopping (confirm against the broker
            implementations).
    """

    def __init__(self, ack: bool = False) -> None:
        # Only the flag is recorded; no message text is attached.
        self.ack = ack

create_cache()

Creates a new cache with the application's default settings. Returns: a new Cache instance.

Source code in src/prefect/server/utilities/messaging/__init__.py
134
135
136
137
138
139
140
141
142
def create_cache() -> Cache:
    """
    Builds a cache from the configured cache backend.

    The module path is read from the `PREFECT_MESSAGING_CACHE` setting, the
    module is imported, and its `Cache` is instantiated with no arguments.

    Returns:
        a new Cache instance
    """
    module_path = PREFECT_MESSAGING_CACHE.value()
    cache_module = importlib.import_module(module_path)
    # Sanity check that the imported module has the shape of a cache module.
    assert isinstance(cache_module, CacheModule)
    return cache_module.Cache()

create_consumer(topic, **kwargs)

Creates a new consumer with the application's default settings. Args: topic: the topic to consume from. Returns: a new Consumer instance.

Source code in src/prefect/server/utilities/messaging/__init__.py
185
186
187
188
189
190
191
192
193
194
195
def create_consumer(topic: str, **kwargs) -> Consumer:
    """
    Builds a consumer for `topic` from the configured message broker.

    The broker module path is read from the `PREFECT_MESSAGING_BROKER` setting;
    that module is imported and its `Consumer` is called with the topic and
    any extra keyword arguments.

    Args:
        topic: the topic to consume from
        **kwargs: passed through unchanged to the broker's `Consumer`
    Returns:
        a new Consumer instance
    """
    broker_path = PREFECT_MESSAGING_BROKER.value()
    broker = importlib.import_module(broker_path)
    # Sanity check that the imported module has the shape of a broker module.
    assert isinstance(broker, BrokerModule)
    consumer = broker.Consumer(topic, **kwargs)
    return consumer

create_publisher(topic, cache=None, deduplicate_by=None)

Creates a new publisher with the application's default settings. Args: topic: the topic to publish to. Returns: a new Publisher instance.

Source code in src/prefect/server/utilities/messaging/__init__.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def create_publisher(
    topic: str, cache: Optional[Cache] = None, deduplicate_by: Optional[str] = None
) -> Publisher:
    """
    Creates a new publisher with the application's default settings.

    Args:
        topic: the topic to publish to
        cache: the cache handed to the publisher; when None, a new one is
            built with `create_cache()`
        deduplicate_by: forwarded unchanged to the broker's `Publisher` as the
            `deduplicate_by` keyword argument (presumably names what messages
            are deduplicated on; confirm against the broker implementation)
    Returns:
        a new Publisher instance
    """
    # Compare against None explicitly: a cache supplied by the caller must be
    # used even if that object happens to evaluate as falsy.
    if cache is None:
        cache = create_cache()

    module = importlib.import_module(PREFECT_MESSAGING_BROKER.value())
    # Sanity check that the imported module has the shape of a broker module.
    assert isinstance(module, BrokerModule)
    return module.Publisher(topic, cache, deduplicate_by=deduplicate_by)

ephemeral_subscription(topic) async

Creates an ephemeral subscription to the given topic, removing it when the context exits.

Source code in src/prefect/server/utilities/messaging/__init__.py
173
174
175
176
177
178
179
180
181
182
@asynccontextmanager
async def ephemeral_subscription(topic: str) -> AsyncGenerator[Dict[str, Any], None]:
    """
    Creates an ephemeral subscription to the given topic, removing it when the
    context exits.

    Delegates to the `ephemeral_subscription` context manager of the broker
    module named by the `PREFECT_MESSAGING_BROKER` setting and yields whatever
    that context manager produces (presumably keyword arguments for creating
    a consumer; confirm against the broker implementation).
    """
    broker = importlib.import_module(PREFECT_MESSAGING_BROKER.value())
    # Sanity check that the imported module has the shape of a broker module.
    assert isinstance(broker, BrokerModule)
    subscription = broker.ephemeral_subscription(topic)
    async with subscription as consumer_kwargs:
        yield consumer_kwargs