Skip to content

prefect.automations

Automation

Bases: AutomationCore

Source code in src/prefect/automations.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
class Automation(AutomationCore):
    id: Optional[UUID] = Field(default=None, description="The ID of this automation")

    @sync_compatible
    async def create(self: Self) -> Self:
        """
        Create a new automation.

        auto_to_create = Automation(
            name="woodchonk",
            trigger=EventTrigger(
                expect={"animal.walked"},
                match={
                    "genus": "Marmota",
                    "species": "monax",
                },
                posture="Reactive",
                threshold=3,
                within=timedelta(seconds=10),
            ),
            actions=[CancelFlowRun()]
        )
        created_automation = auto_to_create.create()
        """
        client, _ = get_or_create_client()
        automation = AutomationCore(**self.model_dump(exclude={"id"}))
        self.id = await client.create_automation(automation=automation)
        return self

    @sync_compatible
    async def update(self: Self):
        """
        Updates an existing automation.
        auto = Automation.read(id=123)
        auto.name = "new name"
        auto.update()
        """

        client, _ = get_or_create_client()
        automation = AutomationCore(**self.model_dump(exclude={"id", "owner_resource"}))
        await client.update_automation(automation_id=self.id, automation=automation)

    @classmethod
    @sync_compatible
    async def read(
        cls: Self, id: Optional[UUID] = None, name: Optional[str] = None
    ) -> Self:
        """
        Read an automation by ID or name.
        automation = Automation.read(name="woodchonk")

        or

        automation = Automation.read(id=UUID("b3514963-02b1-47a5-93d1-6eeb131041cb"))
        """
        if id and name:
            raise ValueError("Only one of id or name can be provided")
        if not id and not name:
            raise ValueError("One of id or name must be provided")
        client, _ = get_or_create_client()
        if id:
            try:
                automation = await client.read_automation(automation_id=id)
            except PrefectHTTPStatusError as exc:
                if exc.response.status_code == 404:
                    raise ValueError(f"Automation with ID {id!r} not found")
            return Automation(**automation.model_dump())
        else:
            automation = await client.read_automations_by_name(name=name)
            if len(automation) > 0:
                return Automation(**automation[0].model_dump()) if automation else None
            else:
                raise ValueError(f"Automation with name {name!r} not found")

    @sync_compatible
    async def delete(self: Self) -> bool:
        """
        auto = Automation.read(id = 123)
        auto.delete()
        """
        try:
            client, _ = get_or_create_client()
            await client.delete_automation(self.id)
            return True
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                return False
            raise

    @sync_compatible
    async def disable(self: Self) -> bool:
        """
        Disable an automation.
        auto = Automation.read(id = 123)
        auto.disable()
        """
        try:
            client, _ = get_or_create_client()
            await client.pause_automation(self.id)
            return True
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                return False
            raise

    @sync_compatible
    async def enable(self: Self) -> bool:
        """
        Enable an automation.
        auto = Automation.read(id = 123)
        auto.enable()
        """
        try:
            client, _ = get_or_create_client()
            await client.resume_automation("asd")
            return True
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                return False
            raise

create() async

Create a new automation.

auto_to_create = Automation( name="woodchonk", trigger=EventTrigger( expect={"animal.walked"}, match={ "genus": "Marmota", "species": "monax", }, posture="Reactive", threshold=3, within=timedelta(seconds=10), ), actions=[CancelFlowRun()] ) created_automation = auto_to_create.create()

Source code in src/prefect/automations.py
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
@sync_compatible
async def create(self: Self) -> Self:
    """
    Create a new automation.

    auto_to_create = Automation(
        name="woodchonk",
        trigger=EventTrigger(
            expect={"animal.walked"},
            match={
                "genus": "Marmota",
                "species": "monax",
            },
            posture="Reactive",
            threshold=3,
            within=timedelta(seconds=10),
        ),
        actions=[CancelFlowRun()]
    )
    created_automation = auto_to_create.create()
    """
    client, _ = get_or_create_client()
    automation = AutomationCore(**self.model_dump(exclude={"id"}))
    self.id = await client.create_automation(automation=automation)
    return self

delete() async

auto = Automation.read(id = 123) auto.delete()

