Skip to content

prefect.server.schemas.responses

Schemas for special responses from the Prefect REST API.

FlowRunResponse

Bases: ORMBaseModel

Source code in src/prefect/server/schemas/responses.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
class FlowRunResponse(ORMBaseModel):
    # Response schema describing a single flow run. Each field's `description`
    # documents its meaning; the grouping comments below are for readers only.
    # Field order is preserved because it determines serialization order.

    # --- identity and relationships ---------------------------------------
    name: str = Field(
        default_factory=lambda: generate_slug(2),
        description=(
            "The name of the flow run. Defaults to a random slug if not specified."
        ),
        examples=["my-flow-run"],
    )
    flow_id: UUID = Field(default=..., description="The id of the flow being run.")
    state_id: Optional[UUID] = Field(
        default=None, description="The id of the flow run's current state."
    )
    deployment_id: Optional[UUID] = Field(
        default=None,
        description=(
            "The id of the deployment associated with this flow run, if available."
        ),
    )
    deployment_version: Optional[str] = Field(
        default=None,
        description="The version of the deployment associated with this flow run.",
        examples=["1.0"],
    )
    # work_queue_id / work_queue_name may also be filled in by model_validate()
    # from the source object's `work_queue` relationship.
    work_queue_id: Optional[UUID] = Field(
        default=None, description="The id of the run's work pool queue."
    )
    work_queue_name: Optional[str] = Field(
        default=None, description="The work queue that handled this flow run."
    )
    flow_version: Optional[str] = Field(
        default=None,
        description="The version of the flow executed in this flow run.",
        examples=["1.0"],
    )

    # --- run inputs and policy --------------------------------------------
    parameters: Dict[str, Any] = Field(
        default_factory=dict, description="Parameters for the flow run."
    )
    idempotency_key: Optional[str] = Field(
        default=None,
        description=(
            "An optional idempotency key for the flow run. Used to ensure the same flow"
            " run is not created multiple times."
        ),
    )
    context: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for the flow run.",
        examples=[{"my_var": "my_val"}],
    )
    empirical_policy: FlowRunPolicy = Field(
        default_factory=FlowRunPolicy,
    )
    tags: List[str] = Field(
        default_factory=list,
        description="A list of tags on the flow run",
        examples=[["tag-1", "tag-2"]],
    )
    parent_task_run_id: Optional[UUID] = Field(
        default=None,
        description=(
            "If the flow run is a subflow, the id of the 'dummy' task in the parent"
            " flow used to track subflow state."
        ),
    )

    # --- state summary and timing -----------------------------------------
    state_type: Optional[schemas.states.StateType] = Field(
        default=None, description="The type of the current flow run state."
    )
    state_name: Optional[str] = Field(
        default=None, description="The name of the current flow run state."
    )
    run_count: int = Field(
        default=0, description="The number of times the flow run was executed."
    )
    expected_start_time: Optional[DateTime] = Field(
        default=None,
        description="The flow run's expected start time.",
    )
    next_scheduled_start_time: Optional[DateTime] = Field(
        default=None,
        description="The next time the flow run is scheduled to start.",
    )
    start_time: Optional[DateTime] = Field(
        default=None, description="The actual start time."
    )
    end_time: Optional[DateTime] = Field(
        default=None, description="The actual end time."
    )
    total_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description=(
            "Total run time. If the flow run was executed multiple times, the time of"
            " each run will be summed."
        ),
    )
    # The two estimated_* fields below are excluded from __eq__ comparisons.
    estimated_run_time: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="A real-time estimate of the total run time.",
    )
    estimated_start_time_delta: datetime.timedelta = Field(
        default=datetime.timedelta(0),
        description="The difference between actual and expected start time.",
    )
    auto_scheduled: bool = Field(
        default=False,
        description="Whether or not the flow run was automatically scheduled.",
    )

    # --- infrastructure, provenance and work pool -------------------------
    infrastructure_document_id: Optional[UUID] = Field(
        default=None,
        description="The block document defining infrastructure to use this flow run.",
    )
    infrastructure_pid: Optional[str] = Field(
        default=None,
        description="The id of the flow run as returned by an infrastructure block.",
    )
    created_by: Optional[CreatedBy] = Field(
        default=None,
        description="Optional information about the creator of this flow run.",
    )
    # work_pool_id / work_pool_name may also be filled in by model_validate()
    # from the source object's `work_queue.work_pool` relationship.
    work_pool_id: Optional[UUID] = Field(
        default=None,
        description="The id of the flow run's work pool.",
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description="The name of the flow run's work pool.",
        examples=["my-work-pool"],
    )
    state: Optional[schemas.states.State] = Field(
        default=None, description="The current state of the flow run."
    )
    job_variables: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Variables used as overrides in the base job template",
    )

    @classmethod
    def model_validate(
        cls: Type[Self],
        obj: Any,
        *,
        strict: Optional[bool] = None,
        from_attributes: Optional[bool] = None,
        context: Optional[dict[str, Any]] = None,
    ) -> Self:
        """
        Validate `obj` into a response and, for attribute-based sources, copy
        work queue and work pool identifiers from its relationships.

        Args:
            obj: The object to validate.
            strict: Forwarded to the parent `model_validate`.
            from_attributes: Forwarded to the parent `model_validate`. The
                work queue / work pool enrichment below only runs when this is
                explicitly truthy.
            context: Forwarded to the parent `model_validate`.

        Returns:
            The validated response.
        """
        # Forward the caller's options instead of silently dropping them.
        response = super().model_validate(
            obj,
            strict=strict,
            from_attributes=from_attributes,
            context=context,
        )

        if from_attributes:
            # Sources without a `work_queue` attribute (e.g. a plain dict)
            # simply skip the enrichment rather than raising AttributeError.
            work_queue = getattr(obj, "work_queue", None)
            if work_queue:
                response.work_queue_id = work_queue.id
                response.work_queue_name = work_queue.name
                if work_queue.work_pool:
                    response.work_pool_id = work_queue.work_pool.id
                    response.work_pool_name = work_queue.work_pool.name

        return response

    def __eq__(self, other: Any) -> bool:
        """
        Check for "equality" to another flow run schema

        Estimated times are rolling and will always change with repeated queries for
        a flow run so we ignore them during equality checks.
        """
        if isinstance(other, FlowRunResponse):
            exclude_fields = {"estimated_run_time", "estimated_start_time_delta"}
            return self.model_dump(exclude=exclude_fields) == other.model_dump(
                exclude=exclude_fields
            )
        return super().__eq__(other)

