Skip to content

prefect.server.events.messaging

EventPublisher

Bases: Publisher

Source code in src/prefect/server/events/messaging.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class EventPublisher(Publisher):
    """
    A Publisher that wraps another Publisher and adds `publish_event`, which
    serializes a ReceivedEvent to JSON and publishes it through the wrapped
    publisher.
    """

    # The underlying publisher every operation is delegated to.
    _publisher: Publisher

    def __init__(self, publisher: Publisher = None):
        """
        Args:
            publisher: the publisher to delegate to
        """
        # NOTE(review): the default of None is stored as-is; every other method
        # dereferences `self._publisher`, so a real publisher appears to be
        # required in practice -- confirm against callers.
        self._publisher = publisher

    async def __aenter__(self):
        """Enters the wrapped publisher's context and returns this wrapper."""
        await self._publisher.__aenter__()
        return self

    async def __aexit__(self, *args):
        """
        Exits the wrapped publisher's context, forwarding the exception details.

        The wrapped publisher's return value is not propagated, so this wrapper
        itself never suppresses an exception.
        """
        await self._publisher.__aexit__(*args)

    async def publish_data(self, data: bytes, attributes: dict):
        """
        Publishes raw data via the wrapped publisher

        Args:
            data: the payload to publish
            attributes: the attributes to publish alongside the payload
        """
        await self._publisher.publish_data(data, attributes)

    async def publish_event(self, event: ReceivedEvent):
        """
        Publishes the given event

        Events whose JSON encoding is larger than
        PREFECT_EVENTS_MAXIMUM_SIZE_BYTES are logged and dropped rather than
        published.

        Args:
            event: the event to publish
        """
        encoded = event.model_dump_json().encode()
        size = len(encoded)
        if size > PREFECT_EVENTS_MAXIMUM_SIZE_BYTES.value():
            # The size is passed as a lazy %-argument so that the "%s"
            # placeholder in the message is actually filled in.
            logger.warning(
                "Refusing to publish event of size %s",
                size,
                extra={
                    "event_id": str(event.id),
                    # Only the first 100 characters of the event name are logged
                    "event": event.event[:100],
                    "length": size,
                },
            )
            return

        logger.debug(
            "Publishing event: %s with id: %s for resource: %s",
            event.event,
            event.id,
            event.resource.get("prefect.resource.id"),
        )
        await self.publish_data(
            encoded,
            {
                "id": str(event.id),
                "event": event.event,
            },
        )

publish_event(event) async

Publishes the given event

Parameters:

Name Type Description Default
event ReceivedEvent

the event to publish

required
Source code in src/prefect/server/events/messaging.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
async def publish_event(self, event: ReceivedEvent):
    """
    Publishes the given event

    Events whose JSON encoding is larger than
    PREFECT_EVENTS_MAXIMUM_SIZE_BYTES are logged and dropped rather than
    published.

    Args:
        event: the event to publish
    """
    encoded = event.model_dump_json().encode()
    size = len(encoded)
    if size > PREFECT_EVENTS_MAXIMUM_SIZE_BYTES.value():
        # The size is passed as a lazy %-argument so that the "%s"
        # placeholder in the message is actually filled in.
        logger.warning(
            "Refusing to publish event of size %s",
            size,
            extra={
                "event_id": str(event.id),
                # Only the first 100 characters of the event name are logged
                "event": event.event[:100],
                "length": size,
            },
        )
        return

    logger.debug(
        "Publishing event: %s with id: %s for resource: %s",
        event.event,
        event.id,
        event.resource.get("prefect.resource.id"),
    )
    await self.publish_data(
        encoded,
        {
            "id": str(event.id),
            "event": event.event,
        },
    )

publish(events) async

Send the given events as a batch via the default publisher

Source code in src/prefect/server/events/messaging.py
11
12
13
14
15
async def publish(events: Iterable[ReceivedEvent]):
    """
    Publish every event in `events`, in iteration order, through a single
    publisher obtained from `create_event_publisher()`.

    Args:
        events: the events to publish
    """
    publisher_context = create_event_publisher()
    # One publisher context is held open for the whole batch.
    async with publisher_context as event_publisher:
        for received in events:
            await event_publisher.publish_event(received)