Skip to content

prefect.flow_runs

pause_flow_run(wait_for_input=None, timeout=3600, poll_interval=10, key=None) async

Pauses the current flow run by blocking execution until resumed.

When called within a flow run, execution will block and no downstream tasks will run until the flow is resumed. Task runs that have already started will continue running. A timeout parameter can be passed that will fail the flow run if it has not been resumed within the specified time.

Parameters:

Name Type Description Default
timeout int

the number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming.

3600
poll_interval int

The number of seconds between checking whether the flow has been resumed. Defaults to 10 seconds.

10
key Optional[str]

An optional key to prevent calling pauses more than once. This defaults to the number of pauses observed by the flow so far, and prevents pauses that use the "reschedule" option from running the same pause twice. A custom key can be supplied for custom pausing behavior.

None
wait_for_input Optional[Type[T]]

a subclass of RunInput or any type supported by Pydantic. If provided when the flow pauses, the flow will wait for the input to be provided before resuming. If the flow is resumed without providing the input, the flow will fail. If the flow is resumed with the input, the flow will resume and the input will be loaded and returned from this function.

None
@task
def task_one():
    for i in range(3):
        sleep(1)

@flow
def my_flow():
    terminal_state = task_one.submit(return_state=True)
    if terminal_state.type == StateType.COMPLETED:
        print("Task one succeeded! Pausing flow run..")
        pause_flow_run(timeout=2)
    else:
        print("Task one failed. Skipping pause flow run..")