Source code in src/prefect/automations.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@sync_compatible
async def delete(self: Self) -> bool:
    """
    auto = Automation.read(id = 123)
    auto.delete()
    """
    try:
        client, _ = get_or_create_client()
        await client.delete_automation(self.id)
        return True
    except PrefectHTTPStatusError as exc:
        if exc.response.status_code == 404:
            return False
        raise

disable() async

Disable an automation. auto = Automation.read(id = 123) auto.disable()

Source code in src/prefect/automations.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@sync_compatible
async def disable(self: Self) -> bool:
    """
    Disable an automation.
    auto = Automation.read(id = 123)
    auto.disable()
    """
    try:
        client, _ = get_or_create_client()
        await client.pause_automation(self.id)
        return True
    except PrefectHTTPStatusError as exc:
        if exc.response.status_code == 404:
            return False
        raise

enable() async

Enable an automation. auto = Automation.read(id = 123) auto.enable()

Source code in src/prefect/automations.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@sync_compatible
async def enable(self: Self) -> bool:
    """
    Enable an automation.
    auto = Automation.read(id = 123)
    auto.enable()
    """
    try:
        client, _ = get_or_create_client()
        await client.resume_automation("asd")
        return True
    except PrefectHTTPStatusError as exc:
        if exc.response.status_code == 404:
            return False
        raise

read(id=None, name=None) async classmethod

Read an automation by ID or name. automation = Automation.read(name="woodchonk")

or

automation = Automation.read(id=UUID("b3514963-02b1-47a5-93d1-6eeb131041cb"))

Source code in src/prefect/automations.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
@sync_compatible
async def read(
    cls: Self, id: Optional[UUID] = None, name: Optional[str] = None
) -> Self:
    """
    Read an automation by ID or name.
    automation = Automation.read(name="woodchonk")

    or

    automation = Automation.read(id=UUID("b3514963-02b1-47a5-93d1-6eeb131041cb"))
    """
    if id and name:
        raise ValueError("Only one of id or name can be provided")
    if not id and not name:
        raise ValueError("One of id or name must be provided")
    client, _ = get_or_create_client()
    if id:
        try:
            automation = await client.read_automation(automation_id=id)
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 404:
                raise ValueError(f"Automation with ID {id!r} not found")
        return Automation(**automation.model_dump())
    else:
        automation = await client.read_automations_by_name(name=name)
        if len(automation) > 0:
            return Automation(**automation[0].model_dump()) if automation else None
        else:
            raise ValueError(f"Automation with name {name!r} not found")

update() async

Updates an existing automation. auto = Automation.read(id=123) auto.name = "new name" auto.update()

Source code in src/prefect/automations.py
107
108
109
110
111
112
113
114
115
116
117
118
@sync_compatible
async def update(self: Self):
    """
    Updates an existing automation.
    auto = Automation.read(id=123)
    auto.name = "new name"
    auto.update()
    """

    client, _ = get_or_create_client()
    automation = AutomationCore(**self.model_dump(exclude={"id", "owner_resource"}))
    await client.update_automation(automation_id=self.id, automation=automation)

AutomationCore

Bases: PrefectBaseModel

Defines an action a user wants to take when a certain number of events do or don't happen to the matching resources

Source code in src/prefect/events/schemas/automations.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
class AutomationCore(PrefectBaseModel, extra="ignore"):  # type: ignore[call-arg]
    """Defines an action a user wants to take when a certain number of events
    do or don't happen to the matching resources"""

    name: str = Field(..., description="The name of this automation")
    description: str = Field("", description="A longer description of this automation")

    enabled: bool = Field(True, description="Whether this automation will be evaluated")

    trigger: TriggerTypes = Field(
        ...,
        description=(
            "The criteria for which events this Automation covers and how it will "
            "respond to the presence or absence of those events"
        ),
    )

    actions: List[ActionTypes] = Field(
        ...,
        description="The actions to perform when this Automation triggers",
    )

    actions_on_trigger: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a triggered state",
    )

    actions_on_resolve: List[ActionTypes] = Field(
        default_factory=list,
        description="The actions to perform when an Automation goes into a resolving state",
    )

    owner_resource: Optional[str] = Field(
        default=None, description="The owning resource of this automation"
    )

CallWebhook

Bases: Action

Call a webhook when an Automation is triggered.

