Skip to content

prefect.client.utilities

Utilities for working with clients.

get_or_create_client(client=None)

Returns provided client, infers a client from context if available, or creates a new client.

Parameters:

Name Type Description Default
client Optional[PrefectClient]

an optional client to use

None

Returns:

Type Description
Tuple[PrefectClient, bool]
  • tuple: a tuple of the client and a boolean that is True if the client was provided or inferred from context, and False if a new client was created
Source code in src/prefect/client/utilities.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def get_or_create_client(
    client: Optional["PrefectClient"] = None,
) -> Tuple["PrefectClient", bool]:
    """
    Resolve the client to use: the one passed in, one held by an active context, or a new one.

    Args:
        - client (PrefectClient, optional): a client to use as-is when given

    Returns:
        - tuple: the client, plus a boolean that is `True` when the client was
            passed in or taken from a context and `False` when it was newly created
    """
    # An explicitly supplied client wins and is reported with the `True` flag.
    if client is not None:
        return client, True

    from prefect._internal.concurrency.event_loop import get_running_loop
    from prefect.context import AsyncClientContext, FlowRunContext, TaskRunContext

    # All three contexts are fetched up front, before any of them is inspected.
    async_ctx = AsyncClientContext.get()
    flow_ctx = FlowRunContext.get()
    task_ctx = TaskRunContext.get()

    # A context client is reused only when its `_loop` equals the running loop.
    if async_ctx and async_ctx.client._loop == get_running_loop():
        return async_ctx.client, True

    if flow_ctx and getattr(flow_ctx.client, "_loop", None) == get_running_loop():
        return flow_ctx.client, True

    if task_ctx and getattr(task_ctx.client, "_loop", None) == get_running_loop():
        return task_ctx.client, True

    # Nothing reusable was found: build a new client and flag it with `False`.
    from prefect.client.orchestration import get_client as get_httpx_client

    return get_httpx_client(), False

inject_client(fn)

Simple helper to provide a context managed client to an asynchronous function.

The decorated function must take a `client` kwarg. If a client is passed when the function is called, it will be used instead of creating a new one, but it will not be context managed, as it is assumed that the caller is managing the context.

Source code in src/prefect/client/utilities.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def inject_client(
    fn: Callable[P, Coroutine[Any, Any, R]],
) -> Callable[P, Coroutine[Any, Any, R]]:
    """
    Decorator that supplies a `client` keyword argument to an asynchronous function.

    The wrapped function _must_ accept a `client` kwarg. Any `client` passed by the
    caller is removed from the kwargs and handed to `get_or_create_client`. When the
    resolved client is flagged as newly created, it is entered as an async context
    manager around the call; otherwise it is used without being context managed, as
    it is assumed that the caller is managing the context.
    """

    @wraps(fn)
    async def with_injected_client(*args: P.args, **kwargs: P.kwargs) -> R:
        supplied = cast(Optional["PrefectClient"], kwargs.pop("client", None))
        resolved, inferred = get_or_create_client(supplied)

        if inferred:
            # Supplied or context-held client: wrap the call in a no-op context.
            from prefect.utilities.asyncutils import asyncnullcontext

            manager = asyncnullcontext()
        else:
            # Newly created client: the client itself is the context manager.
            manager = resolved

        async with manager as entered:
            # "client" was popped above, so this assignment always takes effect.
            # Fall back to `resolved` when the context yields a falsy value.
            kwargs["client"] = entered or resolved
            return await fn(*args, **kwargs)

    return with_injected_client