Skip to content

prefect.server.events.schemas.events

Event

Bases: PrefectBaseModel

The client-side view of an event that has happened to a Resource

Source code in src/prefect/server/events/schemas/events.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
class Event(PrefectBaseModel):
    """An event that has happened to a Resource, as reported by the client"""

    occurred: DateTime = Field(
        description="When the event happened from the sender's perspective",
    )
    event: str = Field(description="The name of the event that happened")
    resource: Resource = Field(
        description="The primary Resource this event concerns",
    )
    related: List[RelatedResource] = Field(
        default_factory=list,
        description="A list of additional Resources involved in this event",
    )
    payload: Dict[str, Any] = Field(
        default_factory=dict,
        description="An open-ended set of data describing what happened",
    )
    id: UUID = Field(description="The client-provided identifier of this event")
    follows: Optional[UUID] = Field(
        default=None,
        description=(
            "The ID of an event that is known to have occurred prior to this one. "
            "If set, this may be used to establish a more precise ordering of causally-"
            "related events when they occur close enough together in time that the "
            "system may receive them out-of-order."
        ),
    )

    @property
    def involved_resources(self) -> Sequence[Resource]:
        """A new list holding the primary resource followed by every related one"""
        return [self.resource, *self.related]

    @property
    def resource_in_role(self) -> Mapping[str, RelatedResource]:
        """Maps each role to the first related resource that carries that role"""
        by_role: Dict[str, RelatedResource] = {}
        # Walking the list backwards lets entries that appear earlier in
        # `related` overwrite later ones, so the first one per role wins.
        for candidate in reversed(self.related):
            by_role[candidate.role] = candidate
        return by_role

    @property
    def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]:
        """Maps each role to all related resources carrying that role, in the
        order they appear in `related`"""
        grouped: Dict[str, List[RelatedResource]] = defaultdict(list)
        for candidate in self.related:
            grouped[candidate.role].append(candidate)
        return grouped

    @field_validator("related")
    @classmethod
    def enforce_maximum_related_resources(cls, value: List[RelatedResource]):
        """Rejects events carrying more related resources than the configured
        PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES setting allows"""
        limit = PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()
        if len(value) <= limit:
            return value

        raise ValueError(f"The maximum number of related resources is {limit}")

    def receive(self, received: Optional[pendulum.DateTime] = None) -> "ReceivedEvent":
        """Builds a ReceivedEvent from this event's dumped fields.  When `received`
        is given it is passed along as the received time; otherwise that field is
        left for ReceivedEvent to fill in."""
        fields = self.model_dump()
        if received is None:
            return ReceivedEvent(**fields)
        return ReceivedEvent(**{**fields, "received": received})

    def find_resource_label(self, label: str) -> Optional[str]:
        """Looks up a label on this event's primary resource or, when the label has
        the form `related:<role>:<label>`, on the first related resource with that
        role.  If no related resource has the role, the full label is looked up on
        the primary resource instead."""
        # Everything after the last colon is the label to read from the related
        # resource; what precedes it should be `related:<role>`.
        prefix, _, related_label = label.rpartition(":")
        directive, _, role = prefix.partition(":")
        if directive != "related":
            return self.resource.get(label)

        holder = next((r for r in self.related if r.role == role), None)
        if holder is None:
            return self.resource.get(label)
        return holder.get(related_label)

resource_in_role: Mapping[str, RelatedResource] property

Returns a mapping of roles to the first related resource in that role

resources_in_role: Mapping[str, Sequence[RelatedResource]] property

Returns a mapping of roles to related resources in that role

find_resource_label(label)

Finds the value of the given label in this event's resource or one of its related resources. If the label starts with related:<role>:, search for the first matching label in a related resource with that role.

Source code in src/prefect/server/events/schemas/events.py
166
167
168
169
170
171
172
173
174
175
176
def find_resource_label(self, label: str) -> Optional[str]:
    """Looks up a label on this event's primary resource or, when the label has
    the form `related:<role>:<label>`, on the first related resource with that
    role.  If no related resource has the role, the full label is looked up on
    the primary resource instead."""
    # Everything after the last colon is the label to read from the related
    # resource; what precedes it should be `related:<role>`.
    prefix, _, related_label = label.rpartition(":")
    directive, _, role = prefix.partition(":")
    if directive != "related":
        return self.resource.get(label)

    holder = next((r for r in self.related if r.role == role), None)
    if holder is None:
        return self.resource.get(label)
    return holder.get(related_label)

EventCount

Bases: PrefectBaseModel

The count of events with the given filter value

Source code in src/prefect/server/events/schemas/events.py
324
325
326
327
328
329
330
331
332
333
class EventCount(PrefectBaseModel):
    """A tally of the events that share a given filter value"""

    # Every field is required: none of them declares a default.
    value: str = Field(description="The value to use for filtering")
    label: str = Field(description="The value to display for this count")
    count: int = Field(description="The count of matching events")
    start_time: DateTime = Field(
        description="The start time of this group of events",
    )
    end_time: DateTime = Field(
        description="The end time of this group of events",
    )

EventPage

Bases: PrefectBaseModel

A single page of events returned from the API, with an optional link to the next page of results

Source code in src/prefect/server/events/schemas/events.py
311
312
313
314
315
316
317
318
319
320
321
class EventPage(PrefectBaseModel):
    """One page of events returned from the API, together with an optional link
    to the page of results that follows it"""

    events: List[ReceivedEvent] = Field(description="The Events matching the query")
    total: int = Field(description="The total number of matching Events")
    # The explicit `...` keeps this field required even though it may be None.
    next_page: Optional[AnyHttpUrl] = Field(
        ...,
        description="The URL for the next page of results, if there are more",
    )

ReceivedEvent

Bases: Event

