Skip to content

prefect.utilities.asyncutils

Utilities for interoperability with async functions and workers from various contexts.

GatherIncomplete

Bases: RuntimeError

Used to indicate retrieving gather results before completion

Source code in src/prefect/utilities/asyncutils.py
481
482
class GatherIncomplete(RuntimeError):
    """Signals that a gathered result was requested before its task completed."""

GatherTaskGroup

Bases: TaskGroup

A task group that gathers results.

AnyIO does not include support for gather. This class extends the TaskGroup interface to allow simple gathering.

See https://github.com/agronholm/anyio/issues/100

This class should be instantiated with create_gather_task_group.

Source code in src/prefect/utilities/asyncutils.py
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
class GatherTaskGroup(anyio.abc.TaskGroup):
    """
    A task group that gathers results.

    AnyIO does not include support for `gather`. This class extends the `TaskGroup`
    interface to allow simple gathering: `start_soon` returns a key, and once the
    task group has exited the task's return value can be fetched with `get_result`.

    See https://github.com/agronholm/anyio/issues/100

    This class should be instantiated with `create_gather_task_group`.
    """

    def __init__(self, task_group: anyio.abc.TaskGroup):
        # Maps the key handed out by `start_soon` to the task's return value, or to
        # the `GatherIncomplete` class itself while the task has not finished yet
        self._results: Dict[UUID, Any] = {}
        # The concrete task group implementation to use
        self._task_group: anyio.abc.TaskGroup = task_group

    async def _run_and_store(self, key, fn, args):
        """Await `fn(*args)` and record its return value under `key`."""
        self._results[key] = await fn(*args)

    def start_soon(self, fn, *args, name=None) -> UUID:
        """
        Schedule `fn(*args)` on the wrapped task group.

        Args:
            fn: The async callable to run.
            *args: Positional arguments passed to `fn`.
            name: Optional task name, forwarded unchanged to the wrapped task
                group's `start_soon`.

        Returns:
            A key that can be passed to `get_result` after the task group exits.
        """
        key = uuid4()
        # Put a placeholder in-case the result is retrieved earlier
        self._results[key] = GatherIncomplete
        self._task_group.start_soon(self._run_and_store, key, fn, args, name=name)
        return key

    async def start(self, fn, *args):
        """
        Since `start` returns the result of `task_status.started()` but here we must
        return the key instead, we just won't support this method for now.

        Raises:
            RuntimeError: Always.
        """
        raise RuntimeError("`GatherTaskGroup` does not support `start`.")

    def get_result(self, key: UUID) -> Any:
        """
        Return the value produced by the task registered under `key`.

        Raises:
            KeyError: If `key` was not returned by `start_soon` on this group.
            GatherIncomplete: If the task has not stored its result yet.
        """
        result = self._results[key]
        if result is GatherIncomplete:
            raise GatherIncomplete(
                "Task is not complete. "
                "Results should not be retrieved until the task group exits."
            )
        return result

    async def __aenter__(self):
        await self._task_group.__aenter__()
        return self

    async def __aexit__(self, *tb):
        try:
            return await self._task_group.__aexit__(*tb)
        finally:
            # Drop the reference to the wrapped task group once it has exited
            del self._task_group

start(fn, *args) async

Since start returns the result of task_status.started() but here we must return the key instead, we just won't support this method for now.

Source code in src/prefect/utilities/asyncutils.py
512
513
514
515
516
517
async def start(self, fn, *args):
    """
    Unsupported on `GatherTaskGroup`.

    A regular `start` returns the value passed to `task_status.started()`, whereas
    this task group would need to return the result key instead, so the method
    always raises `RuntimeError` for now.
    """
    unsupported_message = "`GatherTaskGroup` does not support `start`."
    raise RuntimeError(unsupported_message)

add_event_loop_shutdown_callback(coroutine_fn) async

Registers the given callable as a callback to run on event loop closure. The callable must be a coroutine function. It will be awaited when the current event loop is shutting down.

Requires use of asyncio.run(), which waits for async generator shutdown by default, or an explicit call to loop.shutdown_asyncgens(). If the application is entered with loop.run_until_complete() and the user calls loop.close() without the generator shutdown call, the callbacks will not be triggered.

asyncio does not provide any other way to clean up a resource when the event loop is about to close.

