Skip to content

prefect.server.events.clients

AssertingEventsClient

Bases: EventsClient

An implementation of the Prefect Events client that records all events sent to it for inspection during tests.

Source code in src/prefect/server/events/clients.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
class AssertingEventsClient(EventsClient):
    """An implementation of the Prefect Events client that records all events sent
    to it for inspection during tests.

    Every instance registers itself on the class (`last` and `all`) when it is
    constructed and starts capturing events once it is entered as an async
    context manager.  The `assert_*` classmethods inspect the events captured by
    every registered instance.
    """

    # The most recently constructed client, or None if there is none since `reset`
    last: ClassVar[Optional["AssertingEventsClient"]] = None
    # Every client constructed since the last `reset`, in creation order
    all: ClassVar[List["AssertingEventsClient"]] = []

    # The positional and keyword arguments this client was constructed with
    args: Tuple
    kwargs: Dict[str, Any]
    # Events captured by this client; only assigned once the client is entered
    events: List[Event]

    def __init__(self, *args, **kwargs):
        """Register this client on the class and remember its constructor arguments."""
        AssertingEventsClient.last = self
        AssertingEventsClient.all.append(self)
        self.args = args
        self.kwargs = kwargs

    @classmethod
    def reset(cls) -> None:
        """Reset all captured instances and their events.  Use this between tests."""
        cls.last = None
        cls.all = []

    async def emit(self, event: Event) -> Event:
        """Record `event` and return it unchanged.

        Raises:
            TypeError: if the client has not been entered as a context manager,
                since the `events` list only exists after `__aenter__`.
        """
        if not hasattr(self, "events"):
            raise TypeError(
                "Events may only be emitted while this client is being used as a "
                "context manager"
            )
        self.events.append(event)
        return event

    async def __aenter__(self) -> Self:
        """Start capturing with a fresh, empty `events` list and return `self`."""
        self.events = []
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the context without discarding anything that was captured."""
        # retain the events list so that assertions may be made
        return None

    @classmethod
    def emitted_events_count(cls) -> int:
        """Return the total number of events captured by all registered clients."""
        # A client that was constructed but never entered has no `events`
        # attribute yet; it contributes zero events instead of raising
        # AttributeError from inside an assertion helper.
        return sum(len(getattr(client, "events", ())) for client in cls.all)

    @classmethod
    def assert_emitted_event_count(cls, count: int) -> None:
        """Assert that the given number of events were emitted."""
        total_num_events = cls.emitted_events_count()
        assert (
            total_num_events == count
        ), f"The number of emitted events did not match the expected count: {total_num_events=} != {count=}"

    @classmethod
    def assert_emitted_event_with(
        cls,
        event: Optional[str] = None,
        resource: Optional[Dict[str, LabelValue]] = None,
        related: Optional[List[Dict[str, LabelValue]]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Assert that an event was emitted containing the given properties.

        Args:
            event: the event name the emitted event must equal; not checked
                when None.
            resource: labels validated into a `ResourceSpecification` that must
                match the emitted event's resource; not checked when empty.
            related: a list of label sets; each one must match at least one of
                the emitted event's related resources; not checked when empty.
            payload: key/value pairs that must all be equal to the emitted
                event's payload entries; not checked when empty.

        Raises:
            AssertionError: if no client was created, or if no captured event
                satisfies every supplied criterion.  The message lists, for each
                inspected event, the first criterion that did not match.
        """
        assert cls.last is not None and cls.all, "No event client was created"

        # Each client's events are walked newest-first.  Clients that were never
        # entered have no `events` attribute and are treated as having captured
        # nothing, so the failure is a readable AssertionError.
        emitted_events = [
            emitted_event
            for client in cls.all
            for emitted_event in reversed(getattr(client, "events", ()))
        ]

        resource_spec = (
            ResourceSpecification.model_validate(resource) if resource else None
        )
        related_specs = (
            [
                ResourceSpecification.model_validate(related_resource)
                for related_resource in related
            ]
            if related
            else None
        )

        # (expected, received) pairs collected while scanning, used only to
        # build the failure message
        mismatch_reasons: List[Tuple[str, str]] = []

        def event_matches(emitted_event: Event) -> bool:
            """Check one event, recording the first failing criterion."""
            if event is not None and emitted_event.event != event:
                mismatch_reasons.append((f"{event=}", f"{emitted_event.event=}"))
                return False

            if resource_spec and not resource_spec.matches(emitted_event.resource):
                mismatch_reasons.append((f"{resource=}", f"{emitted_event.resource=}"))
                return False

            if related_specs:
                for related_spec in related_specs:
                    if not any(
                        related_spec.matches(related_resource)
                        for related_resource in emitted_event.related
                    ):
                        mismatch_reasons.append(
                            (f"{related=}", f"{emitted_event.related=}")
                        )
                        return False

            if payload and any(
                emitted_event.payload.get(k) != v for k, v in payload.items()
            ):
                mismatch_reasons.append((f"{payload=}", f"{emitted_event.payload=}"))
                return False

            return True

        # chr(10) stands in for a newline because backslashes are not allowed
        # inside f-string expressions on older Python versions
        assert any(
            event_matches(emitted_event) for emitted_event in emitted_events
        ), f"""An event was not emitted matching the following criteria:
    {event=}
    {resource=}
    {related=}
    {payload=}

# of captured events: {len(emitted_events)}
{chr(10).join(dedent(f'''
                    Expected:
                        {expected}
                    Received:
                        {received}
                ''') for expected, received in mismatch_reasons)}
"""

    @classmethod
    def assert_no_emitted_event_with(
        cls,
        event: Optional[str] = None,
        resource: Optional[Dict[str, LabelValue]] = None,
        related: Optional[List[Dict[str, LabelValue]]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Assert that no event was emitted containing the given properties.

        Takes the same criteria as `assert_emitted_event_with` and succeeds
        exactly when that assertion fails.
        """
        try:
            cls.assert_emitted_event_with(event, resource, related, payload)
        except AssertionError:
            # Nothing matched (or no client exists), which is the desired outcome
            return
        else:
            assert False, "An event was emitted matching the given criteria"

assert_emitted_event_count(count) classmethod

Assert that the given number of events were emitted.

Source code in src/prefect/server/events/clients.py
 99
100
101
102
103
104
105
@classmethod
def assert_emitted_event_count(cls, count: int) -> None:
    """Check that exactly `count` events were captured.

    The total is taken from `cls.emitted_events_count()`; on a mismatch an
    AssertionError reporting both numbers is raised.
    """
    # The local name is echoed verbatim in the message by the `=` specifier
    total_num_events = cls.emitted_events_count()
    assert total_num_events == count, (
        "The number of emitted events did not match the expected count: "
        f"{total_num_events=} != {count=}"
    )

assert_emitted_event_with(event=None, resource=None, related=None, payload=None) classmethod

Assert that an event was emitted containing the given properties.

Source code in src/prefect/server/events/clients.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
    @classmethod
    def assert_emitted_event_with(
        cls,
        event: Optional[str] = None,
        resource: Optional[Dict[str, LabelValue]] = None,
        related: Optional[List[Dict[str, LabelValue]]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ):
        """Assert that an event was emitted containing the given properties."""
        assert cls.last is not None and cls.all, "No event client was created"

        emitted_events = [
            event for client in cls.all for event in reversed(client.events)
        ]

        resource_spec = (
            ResourceSpecification.model_validate(resource) if resource else None
        )
        related_specs = (
            [
                ResourceSpecification.model_validate(related_resource)
                for related_resource in related
            ]
            if related
            else None
        )

        mismatch_reasons: List[Tuple[str, str]] = []

        def event_matches(emitted_event: Event) -> bool:
            if event is not None and emitted_event.event != event:
                mismatch_reasons.append((f"{event=}", f"{emitted_event.event=}"))
                return False

            if resource_spec and not resource_spec.matches(emitted_event.resource):
                mismatch_reasons.append((f"{resource=}", f"{emitted_event.resource=}"))
                return False

            if related_specs:
                for related_spec in related_specs:
                    if not any(
                        related_spec.matches(related_resource)
                        for related_resource in emitted_event.related
                    ):
                        mismatch_reasons.append(
                            (f"{related=}", f"{emitted_event.related=}")
                        )
                        return False

            if payload and any(
                emitted_event.payload.get(k) != v for k, v in payload.items()
            ):
                mismatch_reasons.append((f"{payload=}", f"{emitted_event.payload=}"))
                return False

            return True

        assert any(
            event_matches(emitted_event) for emitted_event in emitted_events
        ), f"""An event was not emitted matching the following criteria:
    {event=}
    {resource=}
    {related=}
    {payload=}

# of captured events: {len(emitted_events)}
{chr(10).join(dedent(f'''
                    Expected:
                        {expected}
                    Received:
                        {received}
                ''') for expected, received in mismatch_reasons)}
"""

reset() classmethod

Reset all captured instances and their events. Use this between tests.

Source code in src/prefect/server/events/clients.py
67
68
69
70
71
@classmethod
def reset(cls) -> None:
    """Forget every captured client instance along with its events.

    Call this between tests so that captures from one test are not visible
    to the next.
    """
    # Rebind both class attributes: no "last" client and a fresh, empty list
    cls.last, cls.all = None, []

EventsClient

Bases: ABC

The abstract interface for a Prefect Events client

Source code in src/prefect/server/events/clients.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class EventsClient(abc.ABC):
    """Abstract base class describing what a Prefect Events client must provide.

    Concrete clients implement `emit`.  The async context manager hooks defined
    here do nothing, so subclasses only override them when they need to.
    """

    @abc.abstractmethod
    async def emit(self, event: Event) -> Optional[Event]:
        """Emit `event`; implementations may return an event or None."""

    async def __aenter__(self) -> Self:
        """Enter the client's context; the base implementation hands back `self`."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the client's context; the base implementation has nothing to do."""
        # Falling through with None means an in-flight exception is not suppressed
        return

NullEventsClient

Bases: EventsClient

A no-op implementation of the Prefect Events client for testing

Source code in src/prefect/server/events/clients.py
43
44
45
46
47
class NullEventsClient(EventsClient):
    """An Events client that discards everything it is given; intended for testing."""

    async def emit(self, event: Event) -> None:
        """Accept `event` and do nothing with it."""
        return None

PrefectServerEventsAPIClient

Source code in src/prefect/server/events/clients.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
class PrefectServerEventsAPIClient:
    """A client for the server's automations endpoints that talks to the API
    application in-process through an ASGI transport.

    Use it as an async context manager; entering and exiting are delegated to
    the underlying HTTP client.
    """

    # The HTTP client every request is sent through
    _http_client: PrefectHttpxAsyncClient

    def __init__(self, additional_headers: Optional[Dict[str, str]] = None):
        """Build the HTTP client bound to the server's API application.

        Args:
            additional_headers: headers to send with every request.  The mapping
                is copied, never mutated; None (the default) means no extra
                headers.
        """
        from prefect.server.api.server import create_app

        # create_app caches application instances, and invoking it with no arguments
        # will point it to the currently running server instance
        api_app = create_app()

        self._http_client = PrefectHttpxAsyncClient(
            transport=httpx.ASGITransport(app=api_app, raise_app_exceptions=False),
            # A None default replaces the former shared `{}` default argument
            headers={**(additional_headers or {})},
            base_url="http://prefect-in-memory/api",
            enable_csrf_support=False,
            raise_on_all_errors=False,
        )

    async def __aenter__(self) -> Self:
        """Enter the underlying HTTP client's context and return this client."""
        await self._http_client.__aenter__()
        return self

    async def __aexit__(self, *args) -> None:
        """Exit the underlying HTTP client's context with the same exception info."""
        await self._http_client.__aexit__(*args)

    async def pause_automation(self, automation_id: UUID) -> httpx.Response:
        """PATCH the automation with `enabled` set to False and return the response."""
        return await self._http_client.patch(
            f"/automations/{automation_id}", json={"enabled": False}
        )

    async def resume_automation(self, automation_id: UUID) -> httpx.Response:
        """PATCH the automation with `enabled` set to True and return the response."""
        return await self._http_client.patch(
            f"/automations/{automation_id}", json={"enabled": True}
        )

__init__(additional_headers={})

Source code in src/prefect/server/events/clients.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def __init__(self, additional_headers: Optional[Dict[str, str]] = None):
    """Build the HTTP client bound to the server's API application.

    Args:
        additional_headers: headers to send with every request.  The mapping
            is copied, never mutated; None (the default) means no extra
            headers.
    """
    from prefect.server.api.server import create_app

    # create_app caches application instances, and invoking it with no arguments
    # will point it to the currently running server instance
    api_app = create_app()

    self._http_client = PrefectHttpxAsyncClient(
        transport=httpx.ASGITransport(app=api_app, raise_app_exceptions=False),
        # A None default replaces the former shared `{}` default argument
        headers={**(additional_headers or {})},
        base_url="http://prefect-in-memory/api",
        enable_csrf_support=False,
        raise_on_all_errors=False,
    )