Skip to content

prefect.task_runners

PrefectTaskRunner

Bases: TaskRunner[PrefectDistributedFuture]

Source code in src/prefect/task_runners.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
class PrefectTaskRunner(TaskRunner[PrefectDistributedFuture]):
    def __init__(self):
        super().__init__()

    def duplicate(self) -> "PrefectTaskRunner":
        return type(self)()

    @overload
    def submit(
        self,
        task: "Task[P, Coroutine[Any, Any, R]]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
    ) -> PrefectDistributedFuture[R]:
        ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
    ) -> PrefectDistributedFuture[R]:
        ...

    def submit(
        self,
        task: "Task",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
    ):
        """
        Submit a task to the task run engine running in a separate thread.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.
        """
        if not self._started:
            raise RuntimeError("Task runner is not started")
        from prefect.context import FlowRunContext

        flow_run_ctx = FlowRunContext.get()
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).info(
                f"Submitting task {task.name} to for execution by a Prefect task worker..."
            )
        else:
            self.logger.info(
                f"Submitting task {task.name} to for execution by a Prefect task worker..."
            )

        return task.apply_async(
            kwargs=parameters, wait_for=wait_for, dependencies=dependencies
        )

    @overload
    def map(
        self,
        task: "Task[P, Coroutine[Any, Any, R]]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]:
        ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
    ) -> PrefectFutureList[PrefectDistributedFuture[R]]:
        ...

    def map(
        self,
        task: "Task",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
    ):
        return super().map(task, parameters, wait_for)

submit(task, parameters, wait_for=None, dependencies=None)

submit(task: Task[P, Coroutine[Any, Any, R]], parameters: Dict[str, Any], wait_for: Optional[Iterable[PrefectFuture]] = None, dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None) -> PrefectDistributedFuture[R]
submit(task: Task[Any, R], parameters: Dict[str, Any], wait_for: Optional[Iterable[PrefectFuture]] = None, dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None) -> PrefectDistributedFuture[R]

Submit a task to the task run engine running in a separate thread.

Parameters:

Name Type Description Default
task Task

The task to submit.

required
parameters Dict[str, Any]

The parameters to use when running the task.

required
wait_for Optional[Iterable[PrefectFuture]]

A list of futures that the task depends on.

None

Returns:

Type Description

A future object that can be used to wait for the task to complete and

retrieve the result.

Source code in src/prefect/task_runners.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
def submit(
    self,
    task: "Task",
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]] = None,
    dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
):
    """
    Submit a task to the task run engine running in a separate thread.

    Args:
        task: The task to submit.
        parameters: The parameters to use when running the task.
        wait_for: A list of futures that the task depends on.

    Returns:
        A future object that can be used to wait for the task to complete and
        retrieve the result.
    """
    if not self._started:
        raise RuntimeError("Task runner is not started")
    from prefect.context import FlowRunContext

    flow_run_ctx = FlowRunContext.get()
    if flow_run_ctx:
        get_run_logger(flow_run_ctx).info(
            f"Submitting task {task.name} to for execution by a Prefect task worker..."
        )
    else:
        self.logger.info(
            f"Submitting task {task.name} to for execution by a Prefect task worker..."
        )

    return task.apply_async(
        kwargs=parameters, wait_for=wait_for, dependencies=dependencies
    )

TaskRunner

Bases: ABC, Generic[F]

Abstract base class for task runners.

A task runner is responsible for submitting tasks to the task run engine running in an execution environment. Submitted tasks are non-blocking and return a future object that can be used to wait for the task to complete and retrieve the result.

Task runners are context managers and should be used in a with block to ensure proper cleanup of resources.

