Skip to content

prefect.events

TriggerTypes: TypeAlias = Union[EventTrigger, MetricTrigger, CompoundTrigger, SequenceTrigger] module-attribute

The union of all concrete trigger types that a user may actually create

Action

Bases: PrefectBaseModel, ABC

An Action that may be performed when an Automation is triggered

Source code in src/prefect/events/actions.py
12
13
14
15
16
17
18
19
class Action(PrefectBaseModel, abc.ABC):
    """Abstract base model for something an Automation may do once triggered."""

    # String identifier of the action; rendered for humans by describe_for_cli
    type: str

    def describe_for_cli(self) -> str:
        """Return `type` as a readable phrase: hyphens become spaces and the
        result is capitalized"""
        spaced = self.type.replace("-", " ")
        return spaced.capitalize()

describe_for_cli()

A human-readable description of the action

Source code in src/prefect/events/actions.py
17
18
19
def describe_for_cli(self) -> str:
    """Return the action's `type` as a readable phrase for the CLI.

    Hyphens are replaced with spaces and the result is capitalized, so
    `"do-nothing"` is rendered as `"Do nothing"`.
    """
    spaced = self.type.replace("-", " ")
    return spaced.capitalize()

AutomationCore

Bases: PrefectBaseModel

Defines an action a user wants to take when a certain number of events do or don't happen to the matching resources

Source code in src/prefect/events/schemas/automations.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
class AutomationCore(PrefectBaseModel, extra="ignore"):  # type: ignore[call-arg]
    """Defines an action a user wants to take when a certain number of events
    do or don't happen to the matching resources

    The model is declared with `extra="ignore"`.  `name`, `trigger` and
    `actions` have no default and must be supplied; every other field falls
    back to the default shown on its declaration.
    """

    name: str = Field(..., description="The name of this automation")
    description: str = Field("", description="A longer description of this automation")

    enabled: bool = Field(True, description="Whether this automation will be evaluated")

    # One of the concrete trigger types in the TriggerTypes union
    trigger: TriggerTypes = Field(
        ...,
        description=(
            "The criteria for which events this Automation covers and how it will "
            "respond to the presence or absence of those events"
        ),
    )

    actions: List[ActionTypes] = Field(
        ...,
        description="The actions to perform when this Automation triggers",
    )

    # The two state-specific action lists below default to empty lists
    actions_on_trigger: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a triggered state",
    )

    actions_on_resolve: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a resolving state",
    )

    owner_resource: Optional[str] = Field(
        default=None, description="The owning resource of this automation"
    )

CallWebhook

Bases: Action

Call a webhook when an Automation is triggered.

Source code in src/prefect/events/actions.py
128
129
130
131
132
133
134
135
136
137
138
class CallWebhook(Action):
    """Call a webhook when an Automation is triggered.

    The webhook is referenced by the ID of a block document; a payload string
    may optionally be supplied alongside it.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["call-webhook"] = "call-webhook"
    # No default is declared, so this field must be provided
    block_document_id: UUID = Field(
        description="The identifier of the webhook block to use"
    )
    # Falls back to the empty string when no payload is given
    payload: str = Field(
        default="",
        description="An optional templatable payload to send when calling the webhook.",
    )

CancelFlowRun

Bases: Action

Cancels a flow run associated with the trigger

Source code in src/prefect/events/actions.py
110
111
112
113
class CancelFlowRun(Action):
    """Cancels a flow run associated with the trigger

    Declares no fields of its own beyond the fixed `type` tag.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["cancel-flow-run"] = "cancel-flow-run"

ChangeFlowRunState

Bases: Action

Changes the state of a flow run associated with the trigger