__eq__(other)

Check for "equality" to another flow run schema

Estimated times are rolling and will always change with repeated queries for a flow run, so we ignore them during equality checks.

Source code in src/prefect/server/schemas/responses.py
331
332
333
334
335
336
337
338
339
340
341
342
343
def __eq__(self, other: Any) -> bool:
    """
    Compare this flow run schema with another object.

    Against another FlowRunResponse, the two models are dumped without the
    rolling estimate fields (which change on every query) and the dumps are
    compared. Any other operand is handed to the parent implementation.
    """
    if not isinstance(other, FlowRunResponse):
        return super().__eq__(other)

    ignored = {"estimated_run_time", "estimated_start_time_delta"}
    mine = self.model_dump(exclude=ignored)
    theirs = other.model_dump(exclude=ignored)
    return mine == theirs

GlobalConcurrencyLimitResponse

Bases: ORMBaseModel

A response object for global concurrency limits.

Source code in src/prefect/server/schemas/responses.py
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
class GlobalConcurrencyLimitResponse(ORMBaseModel):
    """
    A response object for global concurrency limits.
    """

    # Defaults to True when not supplied.
    active: bool = Field(
        default=True,
        description="Whether the global concurrency limit is active.",
    )
    # Required (no default).
    name: str = Field(
        default=...,
        description="The name of the global concurrency limit.",
    )
    # Required (no default).
    limit: int = Field(
        default=...,
        description="The concurrency limit.",
    )
    # Required (no default).
    active_slots: int = Field(
        default=...,
        description="The number of active slots.",
    )
    # Defaults to 2.0 when not supplied.
    slot_decay_per_second: float = Field(
        default=2.0,
        description="The decay rate for active slots when used as a rate limit.",
    )

HistoryResponse

Bases: PrefectBaseModel

Represents a history of aggregation states over an interval

Source code in src/prefect/server/schemas/responses.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class HistoryResponse(PrefectBaseModel):
    """Represents a history of aggregation states over an interval"""

    interval_start: DateTime = Field(
        default=...,
        description="The start date of the interval.",
    )
    interval_end: DateTime = Field(
        default=...,
        description="The end date of the interval.",
    )
    states: List[HistoryResponseState] = Field(
        default=...,
        description="A list of state histories during the interval.",
    )

    @model_validator(mode="before")
    @classmethod
    def validate_timestamps(
        cls, values: dict
    ) -> dict:  # TODO: remove this, handle with ORM
        """
        Convert stdlib datetimes on the interval bounds via `pendulum.instance`.

        Runs before field validation. Both `interval_start` and `interval_end`
        are always present in the returned mapping: a `datetime.datetime` value
        is converted, anything else (including a missing key, read as None) is
        passed through unchanged. All other keys are carried over as-is.
        """
        normalized = {}
        for key in ("interval_start", "interval_end"):
            raw = values.get(key)
            normalized[key] = (
                pendulum.instance(values[key])
                if isinstance(raw, datetime.datetime)
                else raw
            )

        # Later entries win, so the normalized bounds replace the raw ones.
        return {**values, **normalized}

HistoryResponseState

Bases: PrefectBaseModel

Represents a single state's history over an interval.

Source code in src/prefect/server/schemas/responses.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class HistoryResponseState(PrefectBaseModel):
    """Represents a single state's history over an interval."""

    # All five fields are required (declared with `default=...`).
    state_type: schemas.states.StateType = Field(
        default=...,
        description="The state type.",
    )
    state_name: str = Field(
        default=...,
        description="The state name.",
    )
    count_runs: int = Field(
        default=...,
        description="The number of runs in the specified state during the interval.",
    )
    sum_estimated_run_time: datetime.timedelta = Field(
        default=...,
        description="The total estimated run time of all runs during the interval.",
    )
    sum_estimated_lateness: datetime.timedelta = Field(
        default=...,
        description=(
            "The sum of differences between actual and expected start time "
            "during the interval."
        ),
    )

