Skip to content

prefect.server.orchestration.global_policy

Bookkeeping logic that fires on every state transition.

For clarity, GlobalFlowpolicy and GlobalTaskPolicy contain all transition logic implemented using BaseUniversalTransform. None of these operations modify state, and regardless of what orchestration Prefect REST API might enforce on a transition, the global policies contain Prefect's necessary bookkeeping. Because these transforms record information about the validated state committed to the state database, they should be the most deeply nested contexts in orchestration loop.

GlobalFlowPolicy

Bases: BaseOrchestrationPolicy

Global transforms that run against flow-run-state transitions in priority order.

These transforms are intended to run immediately before and after a state transition is validated.

Source code in src/prefect/server/orchestration/global_policy.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class GlobalFlowPolicy(BaseOrchestrationPolicy):
    """
    Global transforms that run against flow-run-state transitions in priority order.

    These transforms are intended to run immediately before and after a state transition
    is validated.
    """

    @staticmethod
    def priority():
        return COMMON_GLOBAL_TRANSFORMS() + [
            UpdateSubflowParentTask,
            UpdateSubflowStateDetails,
            IncrementFlowRunCount,
            RemoveResumingIndicator,
        ]

GlobalTaskPolicy

Bases: BaseOrchestrationPolicy

Global transforms that run against task-run-state transitions in priority order.

These transforms are intended to run immediately before and after a state transition is validated.

Source code in src/prefect/server/orchestration/global_policy.py
57
58
59
60
61
62
63
64
65
66
67
68
69
class GlobalTaskPolicy(BaseOrchestrationPolicy):
    """
    Global transforms that run against task-run-state transitions in priority order.

    These transforms are intended to run immediately before and after a state transition
    is validated.
    """

    @staticmethod
    def priority():
        return COMMON_GLOBAL_TRANSFORMS() + [
            IncrementTaskRunCount,
        ]

IncrementFlowRunCount

Bases: BaseUniversalTransform

Records the number of times a run enters a running state. For use with retries.

Source code in src/prefect/server/orchestration/global_policy.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class IncrementFlowRunCount(BaseUniversalTransform):
    """
    Records the number of times a run enters a running state. For use with retries.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if entering a running state...
        if context.proposed_state.is_running():
            # do not increment the run count if resuming a paused flow
            api_version = context.parameters.get("api-version", None)
            if api_version is None or api_version >= Version("0.8.4"):
                if context.run.empirical_policy.resuming:
                    return

            # increment the run count
            context.run.run_count += 1

IncrementRunTime

Bases: BaseUniversalTransform

Records the amount of time a run spends in the running state.

Source code in src/prefect/server/orchestration/global_policy.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
class IncrementRunTime(BaseUniversalTransform):
    """
    Records the amount of time a run spends in the running state.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if exiting a running state...
        if context.initial_state and context.initial_state.is_running():
            # increment the run time by the time spent in the previous state
            context.run.total_run_time += (
                context.proposed_state.timestamp - context.initial_state.timestamp
            )

IncrementTaskRunCount

Bases: BaseUniversalTransform

Records the number of times a run enters a running state. For use with retries.

Source code in src/prefect/server/orchestration/global_policy.py
213
214
215
216
217
218
219
220
221
222
223
224
225
class IncrementTaskRunCount(BaseUniversalTransform):
    """
    Records the number of times a run enters a running state. For use with retries.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if entering a running state...
        if context.proposed_state.is_running():
            # increment the run count
            context.run.run_count += 1

RemoveResumingIndicator

Bases: BaseUniversalTransform

Removes the indicator on a flow run that marks it as resuming.

Source code in src/prefect/server/orchestration/global_policy.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class RemoveResumingIndicator(BaseUniversalTransform):
    """
    Removes the indicator on a flow run that marks it as resuming.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        proposed_state = context.proposed_state

        api_version = context.parameters.get("api-version", None)
        if api_version is None or api_version >= Version("0.8.4"):
            if proposed_state.is_running() or proposed_state.is_final():
                if context.run.empirical_policy.resuming:
                    updated_policy = context.run.empirical_policy.model_dump()
                    updated_policy["resuming"] = False
                    context.run.empirical_policy = FlowRunPolicy(**updated_policy)

SetEndTime

Bases: BaseUniversalTransform

Records the time a run enters a terminal state.

With normal client usage, a run will not transition out of a terminal state. However, it's possible to force these transitions manually via the API. While leaving a terminal state, the end time will be unset.

Source code in src/prefect/server/orchestration/global_policy.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class SetEndTime(BaseUniversalTransform):
    """
    Records the time a run enters a terminal state.

    With normal client usage, a run will not transition out of a terminal state.
    However, it's possible to force these transitions manually via the API. While
    leaving a terminal state, the end time will be unset.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if exiting a final state for a non-final state...
        if (
            context.initial_state
            and context.initial_state.is_final()
            and not context.proposed_state.is_final()
        ):
            # clear the end time
            context.run.end_time = None

        # if entering a final state...
        if context.proposed_state.is_final():
            # if the run has a start time and no end time, give it one
            if context.run.start_time and not context.run.end_time:
                context.run.end_time = context.proposed_state.timestamp

SetExpectedStartTime

Bases: BaseUniversalTransform

Estimates the time a state is expected to start running if not set.

For scheduled states, this estimate is simply the scheduled time. For other states, this is set to the time the proposed state was created by Prefect.

Source code in src/prefect/server/orchestration/global_policy.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
class SetExpectedStartTime(BaseUniversalTransform):
    """
    Estimates the time a state is expected to start running if not set.

    For scheduled states, this estimate is simply the scheduled time. For other states,
    this is set to the time the proposed state was created by Prefect.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # set expected start time if this is the first state
        if not context.run.expected_start_time:
            if context.proposed_state.is_scheduled():
                context.run.expected_start_time = (
                    context.proposed_state.state_details.scheduled_time
                )
            else:
                context.run.expected_start_time = context.proposed_state.timestamp