Source code in src/prefect/events/actions.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class ChangeFlowRunState(Action):
    """Changes the state of a flow run associated with the trigger

    Only `state` is required; `name` and `message` default to None.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["change-flow-run-state"] = "change-flow-run-state"

    name: Optional[str] = Field(
        None,
        description="The name of the state to change the flow run to",
    )
    # Required (declared with `...`)
    state: StateType = Field(
        ...,
        description="The type of the state to change the flow run to",
    )
    message: Optional[str] = Field(
        None,
        description="An optional message to associate with the state change",
    )

CompositeTrigger

Bases: Trigger, ABC

Requires some number of triggers to have fired within the given time period.

Source code in src/prefect/events/schemas/automations.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
class CompositeTrigger(Trigger, abc.ABC):
    """
    Requires some number of triggers to have fired within the given time period.

    Abstract base holding the nested `triggers` and the optional `within`
    window; `type` is restricted to "compound" or "sequence".
    """

    type: Literal["compound", "sequence"]
    # "TriggerTypes" is written as a string (forward reference)
    triggers: List["TriggerTypes"]
    # Optional here: defaults to None when no window is given
    within: Optional[timedelta] = Field(
        None,
        description=(
            "The time period over which the events must occur.  For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

CompoundTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period

Source code in src/prefect/events/schemas/automations.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
class CompoundTrigger(CompositeTrigger):
    """Composite trigger that fires once the required number of its child
    triggers have fired within the given time period"""

    type: Literal["compound"] = "compound"
    # Either an explicit count or one of the keywords "any" / "all"
    require: Union[int, Literal["any", "all"]]

    @model_validator(mode="after")
    def validate_require(self) -> Self:
        """Check that a numeric `require` lies between 1 and the number of
        child triggers (inclusive); keyword values are accepted as-is"""
        if not isinstance(self.require, int):
            return self

        if self.require < 1:
            raise ValueError("require must be at least 1")

        if self.require > len(self.triggers):
            raise ValueError(
                "require must be less than or equal to the number of triggers"
            )

        return self

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        heading = f"{str(self.require).capitalize()} of:"
        # Each child renders itself one level deeper than this trigger
        children = "\n".join(
            child.describe_for_cli(indent=indent + 1) for child in self.triggers
        )
        return textwrap.indent("\n".join((heading, children)), prefix="  " * indent)

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI

    The first line is "<Require> of:", followed by one description per child
    trigger.  Children are asked to render at `indent + 1`, and the combined
    text is then prefixed with two spaces per `indent` level.
    """
    heading = f"{str(self.require).capitalize()} of:"
    children = "\n".join(
        child.describe_for_cli(indent=indent + 1) for child in self.triggers
    )
    return textwrap.indent("\n".join((heading, children)), prefix="  " * indent)

DeclareIncident

Bases: Action

Declares an incident for the triggering event. Only available on Prefect Cloud

Source code in src/prefect/events/actions.py
268
269
270
271
class DeclareIncident(Action):
    """Declares an incident for the triggering event.  Only available on Prefect Cloud

    Declares no fields of its own beyond the fixed `type` tag.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["declare-incident"] = "declare-incident"

DeploymentCompoundTrigger

Bases: BaseDeploymentTrigger, CompoundTrigger

A composite trigger that requires some number of triggers to have fired within the given time period

Source code in src/prefect/events/schemas/deployment_triggers.py
90
91
92
93
94
class DeploymentCompoundTrigger(BaseDeploymentTrigger, CompoundTrigger):
    """A composite trigger that requires some number of triggers to have
    fired within the given time period"""

    # Class-level attribute (ClassVar, so not a model field) pointing at the
    # CompoundTrigger class
    trigger_type: ClassVar[Type[TriggerTypes]] = CompoundTrigger

DeploymentEventTrigger

Bases: BaseDeploymentTrigger, EventTrigger

A trigger that fires based on the presence or absence of events within a given period of time.

Source code in src/prefect/events/schemas/deployment_triggers.py
73
74
75
76
77
78
79
class DeploymentEventTrigger(BaseDeploymentTrigger, EventTrigger):
    """
    A trigger that fires based on the presence or absence of events within a given
    period of time.
    """

    # Class-level attribute (ClassVar, so not a model field) pointing at the
    # EventTrigger class
    trigger_type: ClassVar[Type[TriggerTypes]] = EventTrigger

DeploymentMetricTrigger

Bases: BaseDeploymentTrigger, MetricTrigger

A trigger that fires based on the results of a metric query.

Source code in src/prefect/events/schemas/deployment_triggers.py
82
83
84
85
86
87
class DeploymentMetricTrigger(BaseDeploymentTrigger, MetricTrigger):
    """
    A trigger that fires based on the results of a metric query.
    """

    # Class-level attribute (ClassVar, so not a model field) pointing at the
    # MetricTrigger class
    trigger_type: ClassVar[Type[TriggerTypes]] = MetricTrigger

DeploymentSequenceTrigger

Bases: BaseDeploymentTrigger, SequenceTrigger

A composite trigger that requires some number of triggers to have fired within the given time period in a specific order

Source code in src/prefect/events/schemas/deployment_triggers.py
 97
 98
 99
100
101
class DeploymentSequenceTrigger(BaseDeploymentTrigger, SequenceTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    # Class-level attribute (ClassVar, so not a model field) pointing at the
    # SequenceTrigger class
    trigger_type: ClassVar[Type[TriggerTypes]] = SequenceTrigger

DoNothing

Bases: Action

Do nothing when an Automation is triggered

Source code in src/prefect/events/actions.py
22
23
24
25
class DoNothing(Action):
    """Do nothing when an Automation is triggered

    Declares no fields of its own beyond the fixed `type` tag.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["do-nothing"] = "do-nothing"