The server-side view of an event that has happened to a Resource after it has been received by the server

Source code in src/prefect/server/events/schemas/events.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class ReceivedEvent(Event):
    """An event that has happened to a Resource, as seen by the server once it
    has been received"""

    model_config = ConfigDict(extra="ignore", from_attributes=True)

    received: DateTime = Field(
        default_factory=lambda: pendulum.now("UTC"),
        description="When the event was received by Prefect Cloud",
    )

    def as_database_row(self) -> Dict[str, Any]:
        """Returns this event's dumped fields plus `resource_id`, a `recorded`
        timestamp taken at call time, and the list of `related_resource_ids`"""
        row = self.model_dump()
        row.update(
            resource_id=self.resource.id,
            recorded=pendulum.now("UTC"),
            related_resource_ids=[related.id for related in self.related],
        )
        return row

    def as_database_resource_rows(self) -> List[Dict[str, Any]]:
        """Returns one row per involved resource (the primary resource first, then
        each related resource), with the id and role labels stripped from the
        `resource` labels because they are carried in their own columns"""
        rows: List[Dict[str, Any]] = []
        for resource in [self.resource, *self.related]:
            # Work on a copy so the resource's own labels are left untouched.
            labels: Dict[str, str] = resource.root.copy()
            for reserved in ("prefect.resource.id", "prefect.resource.role"):
                labels.pop(reserved, None)

            # Only related resources have a role; the primary one gets "".
            if isinstance(resource, RelatedResource):
                role = resource.role
            else:
                role = ""

            rows.append(
                {
                    "occurred": self.occurred,
                    "resource_id": resource.id,
                    "resource_role": role,
                    "resource": labels,
                    "event_id": self.id,
                }
            )
        return rows

RelatedResource

Bases: Resource

A Resource with a specific role in an Event

Source code in src/prefect/server/events/schemas/events.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class RelatedResource(Resource):
    """A Resource that plays a specific role in an Event"""

    @model_validator(mode="after")
    def requires_resource_role(self) -> Self:
        """Rejects related resources whose role label is missing or empty"""
        labels = self.root
        if "prefect.resource.role" not in labels:
            raise ValueError(
                "Related Resources must include the prefect.resource.role label"
            )
        elif not labels["prefect.resource.role"]:
            raise ValueError("The prefect.resource.role label must be non-empty")

        return self

    @property
    def role(self) -> str:
        """The value of this resource's `prefect.resource.role` label"""
        return self["prefect.resource.role"]

Resource

Bases: Labelled

An observable business object of interest to the user

Source code in src/prefect/server/events/schemas/events.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class Resource(Labelled):
    """An observable business object of interest to the user"""

    @model_validator(mode="after")
    def enforce_maximum_labels(self) -> Self:
        """Rejects resources carrying more labels than the configured
        PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE setting allows"""
        limit = PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()
        if len(self.root) > limit:
            raise ValueError(f"The maximum number of labels per resource is {limit}")

        return self

    @model_validator(mode="after")
    def requires_resource_id(self) -> Self:
        """Rejects resources whose id label is missing or empty"""
        labels = self.root
        if "prefect.resource.id" not in labels:
            raise ValueError("Resources must include the prefect.resource.id label")
        elif not labels["prefect.resource.id"]:
            raise ValueError("The prefect.resource.id label must be non-empty")

        return self

    @property
    def id(self) -> str:
        """The value of this resource's `prefect.resource.id` label"""
        return self["prefect.resource.id"]

    @property
    def name(self) -> Optional[str]:
        """The value of the `prefect.resource.name` label, if there is one"""
        return self.get("prefect.resource.name")

    def prefect_object_id(self, kind: str) -> UUID:
        """Parses the UUID out of this resource's ID, provided the ID starts with
        the expected kind of prefect resource (e.g. `prefect.flow-run`).  Raises
        ValueError when the ID does not start with that kind."""
        # Accept the kind with or without its trailing dot.
        prefix = kind if kind.endswith(".") else f"{kind}."

        resource_id = self.id
        if not resource_id.startswith(prefix):
            raise ValueError(f"Resource ID {resource_id} does not start with {prefix}")

        return UUID(resource_id[len(prefix) :])

prefect_object_id(kind)

Extracts the UUID from an event's resource ID if it's the expected kind of prefect resource

Source code in src/prefect/server/events/schemas/events.py
69
70
71
72
73
74
75
76
77
def prefect_object_id(self, kind: str) -> UUID:
    """Parses the UUID out of this resource's ID, provided the ID starts with
    the expected kind of prefect resource (e.g. `prefect.flow-run`).  Raises
    ValueError when the ID does not start with that kind."""
    # Accept the kind with or without its trailing dot.
    prefix = kind if kind.endswith(".") else f"{kind}."

    resource_id = self.id
    if not resource_id.startswith(prefix):
        raise ValueError(f"Resource ID {resource_id} does not start with {prefix}")

    return UUID(resource_id[len(prefix) :])

matches(expected, value)

Returns true if the given value matches the expected string, which may include a negation prefix ("!this-value") or a wildcard suffix ("any-value-starting-with*")

Source code in src/prefect/server/events/schemas/events.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
def matches(expected: str, value: Optional[str]) -> bool:
    """Returns true if the given value matches the expected string, which may
    include a negation prefix ("!this-value") or a wildcard suffix
    ("any-value-starting-with*").  A value of None never matches, even when
    the expectation is negated."""
    if value is None:
        return False

    # A leading "!" inverts the outcome of the comparison below.
    negated = expected.startswith("!")
    pattern = expected[1:] if negated else expected

    # A trailing "*" turns the comparison into a prefix check.
    if pattern.endswith("*"):
        hit = value.startswith(pattern[:-1])
    else:
        hit = value == pattern

    return hit != negated