Source code in src/prefect/task_runners.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
class TaskRunner(abc.ABC, Generic[F]):
    """
    Abstract base class for task runners.

    A task runner is responsible for submitting tasks to the task run engine running
    in an execution environment. Submitted tasks are non-blocking and return a future
    object that can be used to wait for the task to complete and retrieve the result.

    Task runners are context managers and should be used in a `with` block to ensure
    proper cleanup of resources.
    """

    def __init__(self):
        self.logger = get_logger(f"task_runner.{self.name}")
        self._started = False

    @property
    def name(self):
        """The name of this task runner"""
        return type(self).__name__.lower().replace("taskrunner", "")

    @abc.abstractmethod
    def duplicate(self) -> Self:
        """Return a new instance of this task runner with the same configuration."""
        ...

    @abc.abstractmethod
    def submit(
        self,
        task: "Task",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
    ) -> F:
        """
        Submit a task to the task run engine.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.
        """
        ...

    def map(
        self,
        task: "Task",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
    ) -> PrefectFutureList[F]:
        """
        Submit multiple tasks to the task run engine.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            An iterable of future objects that can be used to wait for the tasks to
            complete and retrieve the results.
        """
        if not self._started:
            raise RuntimeError(
                "The task runner must be started before submitting work."
            )

        from prefect.utilities.engine import (
            collect_task_run_inputs_sync,
            resolve_inputs_sync,
        )

        # We need to resolve some futures to map over their data, collect the upstream
        # links beforehand to retain relationship tracking.
        task_inputs = {
            k: collect_task_run_inputs_sync(v, max_depth=0)
            for k, v in parameters.items()
        }

        # Resolve the top-level parameters in order to get mappable data of a known length.
        # Nested parameters will be resolved in each mapped child where their relationships
        # will also be tracked.
        parameters = resolve_inputs_sync(parameters, max_depth=0)

        # Ensure that any parameters in kwargs are expanded before this check
        parameters = explode_variadic_parameter(task.fn, parameters)

        iterable_parameters = {}
        static_parameters = {}
        annotated_parameters = {}
        for key, val in parameters.items():
            if isinstance(val, (allow_failure, quote)):
                # Unwrap annotated parameters to determine if they are iterable
                annotated_parameters[key] = val
                val = val.unwrap()

            if isinstance(val, unmapped):
                static_parameters[key] = val.value
            elif isiterable(val):
                iterable_parameters[key] = list(val)
            else:
                static_parameters[key] = val

        if not len(iterable_parameters):
            raise MappingMissingIterable(
                "No iterable parameters were received. Parameters for map must "
                f"include at least one iterable. Parameters: {parameters}"
            )

        iterable_parameter_lengths = {
            key: len(val) for key, val in iterable_parameters.items()
        }
        lengths = set(iterable_parameter_lengths.values())
        if len(lengths) > 1:
            raise MappingLengthMismatch(
                "Received iterable parameters with different lengths. Parameters for map"
                f" must all be the same length. Got lengths: {iterable_parameter_lengths}"
            )

        map_length = list(lengths)[0]

        futures: List[PrefectFuture] = []
        for i in range(map_length):
            call_parameters = {
                key: value[i] for key, value in iterable_parameters.items()
            }
            call_parameters.update(
                {key: value for key, value in static_parameters.items()}
            )

            # Add default values for parameters; these are skipped earlier since they should
            # not be mapped over
            for key, value in get_parameter_defaults(task.fn).items():
                call_parameters.setdefault(key, value)

            # Re-apply annotations to each key again
            for key, annotation in annotated_parameters.items():
                call_parameters[key] = annotation.rewrap(call_parameters[key])

            # Collapse any previously exploded kwargs
            call_parameters = collapse_variadic_parameters(task.fn, call_parameters)

            futures.append(
                self.submit(
                    task=task,
                    parameters=call_parameters,
                    wait_for=wait_for,
                    dependencies=task_inputs,
                )
            )

        return PrefectFutureList(futures)

    def __enter__(self):
        if self._started:
            raise RuntimeError("This task runner is already started")

        self.logger.debug("Starting task runner")
        self._started = True
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.logger.debug("Stopping task runner")
        self._started = False

name property

The name of this task runner

duplicate() abstractmethod

Return a new instance of this task runner with the same configuration.

Source code in src/prefect/task_runners.py
71
72
73
74
@abc.abstractmethod
def duplicate(self) -> Self:
    """Return a new instance of this task runner with the same configuration."""
    ...

map(task, parameters, wait_for=None)

Submit multiple tasks to the task run engine.

Parameters:

Name Type Description Default
task Task

The task to submit.

required
parameters Dict[str, Any]

The parameters to use when running the task.

required
wait_for Optional[Iterable[PrefectFuture]]

A list of futures that the task depends on.

None

Returns:

Type Description
PrefectFutureList[F]

An iterable of future objects that can be used to wait for the tasks to

PrefectFutureList[F]

complete and retrieve the results.

