Skip to content

prefect.deployments

deploy(*deployments, work_pool_name=None, image=None, build=True, push=True, print_next_steps_message=True, ignore_warnings=False) async

Deploy the provided list of deployments to dynamic infrastructure via a work pool.

By default, calling this function will build a Docker image for the deployments, push it to a registry, and create each deployment via the Prefect API that will run the corresponding flow on the given schedule.

If you want to use an existing image, you can pass build=False to skip building and pushing an image.

Parameters:

Name Type Description Default
*deployments RunnerDeployment

A list of deployments to deploy.

()
work_pool_name Optional[str]

The name of the work pool to use for these deployments. Defaults to the value of PREFECT_DEFAULT_WORK_POOL_NAME.

None
image Optional[Union[str, DockerImage]]

The name of the Docker image to build, including the registry and repository. Pass a DockerImage instance to customize the Dockerfile used and build arguments.

None
build bool

Whether or not to build a new image for the flow. If False, the provided image will be used as-is and pulled at runtime.

True
push bool

Whether or not to push the built image to a registry. Pushing only happens when an image is provided and build is True.

True
print_next_steps_message bool

Whether or not to print a message with next steps after deploying the deployments.

True

Returns:

Type Description
List[UUID]

A list of deployment IDs for the created/updated deployments.

Examples:

Deploy a group of flows to a work pool:

from prefect import deploy, flow

@flow(log_prints=True)
def local_flow():
    print("I'm a locally defined flow!")