Source code in src/prefect/flow_runs.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@sync_compatible
async def pause_flow_run(
    wait_for_input: Optional[Type[T]] = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> Optional[T]:
    """
    Pauses the current flow run by blocking execution until resumed.

    When called within a flow run, execution will block and no downstream tasks will
    run until the flow is resumed. Task runs that have already started will continue
    running. A timeout parameter can be passed that will fail the flow run if it has not
    been resumed within the specified time.

    Args:
        timeout: the number of seconds to wait for the flow to be resumed before
            failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds
            any configured flow-level timeout, the flow might fail even after resuming.
        poll_interval: The number of seconds between checking whether the flow has been
            resumed. Defaults to 10 seconds.
        key: An optional key to prevent calling pauses more than once. This defaults to
            the number of pauses observed by the flow so far, and prevents pauses that
            use the "reschedule" option from running the same pause twice. A custom key
            can be supplied for custom pausing behavior.
        wait_for_input: a subclass of `RunInput` or any type supported by
            Pydantic. If provided when the flow pauses, the flow will wait for the
            input to be provided before resuming. If the flow is resumed without
            providing the input, the flow will fail. If the flow is resumed with the
            input, the flow will resume and the input will be loaded and returned
            from this function.

    Example:
    ```python
    @task
    def task_one():
        for i in range(3):
            sleep(1)

    @flow
    def my_flow():
        terminal_state = task_one.submit(return_state=True)
        if terminal_state.type == StateType.COMPLETED:
            print("Task one succeeded! Pausing flow run..")
            pause_flow_run(timeout=2)
        else:
            print("Task one failed. Skipping pause flow run..")
    ```

    """
    return await _in_process_pause(
        timeout=timeout,
        poll_interval=poll_interval,
        key=key,
        wait_for_input=wait_for_input,
    )

resume_flow_run(flow_run_id, run_input=None) async

Resumes a paused flow.

Parameters:

Name Type Description Default
flow_run_id

the flow_run_id to resume

required
run_input Optional[Dict]

a dictionary of inputs to provide to the flow run.

None
Source code in src/prefect/flow_runs.py
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
@sync_compatible
async def resume_flow_run(flow_run_id, run_input: Optional[Dict] = None):
    """
    Resumes a paused flow.

    Args:
        flow_run_id: the flow_run_id to resume
        run_input: a dictionary of inputs to provide to the flow run.
    """
    client = get_client()
    async with client:
        flow_run = await client.read_flow_run(flow_run_id)

        if not flow_run.state.is_paused():
            raise NotPausedError("Cannot resume a run that isn't paused!")

        response = await client.resume_flow_run(flow_run_id, run_input=run_input)

    if response.status == SetStateStatus.REJECT:
        if response.state.type == StateType.FAILED:
            raise FlowPauseTimeout("Flow run can no longer be resumed.")
        else:
            raise RuntimeError(f"Cannot resume this run: {response.details.reason}")

suspend_flow_run(wait_for_input=None, flow_run_id=None, timeout=3600, key=None, client=None) async

Suspends a flow run by stopping code execution until resumed.

When suspended, the flow run will continue execution until the NEXT task is orchestrated, at which point the flow will exit. Any tasks that have already started will run until completion. When resumed, the flow run will be rescheduled to finish execution. In order suspend a flow run in this way, the flow needs to have an associated deployment and results need to be configured with the persist_result option.

Parameters:

Name Type Description Default
flow_run_id Optional[UUID]

a flow run id. If supplied, this function will attempt to suspend the specified flow run. If not supplied will attempt to suspend the current flow run.

None
timeout Optional[int]

the number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming.

3600
key Optional[str]

An optional key to prevent calling suspend more than once. This defaults to a random string and prevents suspends from running the same suspend twice. A custom key can be supplied for custom suspending behavior.

None
wait_for_input Optional[Type[T]]

a subclass of RunInput or any type supported by Pydantic. If provided when the flow suspends, the flow will remain suspended until receiving the input before resuming. If the flow is resumed without providing the input, the flow will fail. If the flow is resumed with the input, the flow will resume and the input will be loaded and returned from this function.

None
Source code in src/prefect/flow_runs.py
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
@sync_compatible
@inject_client
async def suspend_flow_run(
    wait_for_input: Optional[Type[T]] = None,
    flow_run_id: Optional[UUID] = None,
    timeout: Optional[int] = 3600,
    key: Optional[str] = None,
    client: PrefectClient = None,
) -> Optional[T]:
    """
    Suspends a flow run by stopping code execution until resumed.

    When suspended, the flow run will continue execution until the NEXT task is
    orchestrated, at which point the flow will exit. Any tasks that have
    already started will run until completion. When resumed, the flow run will
    be rescheduled to finish execution. In order suspend a flow run in this
    way, the flow needs to have an associated deployment and results need to be
    configured with the `persist_result` option.

    Args:
        flow_run_id: a flow run id. If supplied, this function will attempt to
            suspend the specified flow run. If not supplied will attempt to
            suspend the current flow run.
        timeout: the number of seconds to wait for the flow to be resumed before
            failing. Defaults to 1 hour (3600 seconds). If the pause timeout
            exceeds any configured flow-level timeout, the flow might fail even
            after resuming.
        key: An optional key to prevent calling suspend more than once. This
            defaults to a random string and prevents suspends from running the
            same suspend twice. A custom key can be supplied for custom
            suspending behavior.
        wait_for_input: a subclass of `RunInput` or any type supported by
            Pydantic. If provided when the flow suspends, the flow will remain
            suspended until receiving the input before resuming. If the flow is
            resumed without providing the input, the flow will fail. If the flow is
            resumed with the input, the flow will resume and the input will be
            loaded and returned from this function.
    """
    context = FlowRunContext.get()

    if flow_run_id is None:
        if TaskRunContext.get():
            raise RuntimeError("Cannot suspend task runs.")

        if context is None or context.flow_run is None:
            raise RuntimeError(
                "Flow runs can only be suspended from within a flow run."
            )

        logger = get_run_logger(context=context)
        logger.info(
            "Suspending flow run, execution will be rescheduled when this flow run is"
            " resumed."
        )
        flow_run_id = context.flow_run.id
        suspending_current_flow_run = True
        pause_counter = _observed_flow_pauses(context)
        pause_key = key or str(pause_counter)
    else:
        # Since we're suspending another flow run we need to generate a pause
        # key that won't conflict with whatever suspends/pauses that flow may
        # have. Since this method won't be called during that flow run it's
        # okay that this is non-deterministic.
        suspending_current_flow_run = False
        pause_key = key or str(uuid4())

    proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)

    if wait_for_input:
        wait_for_input = run_input_subclass_from_type(wait_for_input)
        run_input_keyset = keyset_from_paused_state(proposed_state)
        proposed_state.state_details.run_input_keyset = run_input_keyset

    try:
        state = await propose_state(
            client=client,
            state=proposed_state,
            flow_run_id=flow_run_id,
        )
    except Abort as exc:
        # Aborted requests mean the suspension is not allowed
        raise RuntimeError(f"Flow run cannot be suspended: {exc}")

    if state.is_running():
        # The orchestrator rejected the suspended state which means that this
        # suspend has happened before and the flow run has been resumed.
        if wait_for_input:
            # The flow run wanted input, so we need to load it and return it
            # to the user.
            return await wait_for_input.load(run_input_keyset)
        return

    if not state.is_paused():
        # If we receive anything but a PAUSED state, we are unable to continue
        raise RuntimeError(
            f"Flow run cannot be suspended. Received unexpected state from API: {state}"
        )

    if wait_for_input:
        await wait_for_input.save(run_input_keyset)

    if suspending_current_flow_run:
        # Exit this process so the run can be resubmitted later
        raise Pause()

