Skip to content

prefect.runner.submit

submit_to_runner(prefect_callable, parameters=None, retry_failed_submissions=True) async

Submit a callable in the background via the runner webserver one or more times.

Parameters:

Name Type Description Default
prefect_callable Union[Flow, Task]

the callable to run (only flows are supported for now, but eventually tasks)

required
parameters Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]

keyword arguments to pass to the callable. May be a list of dictionaries where each dictionary represents a discrete invocation of the callable

None
retry_failed_submissions bool

Whether to retry failed submissions to the runner webserver.

True
Source code in src/prefect/runner/submit.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@sync_compatible
async def submit_to_runner(
    prefect_callable: Union[Flow, Task],
    parameters: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
    retry_failed_submissions: bool = True,
) -> Union[FlowRun, List[FlowRun]]:
    """
    Submit a callable in the background via the runner webserver one or more times.

    Args:
        prefect_callable: the callable to run (only flows are supported for now, but eventually tasks)
        parameters: keyword arguments to pass to the callable. May be a list of dictionaries where
            each dictionary represents a discrete invocation of the callable
        retry_failed_submissions: Whether to retry failed submissions to the runner webserver.
    """
    if not isinstance(prefect_callable, (Flow, Task)):
        raise TypeError(
            "The `submit_to_runner` utility only supports submitting flows and tasks."
        )

    if not PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS.value():
        raise ValueError(
            "The `submit_to_runner` utility requires the `Runner` webserver to be"
            " running and built with extra endpoints enabled. To enable this, set the"
            " `PREFECT_EXPERIMENTAL_ENABLE_EXTRA_RUNNER_ENDPOINTS` setting to `True`."
        )

    parameters = parameters or {}
    if isinstance(parameters, List):
        return_single = False
    elif isinstance(parameters, dict):
        parameters = [parameters]
        return_single = True
    else:
        raise TypeError("Parameters must be a dictionary or a list of dictionaries.")

    submitted_runs = []
    unsubmitted_parameters = []

    for p in parameters:
        try:
            flow_run = await _submit_flow_to_runner(
                prefect_callable, p, retry_failed_submissions
            )
            if inspect.isawaitable(flow_run):
                flow_run = await flow_run
            submitted_runs.append(flow_run)
        except httpx.ConnectError as exc:
            raise RuntimeError(
                "Failed to connect to the `Runner` webserver. Ensure that the server is"
                " running and reachable. You can run the webserver either by starting"
                " your `serve` process with `webserver=True`, or by setting"
                " `PREFECT_RUNNER_SERVER_ENABLE=True`."
            ) from exc
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == 429:
                unsubmitted_parameters.append(p)
            else:
                raise exc

    if unsubmitted_parameters:
        logger.warning(
            f"Failed to submit {len(unsubmitted_parameters)} runs to the runner, as all"
            f" of the available {PREFECT_RUNNER_PROCESS_LIMIT.value()} slots were"
            " occupied. To increase the number of available slots, configure"
            " the`PREFECT_RUNNER_PROCESS_LIMIT` setting."
        )

    # If one run was submitted, return the corresponding FlowRun directly
    if return_single:
        return submitted_runs[0]
    return submitted_runs

wait_for_submitted_runs(flow_run_filter=None, task_run_filter=None, timeout=None, poll_interval=3.0) async

Wait for completion of any provided flow runs (eventually task runs), as well as subflow runs of the current flow run (if called from within a flow run and subflow runs exist).

Parameters:

Name Type Description Default
flow_run_filter Optional[FlowRunFilter]

A filter to apply to the flow runs to wait for.

None
task_run_filter Optional[TaskRunFilter]

A filter to apply to the task runs to wait for. # TODO: /task/run

None
timeout Optional[float]

How long to wait for completion of all runs (seconds).

None
poll_interval float

How long to wait between polling each run's state (seconds).

3.0
Source code in src/prefect/runner/submit.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@sync_compatible
async def wait_for_submitted_runs(
    flow_run_filter: Optional[FlowRunFilter] = None,
    task_run_filter: Optional[TaskRunFilter] = None,
    timeout: Optional[float] = None,
    poll_interval: float = 3.0,
):
    """
    Wait for completion of any provided flow runs (eventually task runs), as well as subflow runs
    of the current flow run (if called from within a flow run and subflow runs exist).

    Args:
        flow_run_filter: A filter to apply to the flow runs to wait for.
        task_run_filter: A filter to apply to the task runs to wait for. # TODO: /task/run
        timeout: How long to wait for completion of all runs (seconds).
        poll_interval: How long to wait between polling each run's state (seconds).
    """

    parent_flow_run_id = ctx.flow_run.id if (ctx := FlowRunContext.get()) else None

    if task_run_filter:
        raise NotImplementedError("Waiting for task runs is not yet supported.")

    async def wait_for_final_state(
        run_type: Literal["flow", "task"], run_id: uuid.UUID
    ):
        read_run_method = getattr(client, f"read_{run_type}_run")
        while True:
            run = await read_run_method(run_id)
            if run.state and run.state.is_final():
                return run_id
            await anyio.sleep(poll_interval)

    async with get_client() as client:
        with anyio.move_on_after(timeout):
            flow_runs_to_wait_for = (
                await client.read_flow_runs(flow_run_filter=flow_run_filter)
                if flow_run_filter
                else []
            )

            if parent_flow_run_id is not None:
                subflow_runs = await client.read_flow_runs(
                    flow_run_filter=FlowRunFilter(
                        parent_flow_run_id=dict(any_=[parent_flow_run_id])
                    )
                )

                flow_runs_to_wait_for.extend(subflow_runs)

            await asyncio.gather(
                *(wait_for_final_state("flow", run.id) for run in flow_runs_to_wait_for)
            )