if __name__ == "__main__":
    deploy(
        local_flow.to_deployment(name="example-deploy-local-flow"),
        flow.from_source(
            source="https://github.com/org/repo.git",
            entrypoint="flows.py:my_flow",
        ).to_deployment(
            name="example-deploy-remote-flow",
        ),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
Source code in src/prefect/deployments/runner.py
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
@sync_compatible
async def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DockerImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]:
    """
    Deploy the provided list of deployments to dynamic infrastructure via a
    work pool.

    By default, calling this function will build a Docker image for the deployments, push it to a
    registry, and create each deployment via the Prefect API that will run the corresponding
    flow on the given schedule.

    If you want to use an existing image, you can pass `build=False` to skip building and pushing
    an image.

    For managed work pools (`work_pool.is_managed_pool`), `build` and `push` are both
    forced to False regardless of the values passed in.

    Args:
        *deployments: A list of deployments to deploy.
        work_pool_name: The name of the work pool to use for these deployments. Defaults to
            the value of `PREFECT_DEFAULT_WORK_POOL_NAME`.
        image: The name of the Docker image to build, including the registry and
            repository. Pass a DockerImage instance to customize the Dockerfile used
            and build arguments.
        build: Whether or not to build a new image for the flow. If False, the provided
            image will be used as-is and pulled at runtime.
        push: Whether or not to push the built image to a registry. Only takes effect
            when an image is provided and `build` is True.
        print_next_steps_message: Whether or not to print a message with next steps
            after deploying the deployments. The message is skipped when every
            deployment failed.
        ignore_warnings: If True, suppress the suggestion to use `.serve` that is
            otherwise printed when deploying without an image to a work pool of
            type `process`.

    Returns:
        A list of deployment IDs for the created/updated deployments. Deployments
        that failed to apply are not represented in the list.

    Raises:
        ValueError: If no image is given and at least one deployment has neither
            storage nor a module-path entrypoint; if no work pool name can be
            determined; if the work pool cannot be found; or if an image is given
            but the work pool's base job template has neither an `image` nor a
            `block` variable.
        Exception: When exactly one deployment is passed and applying it fails, the
            original exception is re-raised. With several deployments, failures are
            collected and reported in the printed table instead.

    Examples:
        Deploy a group of flows to a work pool:

        ```python
        from prefect import deploy, flow

        @flow(log_prints=True)
        def local_flow():
            print("I'm a locally defined flow!")

        if __name__ == "__main__":
            deploy(
                local_flow.to_deployment(name="example-deploy-local-flow"),
                flow.from_source(
                    source="https://github.com/org/repo.git",
                    entrypoint="flows.py:my_flow",
                ).to_deployment(
                    name="example-deploy-remote-flow",
                ),
                work_pool_name="my-work-pool",
                image="my-registry/my-image:dev",
            )
        ```
    """
    work_pool_name = work_pool_name or PREFECT_DEFAULT_WORK_POOL_NAME.value()

    # Without an image, every deployment must either carry its own storage or
    # use a module-path entrypoint.
    if not image and not all(
        d.storage or d.entrypoint_type == EntrypointType.MODULE_PATH
        for d in deployments
    ):
        raise ValueError(
            "Either an image or remote storage location must be provided when deploying"
            " a deployment."
        )

    if not work_pool_name:
        raise ValueError(
            "A work pool name must be provided when deploying a deployment. Either"
            " provide a work pool name when calling `deploy` or set"
            " `PREFECT_DEFAULT_WORK_POOL_NAME` in your profile."
        )

    # Normalize a string image reference into a DockerImage so the rest of the
    # function can rely on `.reference`, `.build()` and `.push()`.
    if image and isinstance(image, str):
        image_name, image_tag = parse_image_tag(image)
        image = DockerImage(name=image_name, tag=image_tag)

    # Look up the work pool and its currently ONLINE workers; the worker list is
    # only used later to decide whether to print the "start a worker" hint.
    try:
        async with get_client() as client:
            work_pool = await client.read_work_pool(work_pool_name)
            active_workers = await client.read_workers_for_work_pool(
                work_pool_name,
                worker_filter=WorkerFilter(status=WorkerFilterStatus(any_=["ONLINE"])),
            )
    except ObjectNotFound as exc:
        raise ValueError(
            f"Could not find work pool {work_pool_name!r}. Please create it before"
            " deploying this flow."
        ) from exc

    is_docker_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.image", False
    )
    is_block_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.block", False
    )
    # carve out an exception for block based work pools that only have a block in their base job template
    console = Console()
    if not is_docker_based_work_pool and not is_block_based_work_pool:
        if image:
            raise ValueError(
                f"Work pool {work_pool_name!r} does not support custom Docker images."
                " Please use a work pool with an `image` variable in its base job template"
                " or specify a remote storage location for the flow with `.from_source`."
                " If you are attempting to deploy a flow to a local process work pool,"
                " consider using `flow.serve` instead. See the documentation for more"
                " information: https://docs.prefect.io/latest/deploy/run-flows-in-local-processes"
            )
        elif work_pool.type == "process" and not ignore_warnings:
            console.print(
                "Looks like you're deploying to a process work pool. If you're creating a"
                " deployment for local development, calling `.serve` on your flow is a great"
                " way to get started. See the documentation for more information:"
                " https://docs.prefect.io/latest/deploy/run-flows-in-local-processes "
                " Set `ignore_warnings=True` to suppress this message.",
                style="yellow",
            )

    # Managed pools override the caller's build/push choices: nothing is built
    # or pushed from here.
    is_managed_pool = work_pool.is_managed_pool
    if is_managed_pool:
        build = False
        push = False

    if image and build:
        with Progress(
            SpinnerColumn(),
            TextColumn(f"Building image {image.reference}..."),
            transient=True,
            console=console,
        ) as progress:
            docker_build_task = progress.add_task("docker_build", total=1)
            image.build()

            progress.update(docker_build_task, completed=1)
            console.print(
                f"Successfully built image {image.reference!r}", style="green"
            )

    # Pushing is only attempted for an image that was built in this call.
    if image and build and push:
        with Progress(
            SpinnerColumn(),
            TextColumn("Pushing image..."),
            transient=True,
            console=console,
        ) as progress:
            docker_push_task = progress.add_task("docker_push", total=1)

            image.push()

            progress.update(docker_push_task, completed=1)

        console.print(f"Successfully pushed image {image.reference!r}", style="green")

    deployment_exceptions = []
    deployment_ids = []
    image_ref = image.reference if image else None
    for deployment in track(
        deployments,
        description="Creating/updating deployments...",
        console=console,
        transient=True,
    ):
        try:
            deployment_ids.append(
                await deployment.apply(image=image_ref, work_pool_name=work_pool_name)
            )
        except Exception as exc:
            # A lone deployment has nothing to summarize, so surface its error
            # directly; otherwise record it and keep applying the rest.
            if len(deployments) == 1:
                raise
            deployment_exceptions.append({"deployment": deployment, "exc": exc})

    if deployment_exceptions:
        console.print(
            "Encountered errors while creating/updating deployments:\n",
            style="orange_red1",
        )
    else:
        console.print("Successfully created/updated all deployments!\n", style="green")

    # True when every deployment failed (also true when no deployments were passed,
    # since both counts are then zero).
    complete_failure = len(deployment_exceptions) == len(deployments)

    table = Table(
        title="Deployments",
        show_lines=True,
    )

    table.add_column(header="Name", style="blue", no_wrap=True)
    table.add_column(header="Status", style="blue", no_wrap=True)
    table.add_column(header="Details", style="blue")

    # One row per deployment: failed rows carry the stringified exception.
    for deployment in deployments:
        errored_deployment = next(
            (d for d in deployment_exceptions if d["deployment"] == deployment),
            None,
        )
        if errored_deployment:
            table.add_row(
                f"{deployment.flow_name}/{deployment.name}",
                "failed",
                str(errored_deployment["exc"]),
                style="red",
            )
        else:
            table.add_row(f"{deployment.flow_name}/{deployment.name}", "applied")
    console.print(table)

    if print_next_steps_message and not complete_failure:
        # Only suggest starting a worker when the pool is neither push nor managed
        # and no ONLINE worker was found for it.
        if (
            not work_pool.is_push_pool
            and not work_pool.is_managed_pool
            and not active_workers
        ):
            console.print(
                "\nTo execute flow runs from these deployments, start a worker in a"
                " separate terminal that pulls work from the"
                f" {work_pool_name!r} work pool:"
                f"\n\t[blue]$ prefect worker start --pool {work_pool_name!r}[/]",
            )
        console.print(
            "\nTo trigger any of these deployments, use the"
            " following command:\n[blue]\n\t$ prefect deployment run"
            " [DEPLOYMENT_NAME]\n[/]"
        )

        if PREFECT_UI_URL:
            console.print(
                "\nYou can also trigger your deployments via the Prefect UI:"
                f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n"
            )

    return deployment_ids