Source code in src/prefect/utilities/asyncutils.py
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
async def add_event_loop_shutdown_callback(coroutine_fn: Callable[[], Awaitable]):
    """
    Registers the given callable as a callback to run on event loop closure. The
    callable must be a coroutine function. It will be awaited when the current
    event loop is shutting down.

    Requires use of `asyncio.run()`, which waits for async generator shutdown by
    default, or an explicit call to `loop.shutdown_asyncgens()`. If the application
    is entered with `loop.run_until_complete()` and the user calls `loop.close()`
    without the generator shutdown call, the callbacks will not be triggered.

    asyncio does not provide _any_ other way to clean up a resource when the event
    loop is about to close.

    Args:
        coroutine_fn: Zero-argument coroutine function awaited when the helper
            async generator created here is closed.
    """

    async def on_shutdown(key):
        # It appears that EVENT_LOOP_GC_REFS is somehow being garbage collected early.
        # We hold a reference to it so as to preserve it, at least for the lifetime of
        # this coroutine. See the issue below for the initial report/discussion:
        # https://github.com/PrefectHQ/prefect/issues/7709#issuecomment-1560021109
        _ = EVENT_LOOP_GC_REFS
        try:
            yield
        except GeneratorExit:
            try:
                await coroutine_fn()
            finally:
                # Remove self from the garbage collection set, even if the callback
                # raised. The entry may already be gone, so tolerate a missing key
                # instead of raising `KeyError` while the generator is closing.
                EVENT_LOOP_GC_REFS.pop(key, None)

    # Create the iterator and store it in a global variable so it is not garbage
    # collected. If the iterator is garbage collected before the event loop closes, the
    # callback will not run. Since this function does not know the scope of the event
    # loop that is calling it, a reference with global scope is necessary to ensure
    # garbage collection does not occur until after event loop closure.
    # NOTE(review): `id(on_shutdown)` is only guaranteed unique while that function
    # object is alive — confirm keys cannot collide across separate registrations.
    key = id(on_shutdown)
    EVENT_LOOP_GC_REFS[key] = on_shutdown(key)

    # Begin iterating so it will be cleaned up as an incomplete generator
    try:
        await EVENT_LOOP_GC_REFS[key].__anext__()
    # There is a poorly understood edge case we've seen in CI where the key is
    # removed from the dict before we begin generator iteration.
    except KeyError:
        logger.warning("The event loop shutdown callback was not properly registered. ")

create_gather_task_group()

Create a new task group that gathers results

Source code in src/prefect/utilities/asyncutils.py
540
541
542
543
544
545
def create_gather_task_group() -> GatherTaskGroup:
    """Build a `GatherTaskGroup` wrapping a freshly created AnyIO task group."""
    # Mirrors the AnyIO API, which exposes a factory callable rather than a class:
    # the concrete task group type depends on the async backend in use and is only
    # known at runtime.
    backend_task_group = anyio.create_task_group()
    return GatherTaskGroup(backend_task_group)

create_task(coroutine)

Replacement for asyncio.create_task that will ensure that tasks aren't garbage collected before they complete. Allows for "fire and forget" behavior in which tasks can be created and the application can move on. Tasks can also be awaited normally.

See https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task for details (and essentially this implementation)

Source code in src/prefect/utilities/asyncutils.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def create_task(coroutine: Coroutine) -> asyncio.Task:
    """
    Schedule `coroutine` with `asyncio.create_task` while keeping the resulting
    task strongly referenced until it finishes, so it cannot be garbage collected
    mid-flight. This enables "fire and forget" usage; the returned task can still
    be awaited as usual.

    The approach follows the recipe documented at
    https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
    """
    background = asyncio.create_task(coroutine)

    # Hold a strong reference in the module-level set. The lock guards the
    # insertion because tasks may be created from several threads.
    with background_task_lock:
        BACKGROUND_TASKS.add(background)

    # Once the task is done, let it drop its own entry so that finished tasks do
    # not accumulate in the set forever.
    background.add_done_callback(BACKGROUND_TASKS.discard)
    return background

gather(*calls) async

Run calls concurrently and gather their results.

Unlike asyncio.gather this expects to receive callables not coroutines. This matches anyio semantics.

