Skip to content

prefect.events.schemas.events

Event

Bases: PrefectBaseModel

The client-side view of an event that has happened to a Resource

Source code in src/prefect/events/schemas/events.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class Event(PrefectBaseModel):
    """The client-side view of an event that has happened to a Resource

    In addition to the declared fields, the model exposes helpers for looking
    at the resources attached to the event (`involved_resources`,
    `resource_in_role`, `resources_in_role`, `find_resource_label`) and checks
    the length of `related` against PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.
    """

    model_config = ConfigDict(extra="ignore")

    occurred: DateTime = Field(
        default_factory=lambda: pendulum.now("UTC"),
        description="When the event happened from the sender's perspective",
    )
    event: str = Field(
        description="The name of the event that happened",
    )
    resource: Resource = Field(
        description="The primary Resource this event concerns",
    )
    related: List[RelatedResource] = Field(
        default_factory=list,
        description="A list of additional Resources involved in this event",
    )
    payload: Dict[str, Any] = Field(
        default_factory=dict,
        description="An open-ended set of data describing what happened",
    )
    id: UUID = Field(
        default_factory=uuid4,
        description="The client-provided identifier of this event",
    )
    follows: Optional[UUID] = Field(
        default=None,
        description=(
            "The ID of an event that is known to have occurred prior to this one. "
            "If set, this may be used to establish a more precise ordering of causally-"
            "related events when they occur close enough together in time that the "
            "system may receive them out-of-order."
        ),
    )

    @property
    def involved_resources(self) -> Sequence[Resource]:
        """Returns the primary resource followed by every related resource"""
        return [self.resource, *self.related]

    @property
    def resource_in_role(self) -> Mapping[str, RelatedResource]:
        """Returns a mapping of roles to the first related resource in that role"""
        by_role: Dict[str, RelatedResource] = {}
        # Walk the list back-to-front so that, for a role that appears more than
        # once, the earliest entry is written last and therefore wins.
        for candidate in reversed(self.related):
            by_role[candidate.role] = candidate
        return by_role

    @property
    def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]:
        """Returns a mapping of roles to all related resources in that role,
        keeping the order in which they appear in `related`"""
        grouped: Dict[str, List[RelatedResource]] = defaultdict(list)
        for entry in self.related:
            grouped[entry.role].append(entry)
        return grouped

    @field_validator("related")
    @classmethod
    def enforce_maximum_related_resources(cls, value: List[RelatedResource]):
        """Rejects a `related` list longer than the limit read from
        PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES; otherwise returns it unchanged"""
        if len(value) <= PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value():
            return value

        raise ValueError(
            "The maximum number of related resources "
            f"is {PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()}"
        )

    def find_resource_label(self, label: str) -> Optional[str]:
        """Finds the value of the given label in this event's resource or one of its
        related resources.

        A label of the form `related:<role>:<name>` is answered by the first
        related resource whose role equals `<role>`: the result is whatever that
        resource's `get(<name>)` returns.  Any other label, or a `related:` label
        whose role matches no related resource, is looked up in full on the
        primary resource."""
        # Split off the label name at the LAST colon, then the directive at the
        # FIRST colon; whatever lies between the two is the role.
        prefix, _, label_name = label.rpartition(":")
        kind, _, wanted_role = prefix.partition(":")
        if kind == "related":
            holder = next(
                (entry for entry in self.related if entry.role == wanted_role),
                None,
            )
            if holder is not None:
                return holder.get(label_name)
        return self.resource.get(label)

resource_in_role: Mapping[str, RelatedResource] property

Returns a mapping of roles to the first related resource in that role

resources_in_role: Mapping[str, Sequence[RelatedResource]] property

Returns a mapping of roles to related resources in that role

find_resource_label(label)

Finds the value of the given label in this event's resource or one of its related resources. If the label starts with related:<role>:, search for the first matching label in a related resource with that role.

Source code in src/prefect/events/schemas/events.py
158
159
160
161
162
163
164
165
166
167
168
def find_resource_label(self, label: str) -> Optional[str]:
    """Finds the value of the given label in this event's resource or one of its
    related resources.

    A label of the form `related:<role>:<name>` is answered by the first
    related resource whose role equals `<role>`: the result is whatever that
    resource's `get(<name>)` returns.  Any other label, or a `related:` label
    whose role matches no related resource, is looked up in full on the
    primary resource."""
    # Split off the label name at the LAST colon, then the directive at the
    # FIRST colon; whatever lies between the two is the role.
    prefix, _, label_name = label.rpartition(":")
    kind, _, wanted_role = prefix.partition(":")
    if kind == "related":
        holder = next(
            (entry for entry in self.related if entry.role == wanted_role),
            None,
        )
        if holder is not None:
            return holder.get(label_name)
    return self.resource.get(label)

ReceivedEvent

Bases: Event

The server-side view of an event that has happened to a Resource after it has been received by the server