Source code in src/prefect/task_runners.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def map(
    self,
    task: "Task",
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]] = None,
) -> PrefectFutureList[F]:
    """
    Submit multiple tasks to the task run engine.

    Args:
        task: The task to submit.
        parameters: The parameters to use when running the task.
        wait_for: A list of futures that the task depends on.

    Returns:
        An iterable of future objects that can be used to wait for the tasks to
        complete and retrieve the results.
    """
    if not self._started:
        raise RuntimeError(
            "The task runner must be started before submitting work."
        )

    from prefect.utilities.engine import (
        collect_task_run_inputs_sync,
        resolve_inputs_sync,
    )

    # We need to resolve some futures to map over their data, collect the upstream
    # links beforehand to retain relationship tracking.
    task_inputs = {
        k: collect_task_run_inputs_sync(v, max_depth=0)
        for k, v in parameters.items()
    }

    # Resolve the top-level parameters in order to get mappable data of a known length.
    # Nested parameters will be resolved in each mapped child where their relationships
    # will also be tracked.
    parameters = resolve_inputs_sync(parameters, max_depth=0)

    # Ensure that any parameters in kwargs are expanded before this check
    parameters = explode_variadic_parameter(task.fn, parameters)

    iterable_parameters = {}
    static_parameters = {}
    annotated_parameters = {}
    for key, val in parameters.items():
        if isinstance(val, (allow_failure, quote)):
            # Unwrap annotated parameters to determine if they are iterable
            annotated_parameters[key] = val
            val = val.unwrap()

        if isinstance(val, unmapped):
            static_parameters[key] = val.value
        elif isiterable(val):
            iterable_parameters[key] = list(val)
        else:
            static_parameters[key] = val

    if not len(iterable_parameters):
        raise MappingMissingIterable(
            "No iterable parameters were received. Parameters for map must "
            f"include at least one iterable. Parameters: {parameters}"
        )

    iterable_parameter_lengths = {
        key: len(val) for key, val in iterable_parameters.items()
    }
    lengths = set(iterable_parameter_lengths.values())
    if len(lengths) > 1:
        raise MappingLengthMismatch(
            "Received iterable parameters with different lengths. Parameters for map"
            f" must all be the same length. Got lengths: {iterable_parameter_lengths}"
        )

    map_length = list(lengths)[0]

    futures: List[PrefectFuture] = []
    for i in range(map_length):
        call_parameters = {
            key: value[i] for key, value in iterable_parameters.items()
        }
        call_parameters.update(
            {key: value for key, value in static_parameters.items()}
        )

        # Add default values for parameters; these are skipped earlier since they should
        # not be mapped over
        for key, value in get_parameter_defaults(task.fn).items():
            call_parameters.setdefault(key, value)

        # Re-apply annotations to each key again
        for key, annotation in annotated_parameters.items():
            call_parameters[key] = annotation.rewrap(call_parameters[key])

        # Collapse any previously exploded kwargs
        call_parameters = collapse_variadic_parameters(task.fn, call_parameters)

        futures.append(
            self.submit(
                task=task,
                parameters=call_parameters,
                wait_for=wait_for,
                dependencies=task_inputs,
            )
        )

    return PrefectFutureList(futures)

submit(task, parameters, wait_for=None, dependencies=None) abstractmethod

Submit a task to the task run engine.

Parameters:

Name Type Description Default
task Task

The task to submit.

required
parameters Dict[str, Any]

The parameters to use when running the task.

required
wait_for Optional[Iterable[PrefectFuture]]

A list of futures that the task depends on.

None

Returns:

Type Description
F

A future object that can be used to wait for the task to complete and

F

retrieve the result.

Source code in src/prefect/task_runners.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@abc.abstractmethod
def submit(
    self,
    task: "Task",
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]] = None,
    dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
) -> F:
    """
    Submit a task to the task run engine.

    Args:
        task: The task to submit.
        parameters: The parameters to use when running the task.
        wait_for: A list of futures that the task depends on.

    Returns:
        A future object that can be used to wait for the task to complete and
        retrieve the result.
    """
    ...

ThreadPoolTaskRunner

Bases: TaskRunner[PrefectConcurrentFuture]