Source code in src/prefect/utilities/asyncutils.py
548
549
550
551
552
553
554
555
556
557
558
559
async def gather(*calls: Callable[[], Coroutine[Any, Any, T]]) -> List[T]:
    """
    Execute the given calls concurrently and return their results in call order.

    Following `anyio` semantics, each argument must be a _callable_ that produces
    a coroutine, not an already-created _coroutine_ as `asyncio.gather` expects.
    """
    result_keys = []
    async with create_gather_task_group() as group:
        # Every call is scheduled before the group exits; exiting waits for them
        result_keys.extend(group.start_soon(call) for call in calls)
    return [group.get_result(result_key) for result_key in result_keys]

is_async_fn(func)

Returns True if a function returns a coroutine.

See https://github.com/microsoft/pyright/issues/2142 for an example use

Source code in src/prefect/utilities/asyncutils.py
75
76
77
78
79
80
81
82
83
84
85
86
def is_async_fn(
    func: Union[Callable[P, R], Callable[P, Awaitable[R]]],
) -> TypeGuard[Callable[P, Awaitable[R]]]:
    """
    Returns `True` if a function returns a coroutine.

    See https://github.com/microsoft/pyright/issues/2142 for an example use
    """
    while hasattr(func, "__wrapped__"):
        func = func.__wrapped__

    return inspect.iscoroutinefunction(func)

is_async_gen_fn(func)

Returns True if a function is an async generator.

Source code in src/prefect/utilities/asyncutils.py
89
90
91
92
93
94
95
96
def is_async_gen_fn(func):
    """
    Returns `True` if a function is an async generator.

    Decorator layers are peeled off by following the `__wrapped__` chain before
    the check.

    Raises:
        ValueError: If the `__wrapped__` chain contains a cycle.
    """
    # `inspect.unwrap` follows `__wrapped__` like a manual loop would, but detects
    # cycles instead of spinning forever on them
    return inspect.isasyncgenfunction(inspect.unwrap(func))

run_async_from_worker_thread(__fn, *args, **kwargs)

Runs an async function in the main thread's event loop, blocking the worker thread until completion

Source code in src/prefect/utilities/asyncutils.py
282
283
284
285
286
287
288
289
290
def run_async_from_worker_thread(
    __fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any
) -> T:
    """
    Call an async function from a worker thread and block that thread until the
    call has completed in the main thread's event loop.

    The positional and keyword arguments are bound to `__fn` up front and the
    resulting zero-argument callable is handed to `anyio.from_thread.run`.
    """
    return anyio.from_thread.run(partial(__fn, *args, **kwargs))

run_coro_as_sync(coroutine, force_new_thread=False, wait_for_result=True)

Runs a coroutine from a synchronous context, as if it were a synchronous function.

The coroutine is scheduled to run in the "run sync" event loop, which is running in its own thread and is started the first time it is needed. This allows us to share objects like async httpx clients among all coroutines running in the loop.

If run_coro_as_sync is called from within the run_sync loop, it will run the coroutine in a new thread, because otherwise a deadlock would occur. Note that this behavior should not appear anywhere in the Prefect codebase or in user code.

Parameters:

Name Type Description Default
coroutine Awaitable

The coroutine to be run as a synchronous function.

required
force_new_thread bool

If True, the coroutine will always be run in a new thread. Defaults to False.

False
wait_for_result bool

If True, the function will wait for the coroutine to complete and return the result. If False, the function will submit the coroutine to the "run sync" event loop and return immediately, where it will eventually be run. Defaults to True.

True

Returns:

Type Description
Union[R, None]

The result of the coroutine if wait_for_result is True, otherwise None.