initialize_project(name=None, recipe=None, inputs=None)

Initializes a basic project structure with base files. If no name is provided, the name of the current directory is used. If no recipe is provided, one is inferred.

Parameters:

Name Type Description Default
name str

the name of the project; if not provided, the current directory name

None
recipe str

the name of the recipe to use; if not provided, one is inferred

None
inputs dict

a dictionary of inputs to use when formatting the recipe

None

Returns:

Type Description
List[str]

A list of the files / directories that were created.

Source code in src/prefect/deployments/base.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def initialize_project(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
    inputs: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """
    Set up a basic project structure in the current working directory.

    The recipe's formatting values are gathered from the environment (absolute
    directory path, git remote/branch when a remote origin is found, presence of
    a `Dockerfile`) and can be overridden through `inputs`. When no recipe is
    given, one is inferred from whether the directory has a git remote and/or a
    `Dockerfile`.

    Args:
        name (str, optional): the name of the project; if not provided, the current directory name
        recipe (str, optional): the name of the recipe to use; if not provided, one is inferred
        inputs (dict, optional): a dictionary of inputs to use when formatting the recipe;
            entries override the automatically collected values

    Returns:
        List[str]: a list of files / directories that were created
    """
    template_values = {"directory": str(Path(".").absolute().resolve())}
    cwd_name = os.path.basename(os.getcwd())

    # A remote origin URL marks the project as git based; the branch falls back
    # to "main" when none can be determined.
    origin_url = _get_git_remote_origin_url()
    in_git_repo = bool(origin_url)
    if in_git_repo:
        template_values["repository"] = origin_url
        template_values["branch"] = _get_git_branch() or "main"

    template_values["name"] = cwd_name

    dockerfile_present = Path("Dockerfile").exists()
    if dockerfile_present:
        template_values["dockerfile"] = "Dockerfile"
    elif recipe is not None and "docker" in recipe:
        # An explicitly requested docker recipe without a Dockerfile on disk.
        template_values["dockerfile"] = "auto"

    # Infer the recipe only when the caller did not choose one.
    if recipe is None:
        if in_git_repo:
            recipe = "docker-git" if dockerfile_present else "git"
        else:
            recipe = "docker" if dockerfile_present else "local"

    # Caller-supplied inputs win over the collected values.
    if inputs:
        template_values.update(inputs)
    configuration = configure_project_by_recipe(recipe=recipe, **template_values)

    project_name = name or cwd_name

    created = []
    if create_default_ignore_file("."):
        created.append(".prefectignore")
    if create_default_prefect_yaml(".", name=project_name, contents=configuration):
        created.append("prefect.yaml")

    return created

run_deployment(name, client=None, parameters=None, scheduled_time=None, flow_run_name=None, timeout=None, poll_interval=5, tags=None, idempotency_key=None, work_queue_name=None, as_subflow=True, job_variables=None) async

Create a flow run for a deployment and return it after completion or a timeout.

By default, this function blocks until the flow run finishes executing. Specify a timeout (in seconds) to wait for the flow run to execute before returning flow run metadata. To return immediately, without waiting for the flow run to execute, set timeout=0.

Note that if you specify a timeout, this function will return the flow run metadata whether or not the flow run finished executing.

If called within a flow or task, the flow run this function creates will be linked to the current flow run as a subflow. Disable this behavior by passing as_subflow=False.

Parameters:

Name Type Description Default
name Union[str, UUID]

The deployment id or deployment name in the form: "flow name/deployment name"

required
parameters Optional[dict]

Parameter overrides for this flow run. Merged with the deployment defaults.

None
scheduled_time Optional[datetime]

The time to schedule the flow run for, defaults to scheduling the flow run to start now.

None
flow_run_name Optional[str]

A name for the created flow run

None
timeout Optional[float]

The amount of time to wait (in seconds) for the flow run to complete before returning. Setting timeout to 0 will return the flow run metadata immediately. Setting timeout to None will allow this function to poll indefinitely. Defaults to None.

None
poll_interval Optional[float]

The number of seconds between polls

5
tags Optional[Iterable[str]]

A list of tags to associate with this flow run; tags can be used in automations and for organizational purposes.

None
idempotency_key Optional[str]

A unique value to recognize retries of the same run, and prevent creating multiple flow runs.

None
work_queue_name Optional[str]

The name of a work queue to use for this run. Defaults to the default work queue for the deployment.

None
as_subflow Optional[bool]

Whether to link the flow run as a subflow of the current flow or task run.

True
job_variables Optional[dict]

A dictionary of dot delimited infrastructure overrides that will be applied at runtime; for example env.CONFIG_KEY=config_value or namespace='prefect'

None
Source code in src/prefect/deployments/flow_runs.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
@sync_compatible
@inject_client
async def run_deployment(
    name: Union[str, UUID],
    client: Optional["PrefectClient"] = None,
    parameters: Optional[dict] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = 5,
    tags: Optional[Iterable[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    as_subflow: Optional[bool] = True,
    job_variables: Optional[dict] = None,
) -> "FlowRun":
    """
    Create a flow run for a deployment and return it after completion or a timeout.

    By default, this function blocks until the flow run finishes executing.
    Specify a timeout (in seconds) to wait for the flow run to execute before
    returning flow run metadata. To return immediately, without waiting for the
    flow run to execute, set `timeout=0`.

    Note that if you specify a timeout, this function will return the flow run
    metadata whether or not the flow run finished executing.

    If called within a flow or task, the flow run this function creates will
    be linked to the current flow run as a subflow. Disable this behavior by
    passing `as_subflow=False`.

    Args:
        name: The deployment id or deployment name in the form:
            `"flow name/deployment name"`. A string that parses as a UUID is
            treated as a deployment id.
        client: The Prefect client used for every API call made here. It is typed
            as optional; the `@inject_client` decorator is expected to supply one
            when it is omitted.
        parameters: Parameter overrides for this flow run. Merged with the deployment
            defaults.
        scheduled_time: The time to schedule the flow run for, defaults to scheduling
            the flow run to start now.
        flow_run_name: A name for the created flow run
        timeout: The amount of time to wait (in seconds) for the flow run to
            complete before returning. Setting `timeout` to 0 will return the flow
            run metadata immediately. Setting `timeout` to None will allow this
            function to poll indefinitely. Defaults to None.
        poll_interval: The number of seconds between polls
        tags: A list of tags to associate with this flow run; tags can be used in
            automations and for organizational purposes.
        idempotency_key: A unique value to recognize retries of the same run, and
            prevent creating multiple flow runs.
        work_queue_name: The name of a work queue to use for this run. Defaults to
            the default work queue for the deployment.
        as_subflow: Whether to link the flow run as a subflow of the current
            flow or task run.
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`

    Returns:
        The created flow run, as most recently read from the API: in a final state
        if it finished in time, otherwise whatever was last fetched before the
        timeout elapsed.

    Raises:
        ValueError: If `timeout` is negative.
    """
    if timeout is not None and timeout < 0:
        raise ValueError("`timeout` cannot be negative")

    if scheduled_time is None:
        scheduled_time = pendulum.now("UTC")

    parameters = parameters or {}

    # Resolve `name` to a deployment id when possible; a string that is not a
    # valid UUID is looked up by name instead.
    try:
        deployment_id = name if isinstance(name, UUID) else UUID(name)
    except ValueError:
        deployment_id = None

    if deployment_id:
        deployment = await client.read_deployment(deployment_id=deployment_id)
    else:
        deployment = await client.read_deployment_by_name(name)

    parent_task_run_id = None
    flow_run_ctx = FlowRunContext.get()
    task_run_ctx = TaskRunContext.get()
    if as_subflow and (flow_run_ctx or task_run_ctx):
        # TODO: this logic can likely be simplified by using `Task.create_run`
        from prefect.utilities.engine import (
            _dynamic_key_for_task_run,
            collect_task_run_inputs,
        )

        # Called from inside a flow or task run: link the new flow run as a subflow.
        task_inputs = {}
        for param_name, param_value in parameters.items():
            task_inputs[param_name] = await collect_task_run_inputs(param_value)

        if deployment_id:
            flow = await client.read_flow(deployment.flow_id)
            deployment_name = f"{flow.name}/{deployment.name}"
        else:
            deployment_name = name

        # A placeholder task in the parent flow run stands in for the subflow's result.
        dummy_task = Task(
            name=deployment_name,
            fn=lambda: None,
            version=deployment.version,
        )
        # Replace the default task key with one that includes the deployment name.
        dummy_task.task_key = f"{__name__}.run_deployment.{slugify(deployment_name)}"

        # Prefer the flow run context; fall back to the enclosing task run.
        if flow_run_ctx:
            parent_flow_run_id = flow_run_ctx.flow_run.id
            dynamic_key = _dynamic_key_for_task_run(flow_run_ctx, dummy_task)
        else:
            parent_flow_run_id = task_run_ctx.task_run.flow_run_id
            dynamic_key = task_run_ctx.task_run.dynamic_key

        parent_task_run = await client.create_task_run(
            task=dummy_task,
            flow_run_id=parent_flow_run_id,
            dynamic_key=dynamic_key,
            task_inputs=task_inputs,
            state=Pending(),
        )
        parent_task_run_id = parent_task_run.id

    flow_run = await client.create_flow_run_from_deployment(
        deployment.id,
        parameters=parameters,
        state=Scheduled(scheduled_time=scheduled_time),
        name=flow_run_name,
        tags=tags,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
    )

    created_run_id = flow_run.id

    # A zero timeout means "fire and forget": skip polling entirely.
    if timeout == 0:
        return flow_run

    # Poll until the run reaches a final state or the timeout scope cancels the
    # loop; either way the most recently read flow run is returned.
    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(created_run_id)
            latest_state = flow_run.state
            if latest_state and latest_state.is_final():
                break
            await anyio.sleep(poll_interval)

    return flow_run