Source code in src/prefect/events/actions.py
128
129
130
131
132
133
134
135
136
137
138
class CallWebhook(Action):
    """Call a webhook when an Automation is triggered."""

    type: Literal["call-webhook"] = "call-webhook"
    block_document_id: UUID = Field(
        description="The identifier of the webhook block to use"
    )
    payload: str = Field(
        default="",
        description="An optional templatable payload to send when calling the webhook.",
    )

CancelFlowRun

Bases: Action

Cancels a flow run associated with the trigger

Source code in src/prefect/events/actions.py
110
111
112
113
class CancelFlowRun(Action):
    """Cancels a flow run associated with the trigger"""

    type: Literal["cancel-flow-run"] = "cancel-flow-run"

ChangeFlowRunState

Bases: Action

Changes the state of a flow run associated with the trigger

Source code in src/prefect/events/actions.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class ChangeFlowRunState(Action):
    """Changes the state of a flow run associated with the trigger"""

    type: Literal["change-flow-run-state"] = "change-flow-run-state"

    name: Optional[str] = Field(
        None,
        description="The name of the state to change the flow run to",
    )
    state: StateType = Field(
        ...,
        description="The type of the state to change the flow run to",
    )
    message: Optional[str] = Field(
        None,
        description="An optional message to associate with the state change",
    )

CompositeTrigger

Bases: Trigger, ABC

Requires some number of triggers to have fired within the given time period.