Source code in src/prefect/utilities/asyncutils.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
def run_coro_as_sync(
    coroutine: Awaitable[R],
    force_new_thread: bool = False,
    wait_for_result: bool = True,
) -> Union[R, None]:
    """
    Execute a coroutine from synchronous code as though it were an ordinary
    blocking call.

    By default the coroutine is submitted to the "run sync" event loop, which
    lives in its own thread and is started the first time it is needed. Sharing
    that loop lets objects such as async httpx clients be reused by every
    coroutine running in it.

    When this function is invoked from inside the run_sync loop (or from a
    descendant of a coroutine running there), the coroutine is run in a new
    thread instead, because submitting to the same loop would deadlock. Note
    that this behavior should not appear anywhere in the Prefect codebase or in
    user code.

    Args:
        coroutine (Awaitable): The coroutine to be run as a synchronous function.
        force_new_thread (bool, optional): If True, the coroutine will always be run in a new thread.
            Defaults to False.
        wait_for_result (bool, optional): If True, the function will wait for the coroutine to complete
            and return the result. If False, the function will submit the coroutine to the "run sync"
            event loop and return immediately, where it will eventually be run. Defaults to True.

    Returns:
        The result of the coroutine if wait_for_result is True, otherwise None.

    Raises:
        KeyboardInterrupt: Re-raised after cancelling the submitted call when the
            interrupt arrives while waiting for the result.
    """

    async def coroutine_wrapper() -> Union[R, None]:
        """
        Mark this task, and everything it spawns, as running inside the run_sync
        machinery so that children do not try to hop onto the run_sync thread
        again, which would cause a deadlock.
        """
        loop_flag_token = RUNNING_IN_RUN_SYNC_LOOP_FLAG.set(True)
        async_flag_token = RUNNING_ASYNC_FLAG.set(True)
        try:
            # Wrap the coroutine in a task via the module's `create_task` helper;
            # task creation copies the current context variables automatically
            pending = create_task(coroutine)
            if not wait_for_result:
                return None
            return await pending
        finally:
            RUNNING_IN_RUN_SYNC_LOOP_FLAG.reset(loop_flag_token)
            RUNNING_ASYNC_FLAG.reset(async_flag_token)

    # Already inside the run_sync loop, descended from a coroutine running there,
    # or explicitly asked to: the coroutine must go to a new thread
    needs_new_thread = (
        in_run_sync_loop() or RUNNING_IN_RUN_SYNC_LOOP_FLAG.get() or force_new_thread
    )
    if needs_new_thread:
        return from_sync.call_in_new_thread(coroutine_wrapper)

    # Otherwise hand the call to the run_sync loop and block on its result
    call = _cast_to_call(coroutine_wrapper)
    get_run_sync_loop().submit(call)
    try:
        return call.result()
    except KeyboardInterrupt:
        call.cancel()

        logger.debug("Coroutine cancelled due to KeyboardInterrupt.")
        raise

run_sync_in_worker_thread(__fn, *args, **kwargs) async

Runs a sync function in a new worker thread so that the main thread's event loop is not blocked.

Unlike the anyio function, this defaults to a cancellable thread and does not allow passing arguments to the anyio function so users can pass kwargs to their function.

Note that cancelling a thread does not interrupt its computation: the thread may continue running — the outcome will just be ignored.

Source code in src/prefect/utilities/asyncutils.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
async def run_sync_in_worker_thread(
    __fn: Callable[..., T], *args: Any, **kwargs: Any
) -> T:
    """
    Execute a synchronous function in a worker thread so the main thread's event
    loop is not blocked while it runs.

    In contrast to calling the anyio function directly, the thread is abandoned on
    cancellation by default, and every keyword argument is forwarded to `__fn`
    rather than to anyio, so users can pass kwargs to their own function.

    Cancelling does not interrupt the computation: the thread may continue
    running — the outcome will just be ignored.
    """
    # While the sync function runs in the worker thread this flag is cleared so
    # that any root sync compatible functions will run as sync functions
    flag_token = RUNNING_ASYNC_FLAG.set(False)
    try:
        return await anyio.to_thread.run_sync(
            call_with_mark,
            partial(__fn, *args, **kwargs),
            abandon_on_cancel=True,
            limiter=get_thread_limiter(),
        )
    finally:
        RUNNING_ASYNC_FLAG.reset(flag_token)

sync(__async_fn, *args, **kwargs)

Call an async function from a synchronous context. Block until completion.

If in an asynchronous context, we will run the code in a separate loop instead of failing but a warning will be displayed since this is not recommended.