wait_for_flow_run(flow_run_id, timeout=10800, poll_interval=5, client=None, log_states=False) async

Waits for the prefect flow run to finish and returns the FlowRun

Parameters:

Name Type Description Default
flow_run_id UUID

The flow run ID for the flow run to wait for.

required
timeout Optional[int]

The wait timeout in seconds. Defaults to 10800 (3 hours).

10800
poll_interval int

The poll interval in seconds. Defaults to 5.

5

Returns:

Name Type Description
FlowRun FlowRun

The finished flow run.

Raises:

Type Description
FlowWaitTimeout

If flow run goes over the timeout.

Examples:

Create a flow run for a deployment and wait for it to finish: ```python import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main():
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
        flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
        print(flow_run.state)

if __name__ == "__main__":
    asyncio.run(main())

```

Trigger multiple flow runs and wait for them to finish: ```python import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main(num_runs: int):
    async with get_client() as client:
        flow_runs = [
            await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
            for _
            in range(num_runs)
        ]
        coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
        finished_flow_runs = await asyncio.gather(*coros)
        print([flow_run.state for flow_run in finished_flow_runs])

if __name__ == "__main__":
    asyncio.run(main(num_runs=10))

```
Source code in src/prefect/flow_runs.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
@inject_client
async def wait_for_flow_run(
    flow_run_id: UUID,
    timeout: Optional[int] = 10800,
    poll_interval: int = 5,
    client: Optional["PrefectClient"] = None,
    log_states: bool = False,
) -> FlowRun:
    """
    Waits for the prefect flow run to finish and returns the FlowRun

    Args:
        flow_run_id: The flow run ID for the flow run to wait for.
        timeout: The wait timeout in seconds. Defaults to 10800 (3 hours).
        poll_interval: The poll interval in seconds. Defaults to 5.

    Returns:
        FlowRun: The finished flow run.

    Raises:
        prefect.exceptions.FlowWaitTimeout: If flow run goes over the timeout.

    Examples:
        Create a flow run for a deployment and wait for it to finish:
            ```python
            import asyncio

            from prefect.client.orchestration import get_client
            from prefect.flow_runs import wait_for_flow_run

            async def main():
                async with get_client() as client:
                    flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
                    flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
                    print(flow_run.state)

            if __name__ == "__main__":
                asyncio.run(main())

            ```

        Trigger multiple flow runs and wait for them to finish:
            ```python
            import asyncio

            from prefect.client.orchestration import get_client
            from prefect.flow_runs import wait_for_flow_run

            async def main(num_runs: int):
                async with get_client() as client:
                    flow_runs = [
                        await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
                        for _
                        in range(num_runs)
                    ]
                    coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
                    finished_flow_runs = await asyncio.gather(*coros)
                    print([flow_run.state for flow_run in finished_flow_runs])

            if __name__ == "__main__":
                asyncio.run(main(num_runs=10))

            ```
    """
    assert client is not None, "Client injection failed"
    logger = get_logger()
    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if log_states:
                logger.info(f"Flow run is in state {flow_run.state.name!r}")
            if flow_state and flow_state.is_final():
                return flow_run
            await anyio.sleep(poll_interval)
    raise FlowRunWaitTimeout(
        f"Flow run with ID {flow_run_id} exceeded watch timeout of {timeout} seconds"
    )