Source code in src/prefect/events/schemas/automations.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
class CompositeTrigger(Trigger, abc.ABC):
    """
    Requires some number of triggers to have fired within the given time period.
    """

    type: Literal["compound", "sequence"]
    triggers: List["TriggerTypes"]
    within: Optional[timedelta] = Field(
        None,
        description=(
            "The time period over which the events must occur.  For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

CompoundTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period

Source code in src/prefect/events/schemas/automations.py
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
class CompoundTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have
    fired within the given time period"""

    type: Literal["compound"] = "compound"
    require: Union[int, Literal["any", "all"]]

    @model_validator(mode="after")
    def validate_require(self) -> Self:
        if isinstance(self.require, int):
            if self.require < 1:
                raise ValueError("require must be at least 1")
            if self.require > len(self.triggers):
                raise ValueError(
                    "require must be less than or equal to the number of triggers"
                )

        return self

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        return textwrap.indent(
            "\n".join(
                [
                    f"{str(self.require).capitalize()} of:",
                    "\n".join(
                        [
                            trigger.describe_for_cli(indent=indent + 1)
                            for trigger in self.triggers
                        ]
                    ),
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    return textwrap.indent(
        "\n".join(
            [
                f"{str(self.require).capitalize()} of:",
                "\n".join(
                    [
                        trigger.describe_for_cli(indent=indent + 1)
                        for trigger in self.triggers
                    ]
                ),
            ]
        ),
        prefix="  " * indent,
    )

DeclareIncident

Bases: Action

Declares an incident for the triggering event. Only available on Prefect Cloud

Source code in src/prefect/events/actions.py
268
269
270
271
class DeclareIncident(Action):
    """Declares an incident for the triggering event.  Only available on Prefect Cloud"""

    type: Literal["declare-incident"] = "declare-incident"

DoNothing

Bases: Action

Do nothing when an Automation is triggered

Source code in src/prefect/events/actions.py
22
23
24
25
class DoNothing(Action):
    """Do nothing when an Automation is triggered"""

    type: Literal["do-nothing"] = "do-nothing"

EventTrigger

Bases: ResourceTrigger

A trigger that fires based on the presence or absence of events within a given period of time.

Source code in src/prefect/events/schemas/automations.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
class EventTrigger(ResourceTrigger):
    """
    A trigger that fires based on the presence or absence of events within a given
    period of time.
    """

    type: Literal["event"] = "event"

    after: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) which must first been seen to fire this trigger.  If "
            "empty, then fire this trigger immediately.  Events may include "
            "trailing wildcards, like `prefect.flow-run.*`"
        ),
    )
    expect: Set[str] = Field(
        default_factory=set,
        description=(
            "The event(s) this trigger is expecting to see.  If empty, this "
            "trigger will match any event.  Events may include trailing wildcards, "
            "like `prefect.flow-run.*`"
        ),
    )

    for_each: Set[str] = Field(
        default_factory=set,
        description=(
            "Evaluate the trigger separately for each distinct value of these labels "
            "on the resource.  By default, labels refer to the primary resource of the "
            "triggering event.  You may also refer to labels from related "
            "resources by specifying `related:<role>:<label>`.  This will use the "
            "value of that label for the first related resource in that role.  For "
            'example, `"for_each": ["related:flow:prefect.resource.id"]` would '
            "evaluate the trigger for each flow."
        ),
    )
    posture: Literal[Posture.Reactive, Posture.Proactive] = Field(  # type: ignore[valid-type]
        default=Posture.Reactive,
        description=(
            "The posture of this trigger, either Reactive or Proactive.  Reactive "
            "triggers respond to the _presence_ of the expected events, while "
            "Proactive triggers respond to the _absence_ of those expected events."
        ),
    )
    threshold: int = Field(
        default=1,
        description=(
            "The number of events required for this trigger to fire (for "
            "Reactive triggers), or the number of events expected (for Proactive "
            "triggers)"
        ),
    )
    within: timedelta = Field(
        default=timedelta(seconds=0),
        ge=timedelta(seconds=0),
        description=(
            "The time period over which the events must occur.  For Reactive triggers, "
            "this may be as low as 0 seconds, but must be at least 10 seconds for "
            "Proactive triggers"
        ),
    )

    @model_validator(mode="before")
    @classmethod
    def enforce_minimum_within_for_proactive_triggers(
        cls, data: Dict[str, Any]
    ) -> Dict[str, Any]:
        if not isinstance(data, dict):
            return data

        if "within" in data and data["within"] is None:
            raise ValueError("`within` should be a valid timedelta")

        posture: Optional[Posture] = data.get("posture")
        within: Optional[timedelta] = data.get("within")

        if isinstance(within, (int, float)):
            within = timedelta(seconds=within)

        if posture == Posture.Proactive:
            if not within or within == timedelta(0):
                within = timedelta(seconds=10.0)
            elif within < timedelta(seconds=10.0):
                raise ValueError(
                    "`within` for Proactive triggers must be greater than or equal to "
                    "10 seconds"
                )

        return data | {"within": within} if within else data

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        if self.posture == Posture.Reactive:
            return textwrap.indent(
                "\n".join(
                    [
                        f"Reactive: expecting {self.threshold} of {self.expect}",
                    ],
                ),
                prefix="  " * indent,
            )
        else:
            return textwrap.indent(
                "\n".join(
                    [
                        f"Proactive: expecting {self.threshold} {self.expect} event "
                        f"within {self.within}",
                    ],
                ),
                prefix="  " * indent,
            )

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    if self.posture == Posture.Reactive:
        return textwrap.indent(
            "\n".join(
                [
                    f"Reactive: expecting {self.threshold} of {self.expect}",
                ],
            ),
            prefix="  " * indent,
        )
    else:
        return textwrap.indent(
            "\n".join(
                [
                    f"Proactive: expecting {self.threshold} {self.expect} event "
                    f"within {self.within}",
                ],
            ),
            prefix="  " * indent,
        )

MetricTrigger

Bases: ResourceTrigger

A trigger that fires based on the results of a metric query.

Source code in src/prefect/events/schemas/automations.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
class MetricTrigger(ResourceTrigger):
    """
    A trigger that fires based on the results of a metric query.
    """

    type: Literal["metric"] = "metric"

    posture: Literal[Posture.Metric] = Field(  # type: ignore[valid-type]
        Posture.Metric,
        description="Periodically evaluate the configured metric query.",
    )

    metric: MetricTriggerQuery = Field(
        ...,
        description="The metric query to evaluate for this trigger. ",
    )

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        m = self.metric
        return textwrap.indent(
            "\n".join(
                [
                    f"Metric: {m.name.value} {m.operator.value} {m.threshold} for {m.range}",
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
303
304
305
306
307
308
309
310
311
312
313
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    m = self.metric
    return textwrap.indent(
        "\n".join(
            [
                f"Metric: {m.name.value} {m.operator.value} {m.threshold} for {m.range}",
            ]
        ),
        prefix="  " * indent,
    )

MetricTriggerQuery

Bases: PrefectBaseModel

Defines a subset of the Trigger subclass, which is specific to Metric automations, that specify the query configurations and breaching conditions for the Automation

Source code in src/prefect/events/schemas/automations.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
class MetricTriggerQuery(PrefectBaseModel):
    """Defines a subset of the Trigger subclass, which is specific
    to Metric automations, that specify the query configurations
    and breaching conditions for the Automation"""

    name: PrefectMetric = Field(
        ...,
        description="The name of the metric to query.",
    )
    threshold: float = Field(
        ...,
        description=(
            "The threshold value against which we'll compare " "the query result."
        ),
    )
    operator: MetricTriggerOperator = Field(
        ...,
        description=(
            "The comparative operator (LT / LTE / GT / GTE) used to compare "
            "the query result against the threshold value."
        ),
    )
    range: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        description=(
            "The lookback duration (seconds) for a metric query. This duration is "
            "used to determine the time range over which the query will be executed. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )
    firing_for: timedelta = Field(
        timedelta(seconds=300),  # defaults to 5 minutes
        description=(
            "The duration (seconds) for which the metric query must breach "
            "or resolve continuously before the state is updated and the "
            "automation is triggered. "
            "The minimum value is 300 seconds (5 minutes)."
        ),
    )

    @field_validator("range", "firing_for")
    def enforce_minimum_range(cls, value: timedelta):
        if value < timedelta(seconds=300):
            raise ValueError("The minimum range is 300 seconds (5 minutes)")
        return value

PauseAutomation

Bases: AutomationAction

Pauses a Work Queue

Source code in src/prefect/events/actions.py
256
257
258
259
class PauseAutomation(AutomationAction):
    """Pauses a Work Queue"""

    type: Literal["pause-automation"] = "pause-automation"

PauseDeployment

Bases: DeploymentAction

Pauses the given Deployment

Source code in src/prefect/events/actions.py
79
80
81
82
class PauseDeployment(DeploymentAction):
    """Pauses the given Deployment"""

    type: Literal["pause-deployment"] = "pause-deployment"

PauseWorkPool

Bases: WorkPoolAction

Pauses a Work Pool

Source code in src/prefect/events/actions.py
172
173
174
175
class PauseWorkPool(WorkPoolAction):
    """Pauses a Work Pool"""

    type: Literal["pause-work-pool"] = "pause-work-pool"

PauseWorkQueue

Bases: WorkQueueAction

Pauses a Work Queue

Source code in src/prefect/events/actions.py
214
215
216
217
class PauseWorkQueue(WorkQueueAction):
    """Pauses a Work Queue"""

    type: Literal["pause-work-queue"] = "pause-work-queue"

ResourceTrigger

Bases: Trigger, ABC

Base class for triggers that may filter by the labels of resources.

Source code in src/prefect/events/schemas/automations.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
class ResourceTrigger(Trigger, abc.ABC):
    """
    Base class for triggers that may filter by the labels of resources.
    """

    type: str

    match: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.model_validate({}),
        description="Labels for resources which this trigger will match.",
    )
    match_related: ResourceSpecification = Field(
        default_factory=lambda: ResourceSpecification.model_validate({}),
        description="Labels for related resources which this trigger will match.",
    )

ResumeAutomation

Bases: AutomationAction

Resumes a Work Queue

Source code in src/prefect/events/actions.py
262
263
264
265
class ResumeAutomation(AutomationAction):
    """Resumes a Work Queue"""

    type: Literal["resume-automation"] = "resume-automation"

ResumeDeployment

Bases: DeploymentAction

Resumes the given Deployment

Source code in src/prefect/events/actions.py
85
86
87
88
class ResumeDeployment(DeploymentAction):
    """Resumes the given Deployment"""

    type: Literal["resume-deployment"] = "resume-deployment"

ResumeWorkPool

Bases: WorkPoolAction

Resumes a Work Pool

Source code in src/prefect/events/actions.py
178
179
180
181
class ResumeWorkPool(WorkPoolAction):
    """Resumes a Work Pool"""

    type: Literal["resume-work-pool"] = "resume-work-pool"

ResumeWorkQueue

Bases: WorkQueueAction

Resumes a Work Queue

Source code in src/prefect/events/actions.py
220
221
222
223
class ResumeWorkQueue(WorkQueueAction):
    """Resumes a Work Queue"""

    type: Literal["resume-work-queue"] = "resume-work-queue"

RunDeployment

Bases: DeploymentAction

Runs the given deployment with the given parameters

Source code in src/prefect/events/actions.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class RunDeployment(DeploymentAction):
    """Runs the given deployment with the given parameters"""

    type: Literal["run-deployment"] = "run-deployment"

    parameters: Optional[Dict[str, Any]] = Field(
        None,
        description=(
            "The parameters to pass to the deployment, or None to use the "
            "deployment's default parameters"
        ),
    )
    job_variables: Optional[Dict[str, Any]] = Field(
        None,
        description=(
            "The job variables to pass to the created flow run, or None "
            "to use the deployment's default job variables"
        ),
    )

SendNotification

Bases: Action

Send a notification when an Automation is triggered

Source code in src/prefect/events/actions.py
141
142
143
144
145
146
147
148
149
class SendNotification(Action):
    """Send a notification when an Automation is triggered"""

    type: Literal["send-notification"] = "send-notification"
    block_document_id: UUID = Field(
        description="The identifier of the notification block to use"
    )
    subject: str = Field("Prefect automated notification")
    body: str = Field(description="The text of the notification to send")

SequenceTrigger

Bases: CompositeTrigger

A composite trigger that requires some number of triggers to have fired within the given time period in a specific order

Source code in src/prefect/events/schemas/automations.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
class SequenceTrigger(CompositeTrigger):
    """A composite trigger that requires some number of triggers to have fired
    within the given time period in a specific order"""

    type: Literal["sequence"] = "sequence"

    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""
        return textwrap.indent(
            "\n".join(
                [
                    "In this order:",
                    "\n".join(
                        [
                            trigger.describe_for_cli(indent=indent + 1)
                            for trigger in self.triggers
                        ]
                    ),
                ]
            ),
            prefix="  " * indent,
        )

describe_for_cli(indent=0)

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""
    return textwrap.indent(
        "\n".join(
            [
                "In this order:",
                "\n".join(
                    [
                        trigger.describe_for_cli(indent=indent + 1)
                        for trigger in self.triggers
                    ]
                ),
            ]
        ),
        prefix="  " * indent,
    )

SuspendFlowRun

Bases: Action

Suspends a flow run associated with the trigger

Source code in src/prefect/events/actions.py
122
123
124
125
class SuspendFlowRun(Action):
    """Suspends a flow run associated with the trigger"""

    type: Literal["suspend-flow-run"] = "suspend-flow-run"

Trigger

Bases: PrefectBaseModel, ABC

Base class describing a set of criteria that must be satisfied in order to trigger an automation.

Source code in src/prefect/events/schemas/automations.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class Trigger(PrefectBaseModel, abc.ABC, extra="ignore"):  # type: ignore[call-arg]
    """
    Base class describing a set of criteria that must be satisfied in order to trigger
    an automation.
    """

    type: str

    @abc.abstractmethod
    def describe_for_cli(self, indent: int = 0) -> str:
        """Return a human-readable description of this trigger for the CLI"""

    # The following allows the regular Trigger class to be used when serving or
    # deploying flows, analogous to how the Deployment*Trigger classes work

    _deployment_id: Optional[UUID] = PrivateAttr(default=None)

    def set_deployment_id(self, deployment_id: UUID):
        self._deployment_id = deployment_id

    def owner_resource(self) -> Optional[str]:
        return f"prefect.deployment.{self._deployment_id}"

    def actions(self) -> List[ActionTypes]:
        assert self._deployment_id
        return [
            RunDeployment(
                source="selected",
                deployment_id=self._deployment_id,
                parameters=getattr(self, "parameters", None),
                job_variables=getattr(self, "job_variables", None),
            )
        ]

    def as_automation(self) -> "AutomationCore":
        assert self._deployment_id

        trigger: TriggerTypes = cast(TriggerTypes, self)

        # This is one of the Deployment*Trigger classes, so translate it over to a
        # plain Trigger
        if hasattr(self, "trigger_type"):
            trigger = self.trigger_type(**self.model_dump())

        return AutomationCore(
            name=(
                getattr(self, "name", None)
                or f"Automation for deployment {self._deployment_id}"
            ),
            description="",
            enabled=getattr(self, "enabled", True),
            trigger=trigger,
            actions=self.actions(),
            owner_resource=self.owner_resource(),
        )

describe_for_cli(indent=0) abstractmethod

Return a human-readable description of this trigger for the CLI

Source code in src/prefect/events/schemas/automations.py
46
47
48
@abc.abstractmethod
def describe_for_cli(self, indent: int = 0) -> str:
    """Return a human-readable description of this trigger for the CLI"""