Source code in src/prefect/utilities/asyncutils.py
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def sync(__async_fn: Callable[P, Awaitable[T]], *args: P.args, **kwargs: P.kwargs) -> T:
    """
    Invoke an async function from synchronous code and block until it finishes.

    When called from an asynchronous context the function is still executed, in a
    separate loop, rather than failing; a warning is emitted because awaiting the
    function directly is the recommended approach.
    """
    if in_async_main_thread():
        warnings.warn(
            "`sync` called from an asynchronous context; "
            "you should `await` the async function directly instead."
        )
        bound_call = partial(__async_fn, *args, **kwargs)
        with anyio.start_blocking_portal() as portal:
            return portal.call(bound_call)

    if in_async_worker_thread():
        # Sync context, but the event loop thread is reachable: send the async
        # call to the parent
        return run_async_from_worker_thread(__async_fn, *args, **kwargs)

    # Sync context with no event loop at all: create a loop just to run the async
    # code, then tear it down
    return run_async_in_new_loop(__async_fn, *args, **kwargs)

sync_compatible(async_fn)

Converts an async function into a dual async and sync function.

When the returned function is called, we will attempt to determine the best way to enter the async function.

  • If in a thread with a running event loop, we will return the coroutine for the caller to await. This is normal async behavior.
  • If in a blocking worker thread with access to an event loop in another thread, we will submit the async method to the event loop.
  • If we cannot find an event loop, we will create a new one and run the async method then tear down the loop.
Source code in src/prefect/utilities/asyncutils.py
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
def sync_compatible(
    async_fn: Callable[..., Coroutine[Any, Any, R]],
) -> Callable[..., Union[R, Coroutine[Any, Any, R]]]:
    """
    Wrap an async function so that it can be used from async and sync code alike.

    The returned wrapper accepts an extra keyword-only `_sync` argument:

    - `_sync=False`: the coroutine from `async_fn` is returned for the caller to
        await.
    - `_sync=True`: the call is executed through `run_coro_as_sync` and its result
        is returned.
    - `_sync=None` (default): the wrapper decides for itself. Inside a run
        context it follows the `isasync` attribute of the context's task, or of
        its flow when there is no task (treated as async when the attribute is
        missing). Outside a run context it is async only if an event loop is
        running in the current thread. A coroutine is returned when that check, or
        `RUNNING_ASYNC_FLAG`, says async; otherwise the call goes through
        `run_coro_as_sync`.

    The undecorated function stays reachable as the wrapper's `aio` attribute.

    Raises:
        ValueError: If `async_fn` is an async generator function.
        TypeError: If `async_fn` is not async at all.
    """
    # Only plain coroutine functions can be wrapped; reject everything else first
    if not is_async_fn(async_fn):
        if is_async_gen_fn(async_fn):
            raise ValueError(
                "Async generators cannot yet be marked as `sync_compatible`"
            )
        raise TypeError("The decorated function must be async.")

    @wraps(async_fn)
    def coroutine_wrapper(
        *args: Any, _sync: Optional[bool] = None, **kwargs: Any
    ) -> Union[R, Coroutine[Any, Any, R]]:
        from prefect.context import MissingContextError, get_run_context

        # Explicit async request: hand back the raw coroutine untouched
        if _sync is False:
            return async_fn(*args, **kwargs)

        is_async = True

        # With an explicit `_sync` we do as we're told; only the default needs us
        # to work out which mode applies
        if _sync is None:
            try:
                run_ctx = get_run_context()
            except MissingContextError:
                # Not in an execution context: best effort, based on whether an
                # event loop is running in this thread
                try:
                    asyncio.get_running_loop()
                except RuntimeError:
                    is_async = False
                else:
                    is_async = True
            else:
                # Prefer the task of the run context, falling back to its flow
                parent_obj = getattr(run_ctx, "task", None) or getattr(
                    run_ctx, "flow", None
                )
                is_async = getattr(parent_obj, "isasync", True)

        async def ctx_call():
            """
            Await `async_fn` with RUNNING_ASYNC_FLAG set, restoring the previous
            value afterwards so the change stays scoped to this coroutine's frame.
            """
            flag_token = RUNNING_ASYNC_FLAG.set(True)
            try:
                return await async_fn(*args, **kwargs)
            finally:
                RUNNING_ASYNC_FLAG.reset(flag_token)

        # Unless sync execution was requested explicitly, an async caller simply
        # receives the coroutine to await
        if _sync is not True and (RUNNING_ASYNC_FLAG.get() or is_async):
            return ctx_call()
        return run_coro_as_sync(ctx_call())

    coroutine_wrapper.aio = async_fn  # type: ignore
    return coroutine_wrapper