Event

Bases: PrefectBaseModel

The client-side view of an event that has happened to a Resource

Source code in src/prefect/events/schemas/events.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class Event(PrefectBaseModel):
    """An event that happened to a Resource, as seen from the client side"""

    model_config = ConfigDict(extra="ignore")

    occurred: DateTime = Field(
        default_factory=lambda: pendulum.now("UTC"),
        description="When the event happened from the sender's perspective",
    )
    event: str = Field(
        description="The name of the event that happened",
    )
    resource: Resource = Field(
        description="The primary Resource this event concerns",
    )
    related: List[RelatedResource] = Field(
        default_factory=list,
        description="A list of additional Resources involved in this event",
    )
    payload: Dict[str, Any] = Field(
        default_factory=dict,
        description="An open-ended set of data describing what happened",
    )
    id: UUID = Field(
        default_factory=uuid4,
        description="The client-provided identifier of this event",
    )
    follows: Optional[UUID] = Field(
        default=None,
        description=(
            "The ID of an event that is known to have occurred prior to this one. "
            "If set, this may be used to establish a more precise ordering of causally-"
            "related events when they occur close enough together in time that the "
            "system may receive them out-of-order."
        ),
    )

    @property
    def involved_resources(self) -> Sequence[Resource]:
        """The primary resource followed by every related resource"""
        return [self.resource, *self.related]

    @property
    def resource_in_role(self) -> Mapping[str, RelatedResource]:
        """Returns a mapping of roles to the first related resource in that role"""
        by_role: Dict[str, RelatedResource] = {}
        # Walking backwards lets earlier entries overwrite later ones, so the
        # first related resource of each role is the one that remains
        for candidate in reversed(self.related):
            by_role[candidate.role] = candidate
        return by_role

    @property
    def resources_in_role(self) -> Mapping[str, Sequence[RelatedResource]]:
        """Returns a mapping of roles to related resources in that role"""
        grouped: Dict[str, List[RelatedResource]] = defaultdict(list)
        for candidate in self.related:
            grouped[candidate.role].append(candidate)
        return grouped

    @field_validator("related")
    @classmethod
    def enforce_maximum_related_resources(cls, value: List[RelatedResource]):
        """Reject a `related` list longer than the configured maximum"""
        if len(value) <= PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value():
            return value

        raise ValueError(
            "The maximum number of related resources "
            f"is {PREFECT_EVENTS_MAXIMUM_RELATED_RESOURCES.value()}"
        )

    def find_resource_label(self, label: str) -> Optional[str]:
        """Look up a label on this event's primary resource, or on a related
        resource when the label has the form `related:<role>:<label>`.

        For the `related:` form, the first related resource with that role
        answers the lookup.  If no related resource has that role, the full
        label is looked up on the primary resource instead."""
        scope, _, related_label = label.rpartition(":")
        directive, _, role = scope.partition(":")

        if directive == "related":
            found = next(
                (candidate for candidate in self.related if candidate.role == role),
                None,
            )
            if found is not None:
                return found.get(related_label)

        return self.resource.get(label)

resource_in_role: Mapping[str, RelatedResource] property

Returns a mapping of roles to the first related resource in that role

resources_in_role: Mapping[str, Sequence[RelatedResource]] property

Returns a mapping of roles to related resources in that role

find_resource_label(label)

Finds the value of the given label in this event's resource or one of its related resources. If the label starts with related:<role>:, search for the first matching label in a related resource with that role.

Source code in src/prefect/events/schemas/events.py
158
159
160
161
162
163
164
165
166
167
168
def find_resource_label(self, label: str) -> Optional[str]:
    """Look up a label on this event's primary resource, or on a related
    resource when the label has the form `related:<role>:<label>`.

    For the `related:` form, the first related resource with that role
    answers the lookup.  If no related resource has that role, the full
    label is looked up on the primary resource instead."""
    scope, _, related_label = label.rpartition(":")
    directive, _, role = scope.partition(":")

    if directive == "related":
        found = next(
            (candidate for candidate in self.related if candidate.role == role),
            None,
        )
        if found is not None:
            return found.get(related_label)

    return self.resource.get(label)