SetNextScheduledStartTime

Bases: BaseUniversalTransform

Records the scheduled time on a run.

When a run enters a scheduled state, run.next_scheduled_start_time is set to the state's scheduled time. When leaving a scheduled state, run.next_scheduled_start_time is unset.

Source code in src/prefect/server/orchestration/global_policy.py
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
class SetNextScheduledStartTime(BaseUniversalTransform):
    """
    Records the scheduled time on a run.

    When a run enters a scheduled state, `run.next_scheduled_start_time` is set to
    the state's scheduled time. When leaving a scheduled state,
    `run.next_scheduled_start_time` is unset.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # remove the next scheduled start time if exiting a scheduled state
        if context.initial_state and context.initial_state.is_scheduled():
            context.run.next_scheduled_start_time = None

        # set next scheduled start time if entering a scheduled state
        if context.proposed_state.is_scheduled():
            context.run.next_scheduled_start_time = (
                context.proposed_state.state_details.scheduled_time
            )

SetRunStateName

Bases: BaseUniversalTransform

Updates the state name of a run on a state transition.

Source code in src/prefect/server/orchestration/global_policy.py
85
86
87
88
89
90
91
92
93
94
95
class SetRunStateName(BaseUniversalTransform):
    """
    Updates the state name of a run on a state transition.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # record the new state's name
        context.run.state_name = context.proposed_state.name

SetRunStateTimestamp

Bases: BaseUniversalTransform

Records the time a run changes states.

Source code in src/prefect/server/orchestration/global_policy.py
113
114
115
116
117
118
119
120
121
122
123
class SetRunStateTimestamp(BaseUniversalTransform):
    """
    Records the time a run changes states.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # record the new state's timestamp
        context.run.state_timestamp = context.proposed_state.timestamp

SetRunStateType

Bases: BaseUniversalTransform

Updates the state type of a run on a state transition.

Source code in src/prefect/server/orchestration/global_policy.py
72
73
74
75
76
77
78
79
80
81
82
class SetRunStateType(BaseUniversalTransform):
    """
    Updates the state type of a run on a state transition.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # record the new state's type
        context.run.state_type = context.proposed_state.type

SetStartTime

Bases: BaseUniversalTransform

Records the time a run enters a running state for the first time.

Source code in src/prefect/server/orchestration/global_policy.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class SetStartTime(BaseUniversalTransform):
    """
    Records the time a run enters a running state for the first time.
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # if entering a running state and no start time is set...
        if context.proposed_state.is_running() and context.run.start_time is None:
            # set the start time
            context.run.start_time = context.proposed_state.timestamp

UpdateStateDetails

Bases: BaseUniversalTransform

Update a state's references to a corresponding flow- or task- run.

Source code in src/prefect/server/orchestration/global_policy.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
class UpdateStateDetails(BaseUniversalTransform):
    """
    Update a state's references to a corresponding flow- or task- run.
    """

    async def before_transition(
        self,
        context: OrchestrationContext,
    ) -> None:
        if self.nullified_transition():
            return

        if isinstance(context, FlowOrchestrationContext):
            flow_run = await context.flow_run()
            context.proposed_state.state_details.flow_run_id = flow_run.id

        elif isinstance(context, TaskOrchestrationContext):
            task_run = await context.task_run()
            context.proposed_state.state_details.flow_run_id = task_run.flow_run_id
            context.proposed_state.state_details.task_run_id = task_run.id

UpdateSubflowParentTask

Bases: BaseUniversalTransform

Whenever a subflow changes state, it must update its parent task run's state.

Source code in src/prefect/server/orchestration/global_policy.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
class UpdateSubflowParentTask(BaseUniversalTransform):
    """
    Whenever a subflow changes state, it must update its parent task run's state.
    """

    async def after_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # only applies to flow runs with a parent task run id
        if context.run.parent_task_run_id is not None:
            # avoid mutation of the flow run state
            subflow_parent_task_state = context.validated_state.fresh_copy()

            # set the task's "child flow run id" to be the subflow run id
            subflow_parent_task_state.state_details.child_flow_run_id = context.run.id

            await models.task_runs.set_task_run_state(
                session=context.session,
                task_run_id=context.run.parent_task_run_id,
                state=subflow_parent_task_state,
                force=True,
            )

UpdateSubflowStateDetails

Bases: BaseUniversalTransform

Update a child subflow state's references to a corresponding tracking task run id in the parent flow run

Source code in src/prefect/server/orchestration/global_policy.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
class UpdateSubflowStateDetails(BaseUniversalTransform):
    """
    Update a child subflow state's references to a corresponding tracking task run id
    in the parent flow run
    """

    async def before_transition(self, context: OrchestrationContext) -> None:
        if self.nullified_transition():
            return

        # only applies to flow runs with a parent task run id
        if context.run.parent_task_run_id is not None:
            context.proposed_state.state_details.task_run_id = (
                context.run.parent_task_run_id
            )