Source code in src/prefect/events/schemas/events.py
171
172
173
174
175
176
177
178
179
180
class ReceivedEvent(Event):
    """The server-side view of an event that has happened to a Resource after it has
    been received by the server

    Extends `Event` with a single additional field, `received`."""

    # NOTE(review): presumably pydantic's `from_attributes` option, which lets the
    # model be validated from an object's attributes -- confirm against the
    # ConfigDict imported by this module.
    model_config = ConfigDict(from_attributes=True)

    # `...` is passed as the default, so no fallback value is declared for this
    # field (in pydantic this marks the field as required).
    received: DateTime = Field(
        ...,
        description="When the event was received by Prefect Cloud",
    )

RelatedResource

Bases: Resource

A Resource with a specific role in an Event

Source code in src/prefect/events/schemas/events.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class RelatedResource(Resource):
    """A Resource with a specific role in an Event

    On top of what `Resource` checks, a related resource must carry a
    non-empty `prefect.resource.role` label, exposed through `role`.
    """

    @model_validator(mode="after")
    def requires_resource_role(self) -> Self:
        """Rejects label sets where `prefect.resource.role` is absent or falsy"""
        labels = self.root

        if "prefect.resource.role" not in labels:
            raise ValueError(
                "Related Resources must include the prefect.resource.role label"
            )

        role_value = labels["prefect.resource.role"]
        if not role_value:
            raise ValueError("The prefect.resource.role label must be non-empty")

        return self

    @property
    def role(self) -> str:
        """The value of this resource's `prefect.resource.role` label"""
        return self["prefect.resource.role"]

Resource

Bases: Labelled

An observable business object of interest to the user

Source code in src/prefect/events/schemas/events.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class Resource(Labelled):
    """An observable business object of interest to the user

    A resource is a set of labels (held in `root`).  Validation requires a
    non-empty `prefect.resource.id` label and caps the number of labels at
    the value read from PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.
    """

    @model_validator(mode="after")
    def enforce_maximum_labels(self) -> Self:
        """Rejects resources carrying more labels than the configured limit"""
        label_count = len(self.root)
        if label_count > PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value():
            raise ValueError(
                "The maximum number of labels per resource "
                f"is {PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()}"
            )

        return self

    @model_validator(mode="after")
    def requires_resource_id(self) -> Self:
        """Rejects label sets where `prefect.resource.id` is absent or falsy"""
        labels = self.root

        if "prefect.resource.id" not in labels:
            raise ValueError("Resources must include the prefect.resource.id label")

        id_value = labels["prefect.resource.id"]
        if not id_value:
            raise ValueError("The prefect.resource.id label must be non-empty")

        return self

    @property
    def id(self) -> str:
        """The value of this resource's `prefect.resource.id` label"""
        return self["prefect.resource.id"]

    @property
    def name(self) -> Optional[str]:
        """The result of looking up the `prefect.resource.name` label via `get`"""
        return self.get("prefect.resource.name")

    def prefect_object_id(self, kind: str) -> UUID:
        """Extracts the UUID from an event's resource ID if it's the expected kind
        of prefect resource

        `kind` is the expected ID prefix; a trailing "." is appended when it is
        not already there.  Raises ValueError when the resource ID does not
        start with that prefix; the remainder of the ID is handed to `UUID`.
        """
        prefix = kind if kind.endswith(".") else f"{kind}."

        if self.id.startswith(prefix):
            remainder = self.id[len(prefix) :]
            return UUID(remainder)

        raise ValueError(f"Resource ID {self.id} does not start with {prefix}")

prefect_object_id(kind)

Extracts the UUID from an event's resource ID if it's the expected kind of prefect resource

Source code in src/prefect/events/schemas/events.py
63
64
65
66
67
68
69
70
71
def prefect_object_id(self, kind: str) -> UUID:
    """Extracts the UUID from an event's resource ID if it's the expected kind
    of prefect resource

    `kind` is the expected ID prefix; a trailing "." is appended when it is
    not already there.  Raises ValueError when the resource ID does not
    start with that prefix; the remainder of the ID is handed to `UUID`.
    """
    prefix = kind if kind.endswith(".") else f"{kind}."

    if self.id.startswith(prefix):
        remainder = self.id[len(prefix) :]
        return UUID(remainder)

    raise ValueError(f"Resource ID {self.id} does not start with {prefix}")

matches(expected, value)

Returns true if the given value matches the expected string, which may include a negation prefix ("!this-value") or a wildcard suffix ("any-value-starting-with*")

Source code in src/prefect/events/schemas/events.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
def matches(expected: str, value: Optional[str]) -> bool:
    """Returns true if the given value matches the expected string, which may
    include a negation prefix ("!this-value") or a wildcard suffix
    ("any-value-starting-with*").

    A value of None never matches, not even a negated expectation."""
    if value is None:
        return False

    # A leading "!" inverts the outcome; strip it before comparing.
    negated = expected.startswith("!")
    pattern = expected[1:] if negated else expected

    # A trailing "*" turns the comparison into a prefix test.
    if pattern.endswith("*"):
        hit = value.startswith(pattern[:-1])
    else:
        hit = pattern == value

    return hit != negated