prefect_ray.context

Contexts to manage Ray clusters and tasks.

RemoteOptionsContext

Bases: ContextModel

The context for Ray remote_options management.

Attributes:

current_remote_options (Dict[str, Any]): The remote options currently in effect in the context, keyed by option name.

Source code in prefect_ray/context.py
class RemoteOptionsContext(ContextModel):
    """
    The context for Ray remote_options management.

    Attributes:
        current_remote_options: A set of current remote_options in the context.
    """

    __var__: ContextVar = ContextVar("remote_options")
    current_remote_options: Dict[str, Any] = Field(default_factory=dict)

    @classmethod
    def get(cls) -> "RemoteOptionsContext":
        """
        Return an empty `RemoteOptionsContext`
        instead of `None` if no context exists.
        """
        return cls.__var__.get(RemoteOptionsContext())

    __var__ = ContextVar("remote_options")

get() classmethod

Return an empty RemoteOptionsContext instead of None if no context exists.

Source code in prefect_ray/context.py
@classmethod
def get(cls) -> "RemoteOptionsContext":
    """
    Return an empty `RemoteOptionsContext`
    instead of `None` if no context exists.
    """
    return cls.__var__.get(RemoteOptionsContext())
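
The default-on-missing behavior of `get()` comes from `ContextVar.get(default)`, which returns the supplied default when the variable has not been set. The following is a minimal sketch of that pattern using only the standard library. `OptionsSketch`, `_sketch_var`, and `get_options` are hypothetical stand-ins for illustration, not part of prefect_ray.

```python
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class OptionsSketch:
    # Hypothetical stand-in for RemoteOptionsContext; not the prefect_ray class.
    current_remote_options: Dict[str, Any] = field(default_factory=dict)


# Hypothetical context variable, deliberately created without a default value.
_sketch_var: ContextVar = ContextVar("remote_options_sketch")


def get_options() -> OptionsSketch:
    # ContextVar.get(default) returns the default when the variable is unset,
    # so callers receive an empty object rather than None or a LookupError.
    return _sketch_var.get(OptionsSketch())


unset = get_options()  # nothing set yet: an empty OptionsSketch

token = _sketch_var.set(OptionsSketch({"num_cpus": 4}))
active = get_options()  # the value that was just set

_sketch_var.reset(token)  # restore the previous (unset) state
```

Because callers always get an object back, code that reads `current_remote_options` needs no special case for running outside any context.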

remote_options(**new_remote_options)

Context manager to add keyword arguments to Ray @remote calls for task runs. If contexts are nested, new options are merged with options in the outer context. If a key is present in both, the new option will be used.

Yields:

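Generator[None, Dict[str, Any], None]: The context manager yields no value (the function body is a bare yield). While the block is active, the merged options are available through RemoteOptionsContext.get().current_remote_options.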

Examples:

Use 4 CPUs and 2 GPUs for the process task:

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
    return x + 1

@flow(task_runner=RayTaskRunner())
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
    with remote_options(num_cpus=4, num_gpus=2):
        process.submit(42)
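
The merge rule for nested contexts can be illustrated without Ray or Prefect installed. This is a minimal sketch built on a plain `ContextVar`, assuming only the merge behavior described above. `remote_options_sketch`, `current_options`, and `demo` are hypothetical names for illustration, not the library's implementation.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, Dict, Iterator

# Hypothetical stand-in for RemoteOptionsContext: a ContextVar holding a dict.
_nested_var: ContextVar = ContextVar("nested_remote_options_sketch")


def current_options() -> Dict[str, Any]:
    """Return the active options, or an empty dict outside any context."""
    return _nested_var.get({})


@contextmanager
def remote_options_sketch(**new_options: Any) -> Iterator[None]:
    """Merge new options over the enclosing ones for the duration of the block."""
    # Later keys win in a dict merge, so the inner value overrides the outer one.
    token = _nested_var.set({**current_options(), **new_options})
    try:
        yield
    finally:
        # Restore the enclosing options when the block exits.
        _nested_var.reset(token)


def demo() -> Dict[str, Dict[str, Any]]:
    """Record the active options at each nesting level."""
    seen: Dict[str, Dict[str, Any]] = {}
    with remote_options_sketch(num_cpus=4, num_gpus=2):
        seen["outer"] = current_options()
        with remote_options_sketch(num_cpus=1):
            seen["inner"] = current_options()
        seen["restored"] = current_options()
    seen["outside"] = current_options()
    return seen
```

In this sketch the inner block sees `num_cpus=1` together with the inherited `num_gpus=2`, and the outer values are back in effect as soon as the inner block exits.
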
Source code in prefect_ray/context.py
@contextmanager
def remote_options(
    **new_remote_options: Dict[str, Any],
) -> Generator[None, Dict[str, Any], None]:
    """
    Context manager to add keyword arguments to Ray `@remote` calls
    for task runs. If contexts are nested, new options are merged with options
    in the outer context. If a key is present in both, the new option will be used.

    Yields:
        The current set of remote options.

    Examples:
        Use 4 CPUs and 2 GPUs for the `process` task:
        ```python
        from prefect import flow, task
        from prefect_ray.task_runners import RayTaskRunner
        from prefect_ray.context import remote_options

        @task
        def process(x):
            return x + 1

        @flow(task_runner=RayTaskRunner())
        def my_flow():
            # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
            with remote_options(num_cpus=4, num_gpus=2):
                process.submit(42)
        ```
    """
    current_remote_options = RemoteOptionsContext.get().current_remote_options
    with RemoteOptionsContext(
        current_remote_options={**current_remote_options, **new_remote_options}
    ):
        yield