EventTrigger

Bases: ResourceTrigger

A trigger that fires based on the presence or absence of events within a given period of time.

Source code in src/prefect/events/schemas/automations.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class EventTrigger(ResourceTrigger):
    """
    Trigger that fires depending on whether the expected events are seen (or
    are not seen) within a given period of time.
    """

    type: Literal["event"] = "event"

    after: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) which must first been seen to fire this trigger.  If "
            "empty, then fire this trigger immediately.  Events may include "
            "trailing wildcards, like `prefect.flow-run.*`"
        ),
    )
    expect: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) this trigger is expecting to see.  If empty, this "
            "trigger will match any event.  Events may include trailing wildcards, "
            "like `prefect.flow-run.*`"
        ),
    )

    for_each: Set[str] = Field(
        default_factory=set,
        description=(
            "Evaluate the trigger separately for each distinct value of these labels "
            "on the resource.  By default, labels refer to the primary resource of the "
            "triggering event.  You may also refer to labels from related "
            "resources by specifying `related:<role>:<label>`.  This will use the "
            "value of that label for the first related resource in that role.  For "
            'example, `"for_each": ["related:flow:prefect.resource.id"]` would '
            "evaluate the trigger for each flow."
        ),
    )
    posture: Literal[Posture.Reactive, Posture.Proactive] = Field(  # type: ignore[valid-type]
        default=Posture.Reactive,
        description=(
            "The posture of this trigger, either Reactive or Proactive.  Reactive "
            "triggers respond to the _presence_ of the expected events, while "
            "Proactive triggers respond to the _absence_ of those expected events."
        ),
    )
    threshold: int = Field(
        default=1,
        description=(
            "The number of events required for this trigger to fire (for "
            "Reactive triggers), or the number of events expected (for Proactive "
            "triggers)"
        ),
    )
    within: timedelta = Field(
        default=timedelta(seconds=0),
        ge=timedelta(seconds=0),
        description=(
            "The time period over which the events must occur.  For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def enforce_minimum_within_for_proactive_triggers(
        cls, data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Normalize `within` on raw input and apply the 10 second floor that
        Proactive triggers require.

        Non-dict input is passed through untouched.  An explicit `within` of
        None is rejected.  Numeric values are read as seconds.  For Proactive
        triggers a missing or zero `within` becomes 10 seconds, and anything
        shorter than 10 seconds raises a ValueError."""
        if not isinstance(data, dict):
            return data

        if "within" in data and data["within"] is None:
            raise ValueError("`within` should be a valid timedelta")

        window = data.get("within")
        if isinstance(window, (int, float)):
            window = timedelta(seconds=window)

        if data.get("posture") == Posture.Proactive:
            floor = timedelta(seconds=10.0)
            if not window or window == timedelta(0):
                window = floor
            elif window < floor:
                raise ValueError(
                    "`within` for Proactive triggers must be greater than or equal to "
                    "10 seconds"
                )

        # Only write `within` back when there is a truthy value to record
        if window:
            return data | {"within": window}
        return data

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        if self.posture == Posture.Reactive:
            summary = f"Reactive: expecting {self.threshold} of {self.expect}"
        else:
            summary = (
                f"Proactive: expecting {self.threshold} {self.expect} event "
                f"within {self.within}"
            )
        return textwrap.indent(summary, prefix="  " * indent)

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI

    Produces a single line whose wording depends on the posture, prefixed
    with two spaces per `indent` level.
    """
    if self.posture == Posture.Reactive:
        summary = f"Reactive: expecting {self.threshold} of {self.expect}"
    else:
        summary = (
            f"Proactive: expecting {self.threshold} {self.expect} event "
            f"within {self.within}"
        )
    return textwrap.indent(summary, prefix="  " * indent)

MetricTrigger

Bases: ResourceTrigger

A trigger that fires based on the results of a metric query.

Source code in src/prefect/events/schemas/automations.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
class MetricTrigger(ResourceTrigger):
    """
    Trigger whose firing depends on the results of a metric query.
    """

    type: Literal["metric"] = "metric"

    posture: Literal[Posture.Metric] = Field(  # type: ignore[valid-type]
        Posture.Metric,
        description="Periodically evaluate the configured metric query.",
    )

    metric: MetricTriggerQuery = Field(
        ...,
        description="The metric query to evaluate for this trigger. ",
    )

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        query = self.metric
        summary = (
            f"Metric: {query.name.value} {query.operator.value} "
            f"{query.threshold} for {query.range}"
        )
        return textwrap.indent(summary, prefix="  " * indent)

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
303
304
305
306
307
308
309
310
311
312
313
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI

    Produces one line of the form "Metric: <name> <operator> <threshold> for
    <range>", prefixed with two spaces per `indent` level.
    """
    query = self.metric
    summary = (
        f"Metric: {query.name.value} {query.operator.value} "
        f"{query.threshold} for {query.range}"
    )
    return textwrap.indent(summary, prefix="  " * indent)

MetricTriggerQuery

Bases: PrefectBaseModel

Defines a subset of the Trigger subclass, specific to Metric automations, that specifies the query configuration and breaching conditions for the Automation

Source code in src/prefect/events/schemas/automations.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
class MetricTriggerQuery(PrefectBaseModel):
    """Defines the part of a metric trigger that specifies the query
    configuration and the breaching conditions for the Automation

    `name`, `threshold` and `operator` are required.  `range` and
    `firing_for` both default to 300 seconds and are validated to be at
    least that long.
    """

    name: PrefectMetric = Field(
        ...,
        description="The name of the metric to query.",
    )
    threshold: float = Field(
        ...,
        description=(
            "The threshold value against which we'll compare " "the query result."
        ),
    )
    operator: MetricTriggerOperator = Field(
        ...,
        description=(
            "The comparative operator (LT / LTE / GT / GTE) used to compare "
            "the query result against the threshold value."
        ),
    )
    range: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        description=(
            "The lookback duration (seconds) for a metric query. This duration is "
            "used to determine the time range over which the query will be executed. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )
    firing_for: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        description=(
            "The duration (seconds) for which the metric query must breach "
            "or resolve continuously before the state is updated and the "
            "automation is triggered. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )

    # Explicit @classmethod: the validator takes `cls`, and the other
    # `cls`-taking validators on these models are declared the same way.
    @field_validator("range", "firing_for")
    @classmethod
    def enforce_minimum_range(cls, value: timedelta) -> timedelta:
        """Reject a `range` or `firing_for` shorter than 300 seconds.

        Returns the value unchanged when it is long enough; raises ValueError
        otherwise (the same message is used for both fields)."""
        if value < timedelta(seconds=300):
            raise ValueError("The minimum range is 300 seconds (5 minutes)")
        return value

PauseAutomation

Bases: AutomationAction

Pauses an Automation

Source code in src/prefect/events/actions.py
256
257
258
259
class PauseAutomation(AutomationAction):
    """Pauses an Automation"""

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["pause-automation"] = "pause-automation"

PauseDeployment

Bases: DeploymentAction

Pauses the given Deployment

Source code in src/prefect/events/actions.py
79
80
81
82
class PauseDeployment(DeploymentAction):
    """Pauses the given Deployment

    Declares no fields of its own beyond the fixed `type` tag; anything else
    comes from DeploymentAction.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["pause-deployment"] = "pause-deployment"

PauseWorkPool

Bases: WorkPoolAction

Pauses a Work Pool

Source code in src/prefect/events/actions.py
172
173
174
175
class PauseWorkPool(WorkPoolAction):
    """Pauses a Work Pool

    Declares no fields of its own beyond the fixed `type` tag; anything else
    comes from WorkPoolAction.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["pause-work-pool"] = "pause-work-pool"

PauseWorkQueue

Bases: WorkQueueAction

Pauses a Work Queue

Source code in src/prefect/events/actions.py
214
215
216
217
class PauseWorkQueue(WorkQueueAction):
    """Pauses a Work Queue

    Declares no fields of its own beyond the fixed `type` tag; anything else
    comes from WorkQueueAction.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["pause-work-queue"] = "pause-work-queue"

ReceivedEvent

Bases: Event

The server-side view of an event that has happened to a Resource after it has been received by the server

Source code in src/prefect/events/schemas/events.py
171
172
173
174
175
176
177
178
179
180
class ReceivedEvent(Event):
    """The server-side view of an event that has happened to a Resource after it has
    been received by the server

    Adds a required `received` timestamp to the fields of Event.
    """

    # Configured with from_attributes=True
    model_config = ConfigDict(from_attributes=True)

    # Required (declared with `...`)
    received: DateTime = Field(
        ...,
        description="When the event was received by Prefect Cloud",
    )

RelatedResource

Bases: Resource

A Resource with a specific role in an Event

Source code in src/prefect/events/schemas/events.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class RelatedResource(Resource):
    """A Resource that plays a specific role in an Event"""

    @model_validator(mode="after")
    def requires_resource_role(self) -> Self:
        """Require a present, non-empty `prefect.resource.role` label"""
        labels = self.root

        if "prefect.resource.role" not in labels:
            raise ValueError(
                "Related Resources must include the prefect.resource.role label"
            )

        if not labels["prefect.resource.role"]:
            raise ValueError("The prefect.resource.role label must be non-empty")

        return self

    @property
    def role(self) -> str:
        """The value of the `prefect.resource.role` label"""
        return self["prefect.resource.role"]

Resource

Bases: Labelled

An observable business object of interest to the user

Source code in src/prefect/events/schemas/events.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class Resource(Labelled):
    """A business object of interest to the user that can be observed"""

    @model_validator(mode="after")
    def enforce_maximum_labels(self) -> Self:
        """Reject resources with more labels than the configured maximum"""
        if len(self.root) <= PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value():
            return self

        raise ValueError(
            "The maximum number of labels per resource "
            f"is {PREFECT_EVENTS_MAXIMUM_LABELS_PER_RESOURCE.value()}"
        )

    @model_validator(mode="after")
    def requires_resource_id(self) -> Self:
        """Require a present, non-empty `prefect.resource.id` label"""
        labels = self.root

        if "prefect.resource.id" not in labels:
            raise ValueError("Resources must include the prefect.resource.id label")

        if not labels["prefect.resource.id"]:
            raise ValueError("The prefect.resource.id label must be non-empty")

        return self

    @property
    def id(self) -> str:
        """The value of the `prefect.resource.id` label"""
        return self["prefect.resource.id"]

    @property
    def name(self) -> Optional[str]:
        """The value of the `prefect.resource.name` label, fetched with `get`"""
        return self.get("prefect.resource.name")

    def prefect_object_id(self, kind: str) -> UUID:
        """Return the UUID that follows the `kind` prefix in this resource's ID.

        A trailing "." is appended to `kind` if it lacks one.  Raises
        ValueError when the ID does not start with that prefix."""
        prefix = kind if kind.endswith(".") else f"{kind}."
        resource_id = self.id

        if not resource_id.startswith(prefix):
            raise ValueError(f"Resource ID {resource_id} does not start with {prefix}")

        # Everything after the prefix is parsed as the UUID
        return UUID(resource_id[len(prefix) :])

prefect_object_id(kind)

Extracts the UUID from an event's resource ID if it's the expected kind of prefect resource

Source code in src/prefect/events/schemas/events.py
63
64
65
66
67
68
69
70
71
def prefect_object_id(self, kind: str) -> UUID:
    """Return the UUID that follows the `kind` prefix in this resource's ID.

    A trailing "." is appended to `kind` if it lacks one.  Raises ValueError
    when the ID does not start with that prefix."""
    prefix = kind if kind.endswith(".") else f"{kind}."
    resource_id = self.id

    if not resource_id.startswith(prefix):
        raise ValueError(f"Resource ID {resource_id} does not start with {prefix}")

    # Everything after the prefix is parsed as the UUID
    return UUID(resource_id[len(prefix) :])

ResourceTrigger

Bases: Trigger, ABC

Base class for triggers that may filter by the labels of resources.

Source code in src/prefect/events/schemas/automations.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class ResourceTrigger(Trigger, abc.ABC):
    """
    Base class for triggers that may filter by the labels of resources.

    Both `match` and `match_related` default to a ResourceSpecification
    validated from an empty mapping.
    """

    type: str

    # A fresh specification is built per instance via default_factory
    match: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.model_validate({}),
        description="Labels for resources which this trigger will match.",
    )
    match_related: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.model_validate({}),
        description="Labels for related resources which this trigger will match.",
    )