Source code in src/prefect/task_runners.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
class ThreadPoolTaskRunner(TaskRunner[PrefectConcurrentFuture]):
    def __init__(self, max_workers: Optional[int] = None):
        super().__init__()
        self._executor: Optional[ThreadPoolExecutor] = None
        self._max_workers = (
            (PREFECT_TASK_RUNNER_THREAD_POOL_MAX_WORKERS.value() or sys.maxsize)
            if max_workers is None
            else max_workers
        )
        self._cancel_events: Dict[uuid.UUID, threading.Event] = {}

    def duplicate(self) -> "ThreadPoolTaskRunner":
        return type(self)(max_workers=self._max_workers)

    @overload
    def submit(
        self,
        task: "Task[P, Coroutine[Any, Any, R]]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
    ) -> PrefectConcurrentFuture[R]:
        ...

    @overload
    def submit(
        self,
        task: "Task[Any, R]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
    ) -> PrefectConcurrentFuture[R]:
        ...

    def submit(
        self,
        task: "Task",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
        dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
    ):
        """
        Submit a task to the task run engine running in a separate thread.

        Args:
            task: The task to submit.
            parameters: The parameters to use when running the task.
            wait_for: A list of futures that the task depends on.

        Returns:
            A future object that can be used to wait for the task to complete and
            retrieve the result.
        """
        if not self._started or self._executor is None:
            raise RuntimeError("Task runner is not started")

        from prefect.context import FlowRunContext
        from prefect.task_engine import run_task_async, run_task_sync

        task_run_id = uuid.uuid4()
        cancel_event = threading.Event()
        self._cancel_events[task_run_id] = cancel_event
        context = copy_context()

        flow_run_ctx = FlowRunContext.get()
        if flow_run_ctx:
            get_run_logger(flow_run_ctx).debug(
                f"Submitting task {task.name} to thread pool executor..."
            )
        else:
            self.logger.debug(f"Submitting task {task.name} to thread pool executor...")

        submit_kwargs = dict(
            task=task,
            task_run_id=task_run_id,
            parameters=parameters,
            wait_for=wait_for,
            return_type="state",
            dependencies=dependencies,
            context=dict(cancel_event=cancel_event),
        )

        if task.isasync:
            # TODO: Explore possibly using a long-lived thread with an event loop
            # for better performance
            future = self._executor.submit(
                context.run,
                asyncio.run,
                run_task_async(**submit_kwargs),
            )
        else:
            future = self._executor.submit(
                context.run,
                run_task_sync,
                **submit_kwargs,
            )
        prefect_future = PrefectConcurrentFuture(
            task_run_id=task_run_id, wrapped_future=future
        )
        return prefect_future

    @overload
    def map(
        self,
        task: "Task[P, Coroutine[Any, Any, R]]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]:
        ...

    @overload
    def map(
        self,
        task: "Task[Any, R]",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
    ) -> PrefectFutureList[PrefectConcurrentFuture[R]]:
        ...

    def map(
        self,
        task: "Task",
        parameters: Dict[str, Any],
        wait_for: Optional[Iterable[PrefectFuture]] = None,
    ):
        return super().map(task, parameters, wait_for)

    def cancel_all(self):
        for event in self._cancel_events.values():
            event.set()
            self.logger.debug("Set cancel event")

        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True)
            self._executor = None

    def __enter__(self):
        super().__enter__()
        self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.cancel_all()
        if self._executor is not None:
            self._executor.shutdown(cancel_futures=True)
            self._executor = None
        super().__exit__(exc_type, exc_value, traceback)

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, ThreadPoolTaskRunner):
            return False
        return self._max_workers == value._max_workers

submit(task, parameters, wait_for=None, dependencies=None)

submit(task: Task[P, Coroutine[Any, Any, R]], parameters: Dict[str, Any], wait_for: Optional[Iterable[PrefectFuture]] = None, dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None) -> PrefectConcurrentFuture[R]
submit(task: Task[Any, R], parameters: Dict[str, Any], wait_for: Optional[Iterable[PrefectFuture]] = None, dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None) -> PrefectConcurrentFuture[R]

Submit a task to the task run engine running in a separate thread.

Parameters:

Name Type Description Default
task Task

The task to submit.

required
parameters Dict[str, Any]

The parameters to use when running the task.

required
wait_for Optional[Iterable[PrefectFuture]]

A list of futures that the task depends on.

None

Returns:

Type Description

A future object that can be used to wait for the task to complete and

retrieve the result.

Source code in src/prefect/task_runners.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
def submit(
    self,
    task: "Task",
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]] = None,
    dependencies: Optional[Dict[str, Set[TaskRunInput]]] = None,
):
    """
    Submit a task to the task run engine running in a separate thread.

    Args:
        task: The task to submit.
        parameters: The parameters to use when running the task.
        wait_for: A list of futures that the task depends on.

    Returns:
        A future object that can be used to wait for the task to complete and
        retrieve the result.
    """
    if not self._started or self._executor is None:
        raise RuntimeError("Task runner is not started")

    from prefect.context import FlowRunContext
    from prefect.task_engine import run_task_async, run_task_sync

    task_run_id = uuid.uuid4()
    cancel_event = threading.Event()
    self._cancel_events[task_run_id] = cancel_event
    context = copy_context()

    flow_run_ctx = FlowRunContext.get()
    if flow_run_ctx:
        get_run_logger(flow_run_ctx).debug(
            f"Submitting task {task.name} to thread pool executor..."
        )
    else:
        self.logger.debug(f"Submitting task {task.name} to thread pool executor...")

    submit_kwargs = dict(
        task=task,
        task_run_id=task_run_id,
        parameters=parameters,
        wait_for=wait_for,
        return_type="state",
        dependencies=dependencies,
        context=dict(cancel_event=cancel_event),
    )

    if task.isasync:
        # TODO: Explore possibly using a long-lived thread with an event loop
        # for better performance
        future = self._executor.submit(
            context.run,
            asyncio.run,
            run_task_async(**submit_kwargs),
        )
    else:
        future = self._executor.submit(
            context.run,
            run_task_sync,
            **submit_kwargs,
        )
    prefect_future = PrefectConcurrentFuture(
        task_run_id=task_run_id, wrapped_future=future
    )
    return prefect_future