OrchestrationResult

Bases: PrefectBaseModel

A container for the output of state orchestration.

Source code in src/prefect/server/schemas/responses.py
157
158
159
160
161
162
163
164
class OrchestrationResult(PrefectBaseModel):
    """
    A container for the output of state orchestration.
    """

    # Annotated Optional, with no default assigned in this class.
    state: Optional[schemas.states.State]
    # One of the SetStateStatus members (ACCEPT, REJECT, ABORT, WAIT).
    status: SetStateStatus
    # StateResponseDetails is defined elsewhere in the module; presumably it
    # covers the State*Details models — TODO confirm.
    details: StateResponseDetails

SetStateStatus

Bases: AutoEnum

Enumerates return statuses for setting run states.

Source code in src/prefect/server/schemas/responses.py
26
27
28
29
30
31
32
class SetStateStatus(AutoEnum):
    """Enumerates return statuses for setting run states."""

    # Member values come from AutoEnum.auto(); the value scheme is defined by
    # AutoEnum, outside this block. Each status has a matching detail model
    # in this module (StateAcceptDetails, StateRejectDetails,
    # StateAbortDetails, StateWaitDetails).
    ACCEPT = AutoEnum.auto()
    REJECT = AutoEnum.auto()
    ABORT = AutoEnum.auto()
    WAIT = AutoEnum.auto()

StateAbortDetails

Bases: PrefectBaseModel

Details associated with an ABORT state transition.

Source code in src/prefect/server/schemas/responses.py
62
63
64
65
66
67
68
69
70
71
72
73
74
class StateAbortDetails(PrefectBaseModel):
    """Details associated with an ABORT state transition."""

    # Literal tag fixed to "abort_details"; per its description it keeps
    # pydantic from coercing this payload into another detail type.
    type: Literal["abort_details"] = Field(
        default="abort_details",
        description=(
            "The type of state transition detail. "
            "Used to ensure pydantic does not coerce into a different type."
        ),
    )
    # Optional explanation; None when not supplied.
    reason: Optional[str] = Field(
        default=None,
        description="The reason why the state transition was aborted.",
    )

StateAcceptDetails

Bases: PrefectBaseModel

Details associated with an ACCEPT state transition.

Source code in src/prefect/server/schemas/responses.py
35
36
37
38
39
40
41
42
43
44
class StateAcceptDetails(PrefectBaseModel):
    """Details associated with an ACCEPT state transition."""

    # Literal tag fixed to "accept_details"; per its description it keeps
    # pydantic from coercing this payload into another detail type.
    type: Literal["accept_details"] = Field(
        default="accept_details",
        description=(
            "The type of state transition detail. "
            "Used to ensure pydantic does not coerce into a different type."
        ),
    )

StateRejectDetails

Bases: PrefectBaseModel

Details associated with a REJECT state transition.

Source code in src/prefect/server/schemas/responses.py
47
48
49
50
51
52
53
54
55
56
57
58
59
class StateRejectDetails(PrefectBaseModel):
    """Details associated with a REJECT state transition."""

    # Literal tag fixed to "reject_details"; per its description it keeps
    # pydantic from coercing this payload into another detail type.
    type: Literal["reject_details"] = Field(
        default="reject_details",
        description=(
            "The type of state transition detail. "
            "Used to ensure pydantic does not coerce into a different type."
        ),
    )
    # Optional explanation; None when not supplied.
    reason: Optional[str] = Field(
        default=None,
        description="The reason why the state transition was rejected.",
    )

StateWaitDetails

Bases: PrefectBaseModel

Details associated with a WAIT state transition.

Source code in src/prefect/server/schemas/responses.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class StateWaitDetails(PrefectBaseModel):
    """Details associated with a WAIT state transition."""

    # Literal tag fixed to "wait_details"; per its description it keeps
    # pydantic from coercing this payload into another detail type.
    type: Literal["wait_details"] = Field(
        default="wait_details",
        description=(
            "The type of state transition detail. "
            "Used to ensure pydantic does not coerce into a different type."
        ),
    )
    # Required (declared with `default=...`); expressed in whole seconds.
    delay_seconds: int = Field(
        default=...,
        description=(
            "The length of time in seconds the client should wait "
            "before transitioning states."
        ),
    )
    # Optional explanation; None when not supplied.
    reason: Optional[str] = Field(
        default=None,
        description="The reason why the state transition should wait.",
    )

WorkQueueWithStatus

Bases: WorkQueueResponse, WorkQueueStatusDetail

Combines a work queue and its status details into a single object

Source code in src/prefect/server/schemas/responses.py
492
493
class WorkQueueWithStatus(WorkQueueResponse, WorkQueueStatusDetail):
    """Combines a work queue and its status details into a single object"""

    # Declares no fields of its own; everything comes from the two base
    # classes listed above.