prefect.flow_runs
pause_flow_run(wait_for_input=None, timeout=3600, poll_interval=10, key=None)
async
pause_flow_run(wait_for_input: None = None, timeout: int = 3600, poll_interval: int = 10, key: Optional[str] = None) -> None
pause_flow_run(wait_for_input: Type[T], timeout: int = 3600, poll_interval: int = 10, key: Optional[str] = None) -> T
Pauses the current flow run by blocking execution until resumed.
When called within a flow run, execution will block and no downstream tasks will run until the flow is resumed. Task runs that have already started will continue running. A timeout parameter can be passed that will fail the flow run if it has not been resumed within the specified time.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | The number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming. | 3600 |
poll_interval | int | The number of seconds between checking whether the flow has been resumed. Defaults to 10 seconds. | 10 |
key | Optional[str] | An optional key to prevent calling pauses more than once. This defaults to the number of pauses observed by the flow so far, and prevents pauses that use the "reschedule" option from running the same pause twice. A custom key can be supplied for custom pausing behavior. | None |
wait_for_input | Optional[Type[T]] | a subclass of | None |
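Example:

from time import sleep

from prefect import flow, task
from prefect.client.schemas.objects import StateType
from prefect.flow_runs import pause_flow_run

@task
def task_one():
    for i in range(3):
        sleep(1)

@flow
def my_flow():
    # return_state=True returns the task's final state instead of a future
    terminal_state = task_one.submit(return_state=True)
    if terminal_state.type == StateType.COMPLETED:
        print("Task one succeeded! Pausing flow run..")
        pause_flow_run(timeout=2)
    else:
        print("Task one failed. Skipping pause flow run..")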
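
The `timeout` and `poll_interval` parameters described above amount to a polling loop with a deadline. The following is a minimal, standard-library-only sketch of that behavior, not Prefect's implementation: `block_until_resumed` and `is_resumed` are hypothetical names, and the real function checks the flow run's state through the Prefect API rather than calling a local callback.

```python
import time
from typing import Callable


def block_until_resumed(
    is_resumed: Callable[[], bool],
    timeout: float = 3600,
    poll_interval: float = 10,
) -> int:
    """Poll `is_resumed` until it returns True and return the number of polls.

    Raises TimeoutError if `timeout` seconds elapse first. Hypothetical helper
    that only illustrates the timeout / poll_interval semantics.
    """
    deadline = time.monotonic() + timeout
    polls = 0
    while time.monotonic() < deadline:
        polls += 1
        if is_resumed():
            return polls
        # Wait before checking again, as poll_interval describes.
        time.sleep(poll_interval)
    raise TimeoutError(f"not resumed within {timeout} seconds")


# Simulate a run that is resumed by the time of the third check.
checks = iter([False, False, True])
polls = block_until_resumed(lambda: next(checks), timeout=1, poll_interval=0.01)
print(polls)  # 3
```

A shorter `poll_interval` notices a resume sooner at the cost of more checks. In this sketch the deadline is only tested between polls, so the effective wait can overshoot `timeout` by up to one interval.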
Source code in src/prefect/flow_runs.py
resume_flow_run(flow_run_id, run_input=None)
async
Resumes a paused flow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_run_id | | The flow_run_id to resume. | required |
run_input | Optional[Dict] | A dictionary of inputs to provide to the flow run. | None |
suspend_flow_run(wait_for_input=None, flow_run_id=None, timeout=3600, key=None, client=None)
async
suspend_flow_run(wait_for_input: None = None, flow_run_id: Optional[UUID] = None, timeout: Optional[int] = 3600, key: Optional[str] = None, client: PrefectClient = None) -> None
suspend_flow_run(wait_for_input: Type[T], flow_run_id: Optional[UUID] = None, timeout: Optional[int] = 3600, key: Optional[str] = None, client: PrefectClient = None) -> T
Suspends a flow run by stopping code execution until resumed.
When suspended, the flow run will continue execution until the NEXT task is orchestrated, at which point the flow will exit. Any tasks that have already started will run until completion. When resumed, the flow run will be rescheduled to finish execution. In order to suspend a flow run in this way, the flow needs to have an associated deployment and results need to be configured with the persist_result option.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_run_id | Optional[UUID] | A flow run id. If supplied, this function will attempt to suspend the specified flow run. If not supplied, it will attempt to suspend the current flow run. | None |
timeout | Optional[int] | The number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming. | 3600 |
key | Optional[str] | An optional key to prevent calling suspend more than once. This defaults to a random string and prevents suspends from running the same suspend twice. A custom key can be supplied for custom suspending behavior. | None |
wait_for_input | Optional[Type[T]] | a subclass of | None |
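
Why suspension needs persisted results and a key can be pictured with a small simulation. This is a conceptual sketch under stated assumptions, not Prefect's internals: it assumes the rescheduled run executes the flow function again from the top, that finished tasks are answered from stored results, and that a suspend whose key was already resumed is skipped. Every name here (`Suspended`, `run_task`, `suspend`, `execute`, `persisted_results`, `resumed_keys`) is hypothetical.

```python
class Suspended(Exception):
    """Unwinds the simulated flow when it suspends."""


persisted_results: dict[str, int] = {}  # stands in for persisted task results
resumed_keys: set[str] = set()          # keys of suspends that were resumed
calls: list[str] = []                   # tasks whose bodies actually executed


def run_task(name, fn):
    # A task with a stored result is not executed again on re-entry.
    if name in persisted_results:
        return persisted_results[name]
    calls.append(name)
    persisted_results[name] = fn()
    return persisted_results[name]


def suspend(key):
    # A suspend that was already resumed is skipped instead of repeated.
    if key in resumed_keys:
        return
    raise Suspended(key)


def flow():
    a = run_task("extract", lambda: 20)
    suspend(key="approval")
    return run_task("load", lambda: a + 1)


def execute():
    try:
        return flow()
    except Suspended as exc:
        return f"suspended at {exc}"


first = execute()             # the flow exits at the suspend point
resumed_keys.add("approval")  # simulated resume
second = execute()            # the flow re-runs; "extract" is not re-executed
print(first, second, calls)
```

In this model, without stored results the second execution would repeat `extract`, and without a stable key it would suspend again at the same point.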
wait_for_flow_run(flow_run_id, timeout=10800, poll_interval=5, client=None, log_states=False)
async
Waits for the Prefect flow run to finish and returns the FlowRun.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flow_run_id | UUID | The flow run ID for the flow run to wait for. | required |
timeout | Optional[int] | The wait timeout in seconds. Defaults to 10800 (3 hours). | 10800 |
poll_interval | int | The poll interval in seconds. Defaults to 5. | 5 |
Returns:
Name | Type | Description |
---|---|---|
FlowRun | FlowRun | The finished flow run. |
Raises:
Type | Description |
---|---|
FlowWaitTimeout | If flow run goes over the timeout. |
Examples:
Create a flow run for a deployment and wait for it to finish:
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main():
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
        flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
        print(flow_run.state)

if __name__ == "__main__":
    asyncio.run(main())
```
Trigger multiple flow runs and wait for them to finish:
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main(num_runs: int):
    async with get_client() as client:
        flow_runs = [
            await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
            for _ in range(num_runs)
        ]
        coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
        finished_flow_runs = await asyncio.gather(*coros)
        print([flow_run.state for flow_run in finished_flow_runs])

if __name__ == "__main__":
    asyncio.run(main(num_runs=10))
```