ResumeAutomation

Bases: AutomationAction

Resumes an Automation

Source code in src/prefect/events/actions.py
262
263
264
265
class ResumeAutomation(AutomationAction):
    """Resumes an Automation"""

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["resume-automation"] = "resume-automation"

ResumeDeployment

Bases: DeploymentAction

Resumes the given Deployment

Source code in src/prefect/events/actions.py
85
86
87
88
class ResumeDeployment(DeploymentAction):
    """Resumes the given Deployment

    Declares no fields of its own beyond the fixed `type` tag; anything else
    comes from DeploymentAction.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["resume-deployment"] = "resume-deployment"

ResumeWorkPool

Bases: WorkPoolAction

Resumes a Work Pool

Source code in src/prefect/events/actions.py
178
179
180
181
class ResumeWorkPool(WorkPoolAction):
    """Resumes a Work Pool

    Declares no fields of its own beyond the fixed `type` tag; anything else
    comes from WorkPoolAction.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["resume-work-pool"] = "resume-work-pool"

ResumeWorkQueue

Bases: WorkQueueAction

Resumes a Work Queue

Source code in src/prefect/events/actions.py
220
221
222
223
class ResumeWorkQueue(WorkQueueAction):
    """Resumes a Work Queue

    Declares no fields of its own beyond the fixed `type` tag; anything else
    comes from WorkQueueAction.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["resume-work-queue"] = "resume-work-queue"

RunDeployment

Bases: DeploymentAction

Runs the given deployment with the given parameters

Source code in src/prefect/events/actions.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class RunDeployment(DeploymentAction):
    """Runs the given deployment with the given parameters

    Both `parameters` and `job_variables` default to None.
    """

    # Fixed literal tag for this action; also the default, so it may be omitted
    type: Literal["run-deployment"] = "run-deployment"

    parameters: Optional[Dict[str, Any]] = Field(
        None,
        description=(
            "The parameters to pass to the deployment, or None to use the "
            "deployment's default parameters"
        ),
    )
    job_variables: Optional[Dict[str, Any]] = Field(
        None,
        description=(
            "The job variables to pass to the created flow run, or None "
            "to use the deployment's default job variables"
        ),
    )

SendNotification

Bases: Action

Send a notification when an Automation is triggered

Source code in src/prefect/events/actions.py
141
142
143
144
145
146
147
148
149
class SendNotification(Action):
    """Send a notification when an Automation is triggered"""

    type: Literal["send-notification"] = "send-notification"
    block_document_id: UUID = Field(  # declared without a default value
        description="The identifier of the notification block to use"
    )
    subject: str = Field("Prefect automated notification")  # default subject text
    body: str = Field(description="The text of the notification to send")  # no default

SequenceTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period in a specific order

Source code in src/prefect/events/schemas/automations.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
class SequenceTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    type: Literal["sequence"] = "sequence"

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI

        Emits an "In this order:" heading, then the description of each child
        trigger, each child being asked to describe itself at `indent + 1`.

        Args:
            indent: nesting depth; two spaces per level are prefixed to the text.

        Returns:
            The multi-line description.
        """
        nested = [child.describe_for_cli(indent=indent + 1) for child in self.triggers]
        body = "\n".join(nested)
        text = "\n".join(["In this order:", body])
        return textwrap.indent(text, prefix="  " * indent)

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI

    Emits an "In this order:" heading, then the description of each child
    trigger, each child being asked to describe itself at `indent + 1`.

    Args:
        indent: nesting depth; two spaces per level are prefixed to the text.

    Returns:
        The multi-line description.
    """
    nested = [child.describe_for_cli(indent=indent + 1) for child in self.triggers]
    body = "\n".join(nested)
    text = "\n".join(["In this order:", body])
    return textwrap.indent(text, prefix="  " * indent)

SuspendFlowRun

Bases: Action

Suspends a flow run associated with the trigger

Source code in src/prefect/events/actions.py
122
123
124
125
class SuspendFlowRun(Action):
    """Suspends a flow run associated with the trigger"""

    type: Literal["suspend-flow-run"] = "suspend-flow-run"  # constant tag: the only value the annotation permits

Trigger

Bases: PrefectBaseModel, ABC

Base class describing a set of criteria that must be satisfied in order to trigger an automation.

Source code in src/prefect/events/schemas/automations.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class Trigger(PrefectBaseModel, abc.ABC, extra="ignore"):  # type: ignore[call-arg]
    """
    Base class describing a set of criteria that must be satisfied in order to trigger
    an automation.
    """

    type: str

    @abc.abstractmethod
    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""

    # The members below allow the regular Trigger class to be used when serving
    # or deploying flows, analogous to how the Deployment*Trigger classes work
    _deployment_id: Optional[UUID] = PrivateAttr(default=None)

    def set_deployment_id(self, deployment_id: UUID):
        """Record the deployment that this trigger is attached to"""
        self._deployment_id = deployment_id

    def owner_resource(self) -> Optional[str]:
        """Name the attached deployment as a `prefect.deployment.<id>` resource"""
        # NOTE(review): formats the id even while it is still None; confirm intended
        return f"prefect.deployment.{self._deployment_id}"

    def actions(self) -> List[ActionTypes]:
        """Build the single RunDeployment action for the attached deployment"""
        assert self._deployment_id
        run_deployment = RunDeployment(
            source="selected",
            deployment_id=self._deployment_id,
            parameters=getattr(self, "parameters", None),
            job_variables=getattr(self, "job_variables", None),
        )
        return [run_deployment]

    def as_automation(self) -> "AutomationCore":
        """Wrap this trigger and its actions in an AutomationCore"""
        assert self._deployment_id
        trigger: TriggerTypes
        if hasattr(self, "trigger_type"):
            # One of the Deployment*Trigger classes: translate to a plain Trigger
            trigger = self.trigger_type(**self.model_dump())
        else:
            trigger = cast(TriggerTypes, self)

        fallback_name = f"Automation for deployment {self._deployment_id}"
        return AutomationCore(
            name=getattr(self, "name", None) or fallback_name,
            description="",
            enabled=getattr(self, "enabled", True),
            trigger=trigger,
            actions=self.actions(),
            owner_resource=self.owner_resource(),
        )

describe_for_cli(indent=0) abstractmethod

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
46
47
48
@abc.abstractmethod  # concrete trigger classes must override this method
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""

emit_event(event, resource, occurred=None, related=None, payload=None, id=None, follows=None)

Send an event to Prefect Cloud.

Parameters:

Name Type Description Default
event str

The name of the event that happened.

required
resource Dict[str, str]

The primary Resource this event concerns.

required
occurred Optional[DateTime]

When the event happened from the sender's perspective. Defaults to the current datetime.

None
related Optional[Union[List[Dict[str, str]], List[RelatedResource]]]

A list of additional Resources involved in this event.

None
payload Optional[Dict[str, Any]]

An open-ended set of data describing what happened.

None
id Optional[UUID]

The sender-provided identifier for this event. Defaults to a random UUID.

None
follows Optional[Event]

The event that preceded this one. If the preceding event happened more than 5 minutes prior to this event the follows relationship will not be set.

None

Returns:

Type Description
Optional[Event]

The event that was emitted if the worker is using a client that emits events, otherwise None

Source code in src/prefect/events/utilities.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def emit_event(
    event: str,
    resource: Dict[str, str],
    occurred: Optional[DateTime] = None,
    related: Optional[Union[List[Dict[str, str]], List[RelatedResource]]] = None,
    payload: Optional[Dict[str, Any]] = None,
    id: Optional[UUID] = None,
    follows: Optional[Event] = None,
) -> Optional[Event]:
    """
    Send an event to Prefect Cloud.

    Nothing is sent (and None is returned) when `should_emit_events()` is
    false or when the events worker's client type is not one of the
    event-emitting client classes listed in the function body.

    Args:
        event: The name of the event that happened.
        resource: The primary Resource this event concerns.
        occurred: When the event happened from the sender's perspective.
            Defaults to the current datetime.
        related: A list of additional Resources involved in this event.
        payload: An open-ended set of data describing what happened.
        id: The sender-provided identifier for this event. Defaults to a random
            UUID.
        follows: The event that preceded this one. If the preceding event
            happened more than 5 minutes prior to this event the follows
            relationship will not be set.

    Returns:
        The event that was emitted if the worker is using a client that emits
            events, otherwise None
    """
    if not should_emit_events():
        return None

    worker = EventsWorker.instance()
    emitting_client_types = [
        AssertingPassthroughEventsClient,
        AssertingEventsClient,
        PrefectCloudEventsClient,
        PrefectEventsClient,
    ]
    if worker.client_type not in emitting_client_types:
        return None

    if occurred is None:
        occurred = pendulum.now("UTC")

    event_fields: Dict[str, Any] = {
        "event": event,
        "resource": resource,
        "occurred": occurred,
    }
    # Optional fields are only passed to Event when the caller supplied them
    optional_fields = {"related": related, "payload": payload, "id": id}
    for field_name, value in optional_fields.items():
        if value is not None:
            event_fields[field_name] = value

    # Link to the preceding event only if the two occurred close together
    if follows is not None:
        gap = occurred - follows.occurred
        if -TIGHT_TIMING < gap < TIGHT_TIMING:
            event_fields["follows"] = follows.id

    emitted = Event(**event_fields)
    worker.send(emitted)
    return emitted