prefect.deployments

deploy(*deployments, work_pool_name=None, image=None, build=True, push=True, print_next_steps_message=True, ignore_warnings=False) async

Deploy the provided list of deployments to dynamic infrastructure via a work pool.

By default, calling this function will build a Docker image for the deployments, push it to a registry, and create each deployment via the Prefect API so that the corresponding flow runs on the given schedule.

If you want to use an existing image, you can pass build=False to skip building and pushing an image.
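
For instance, a minimal sketch (the work pool, image, and deployment names are hypothetical) that reuses an already-pushed image and captures the returned deployment IDs; with `build=False`, the push step is skipped as well and the image is pulled at runtime:

```python
from prefect import deploy, flow

@flow
def my_flow():
    print("running")

if __name__ == "__main__":
    # Reuse an existing image rather than building one; `build=False`
    # also skips the push step, and the image is pulled at runtime.
    deployment_ids = deploy(
        my_flow.to_deployment(name="existing-image-deploy"),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:2024.1",  # hypothetical image reference
        build=False,
    )
```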

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*deployments` | `RunnerDeployment` | A list of deployments to deploy. | `()` |
| `work_pool_name` | `Optional[str]` | The name of the work pool to use for these deployments. Defaults to the value of `PREFECT_DEFAULT_WORK_POOL_NAME`. | `None` |
| `image` | `Optional[Union[str, DockerImage]]` | The name of the Docker image to build, including the registry and repository. Pass a `DockerImage` instance to customize the Dockerfile used and build arguments. | `None` |
| `build` | `bool` | Whether or not to build a new image for the flow. If False, the provided image will be used as-is and pulled at runtime. | `True` |
| `push` | `bool` | Whether or not to push the built image to a registry. | `True` |
| `print_next_steps_message` | `bool` | Whether or not to print a message with next steps after deploying the deployments. | `True` |
| `ignore_warnings` | `bool` | Whether to suppress warnings, such as the notice shown when deploying to a process work pool. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `List[UUID]` | A list of deployment IDs for the created/updated deployments. |

Examples:

Deploy a group of flows to a work pool:

from prefect import deploy, flow

@flow(log_prints=True)
def local_flow():
    print("I'm a locally defined flow!")

if __name__ == "__main__":
    deploy(
        local_flow.to_deployment(name="example-deploy-local-flow"),
        flow.from_source(
            source="https://github.com/org/repo.git",
            entrypoint="flows.py:my_flow",
        ).to_deployment(
            name="example-deploy-remote-flow",
        ),
        work_pool_name="my-work-pool",
        image="my-registry/my-image:dev",
    )
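
To customize the build itself, a `DockerImage` instance can be passed in place of a plain image string. A hedged sketch, assuming `DockerImage` is importable from `prefect.docker` and accepts a `dockerfile` argument (the import path and names here are illustrative and may vary by Prefect version):

```python
from prefect import deploy, flow
from prefect.docker import DockerImage  # import path may vary by version

@flow
def my_flow():
    print("running")

if __name__ == "__main__":
    deploy(
        my_flow.to_deployment(name="custom-image-deploy"),
        work_pool_name="my-work-pool",
        image=DockerImage(
            name="my-registry/my-image",
            tag="dev",
            dockerfile="Dockerfile.custom",  # hypothetical custom Dockerfile
        ),
    )
```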
Source code in src/prefect/deployments/runner.py
@sync_compatible
async def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DockerImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]:
    """
    Deploy the provided list of deployments to dynamic infrastructure via a
    work pool.

    By default, calling this function will build a Docker image for the deployments, push it to a
    registry, and create each deployment via the Prefect API so that the corresponding
    flow runs on the given schedule.

    If you want to use an existing image, you can pass `build=False` to skip building and pushing
    an image.

    Args:
        *deployments: A list of deployments to deploy.
        work_pool_name: The name of the work pool to use for these deployments. Defaults to
            the value of `PREFECT_DEFAULT_WORK_POOL_NAME`.
        image: The name of the Docker image to build, including the registry and
            repository. Pass a DockerImage instance to customize the Dockerfile used
            and build arguments.
        build: Whether or not to build a new image for the flow. If False, the provided
            image will be used as-is and pulled at runtime.
        push: Whether or not to push the built image to a registry.
        print_next_steps_message: Whether or not to print a message with next steps
            after deploying the deployments.
        ignore_warnings: Whether to suppress warnings, such as the notice shown
            when deploying to a process work pool.

    Returns:
        A list of deployment IDs for the created/updated deployments.

    Examples:
        Deploy a group of flows to a work pool:

        ```python
        from prefect import deploy, flow

        @flow(log_prints=True)
        def local_flow():
            print("I'm a locally defined flow!")

        if __name__ == "__main__":
            deploy(
                local_flow.to_deployment(name="example-deploy-local-flow"),
                flow.from_source(
                    source="https://github.com/org/repo.git",
                    entrypoint="flows.py:my_flow",
                ).to_deployment(
                    name="example-deploy-remote-flow",
                ),
                work_pool_name="my-work-pool",
                image="my-registry/my-image:dev",
            )
        ```
    """
    work_pool_name = work_pool_name or PREFECT_DEFAULT_WORK_POOL_NAME.value()

    if not image and not all(
        d.storage or d.entrypoint_type == EntrypointType.MODULE_PATH
        for d in deployments
    ):
        raise ValueError(
            "Either an image or remote storage location must be provided when deploying"
            " a deployment."
        )

    if not work_pool_name:
        raise ValueError(
            "A work pool name must be provided when deploying a deployment. Either"
            " provide a work pool name when calling `deploy` or set"
            " `PREFECT_DEFAULT_WORK_POOL_NAME` in your profile."
        )

    if image and isinstance(image, str):
        image_name, image_tag = parse_image_tag(image)
        image = DockerImage(name=image_name, tag=image_tag)

    try:
        async with get_client() as client:
            work_pool = await client.read_work_pool(work_pool_name)
    except ObjectNotFound as exc:
        raise ValueError(
            f"Could not find work pool {work_pool_name!r}. Please create it before"
            " deploying this flow."
        ) from exc

    is_docker_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.image", False
    )
    is_block_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.block", False
    )
    # carve out an exception for block based work pools that only have a block in their base job template
    console = Console()
    if not is_docker_based_work_pool and not is_block_based_work_pool:
        if image:
            raise ValueError(
                f"Work pool {work_pool_name!r} does not support custom Docker images."
                " Please use a work pool with an `image` variable in its base job template"
                " or specify a remote storage location for the flow with `.from_source`."
                " If you are attempting to deploy a flow to a local process work pool,"
                " consider using `flow.serve` instead. See the documentation for more"
                " information: https://docs.prefect.io/latest/concepts/flows/#serving-a-flow"
            )
        elif work_pool.type == "process" and not ignore_warnings:
            console.print(
                "Looks like you're deploying to a process work pool. If you're creating a"
                " deployment for local development, calling `.serve` on your flow is a great"
                " way to get started. See the documentation for more information:"
                " https://docs.prefect.io/latest/concepts/flows/#serving-a-flow. "
                " Set `ignore_warnings=True` to suppress this message.",
                style="yellow",
            )

    is_managed_pool = work_pool.is_managed_pool
    if is_managed_pool:
        build = False
        push = False

    if image and build:
        with Progress(
            SpinnerColumn(),
            TextColumn(f"Building image {image.reference}..."),
            transient=True,
            console=console,
        ) as progress:
            docker_build_task = progress.add_task("docker_build", total=1)
            image.build()

            progress.update(docker_build_task, completed=1)
            console.print(
                f"Successfully built image {image.reference!r}", style="green"
            )

    if image and build and push:
        with Progress(
            SpinnerColumn(),
            TextColumn("Pushing image..."),
            transient=True,
            console=console,
        ) as progress:
            docker_push_task = progress.add_task("docker_push", total=1)

            image.push()

            progress.update(docker_push_task, completed=1)

        console.print(f"Successfully pushed image {image.reference!r}", style="green")

    deployment_exceptions = []
    deployment_ids = []
    image_ref = image.reference if image else None
    for deployment in track(
        deployments,
        description="Creating/updating deployments...",
        console=console,
        transient=True,
    ):
        try:
            deployment_ids.append(
                await deployment.apply(image=image_ref, work_pool_name=work_pool_name)
            )
        except Exception as exc:
            if len(deployments) == 1:
                raise
            deployment_exceptions.append({"deployment": deployment, "exc": exc})

    if deployment_exceptions:
        console.print(
            "Encountered errors while creating/updating deployments:\n",
            style="orange_red1",
        )
    else:
        console.print("Successfully created/updated all deployments!\n", style="green")

    complete_failure = len(deployment_exceptions) == len(deployments)

    table = Table(
        title="Deployments",
        show_lines=True,
    )

    table.add_column(header="Name", style="blue", no_wrap=True)
    table.add_column(header="Status", style="blue", no_wrap=True)
    table.add_column(header="Details", style="blue")

    for deployment in deployments:
        errored_deployment = next(
            (d for d in deployment_exceptions if d["deployment"] == deployment),
            None,
        )
        if errored_deployment:
            table.add_row(
                f"{deployment.flow_name}/{deployment.name}",
                "failed",
                str(errored_deployment["exc"]),
                style="red",
            )
        else:
            table.add_row(f"{deployment.flow_name}/{deployment.name}", "applied")
    console.print(table)

    if print_next_steps_message and not complete_failure:
        if not work_pool.is_push_pool and not work_pool.is_managed_pool:
            console.print(
                "\nTo execute flow runs from these deployments, start a worker in a"
                " separate terminal that pulls work from the"
                f" {work_pool_name!r} work pool:"
            )
            console.print(
                f"\n\t$ prefect worker start --pool {work_pool_name!r}",
                style="blue",
            )
        console.print(
            "\nTo trigger any of these deployments, use the"
            " following command:\n[blue]\n\t$ prefect deployment run"
            " [DEPLOYMENT_NAME]\n[/]"
        )

        if PREFECT_UI_URL:
            console.print(
                "\nYou can also trigger your deployments via the Prefect UI:"
                f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n"
            )

    return deployment_ids

initialize_project(name=None, recipe=None, inputs=None)

Initializes a basic project structure with base files. If no name is provided, the name of the current directory is used. If no recipe is provided, one is inferred.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the project; if not provided, the current directory name is used. | `None` |
| `recipe` | `str` | The name of the recipe to use; if not provided, one is inferred. | `None` |
| `inputs` | `dict` | A dictionary of inputs to use when formatting the recipe. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | A list of files / directories that were created. |
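
A short usage sketch, run from the root of a project directory and assuming `initialize_project` is importable from `prefect.deployments` as this page suggests (the project name is hypothetical; `"git"` is one of the recipes handled in the source below):

```python
from prefect.deployments import initialize_project

# With no arguments, the name and recipe are inferred from the current
# directory (git remote, presence of a Dockerfile, etc.).
created = initialize_project(name="my-project", recipe="git")
for path in created:
    print(f"created: {path}")  # e.g. ".prefectignore", "prefect.yaml"
```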

Source code in src/prefect/deployments/base.py
def initialize_project(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
    inputs: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """
    Initializes a basic project structure with base files.  If no name is provided, the name
    of the current directory is used.  If no recipe is provided, one is inferred.

    Args:
        name (str, optional): the name of the project; if not provided, the current directory name
        recipe (str, optional): the name of the recipe to use; if not provided, one is inferred
        inputs (dict, optional): a dictionary of inputs to use when formatting the recipe

    Returns:
        List[str]: a list of files / directories that were created
    """
    # determine if in git repo or use directory name as a default
    is_git_based = False
    formatting_kwargs = {"directory": str(Path(".").absolute().resolve())}
    dir_name = os.path.basename(os.getcwd())

    remote_url = _get_git_remote_origin_url()
    if remote_url:
        formatting_kwargs["repository"] = remote_url
        is_git_based = True
        branch = _get_git_branch()
        formatting_kwargs["branch"] = branch or "main"

    formatting_kwargs["name"] = dir_name

    has_dockerfile = Path("Dockerfile").exists()

    if has_dockerfile:
        formatting_kwargs["dockerfile"] = "Dockerfile"
    elif recipe is not None and "docker" in recipe:
        formatting_kwargs["dockerfile"] = "auto"

    # hand craft a pull step
    if is_git_based and recipe is None:
        if has_dockerfile:
            recipe = "docker-git"
        else:
            recipe = "git"
    elif recipe is None and has_dockerfile:
        recipe = "docker"
    elif recipe is None:
        recipe = "local"

    formatting_kwargs.update(inputs or {})
    configuration = configure_project_by_recipe(recipe=recipe, **formatting_kwargs)

    project_name = name or dir_name

    files = []
    if create_default_ignore_file("."):
        files.append(".prefectignore")
    if create_default_prefect_yaml(".", name=project_name, contents=configuration):
        files.append("prefect.yaml")

    return files

run_deployment(name, client=None, parameters=None, scheduled_time=None, flow_run_name=None, timeout=None, poll_interval=5, tags=None, idempotency_key=None, work_queue_name=None, as_subflow=True, job_variables=None) async

Create a flow run for a deployment and return it after completion or a timeout.

By default, this function blocks until the flow run finishes executing. Specify a timeout (in seconds) to wait for the flow run to execute before returning flow run metadata. To return immediately, without waiting for the flow run to execute, set timeout=0.

Note that if you specify a timeout, this function will return the flow run metadata whether or not the flow run finished executing.

If called within a flow or task, the flow run this function creates will be linked to the current flow run as a subflow. Disable this behavior by passing as_subflow=False.
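
For example, a minimal sketch (the deployment name is hypothetical) contrasting the blocking default with a fire-and-forget call:

```python
from prefect.deployments import run_deployment

# Blocks until the flow run reaches a final state (timeout=None by default).
finished = run_deployment(name="my-flow/my-deployment")

# Returns immediately with the created (still Scheduled) flow run metadata.
scheduled = run_deployment(name="my-flow/my-deployment", timeout=0)
```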

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `Union[str, UUID]` | The deployment ID or deployment name in the form `"flow name/deployment name"`. | required |
| `parameters` | `Optional[dict]` | Parameter overrides for this flow run. Merged with the deployment defaults. | `None` |
| `scheduled_time` | `Optional[datetime]` | The time to schedule the flow run for; defaults to scheduling the flow run to start now. | `None` |
| `flow_run_name` | `Optional[str]` | A name for the created flow run. | `None` |
| `timeout` | `Optional[float]` | The amount of time to wait (in seconds) for the flow run to complete before returning. Setting `timeout` to 0 will return the flow run metadata immediately. Setting `timeout` to None will allow this function to poll indefinitely. Defaults to None. | `None` |
| `poll_interval` | `Optional[float]` | The number of seconds between polls. | `5` |
| `tags` | `Optional[Iterable[str]]` | A list of tags to associate with this flow run; tags can be used in automations and for organizational purposes. | `None` |
| `idempotency_key` | `Optional[str]` | A unique value to recognize retries of the same run and prevent creating multiple flow runs. | `None` |
| `work_queue_name` | `Optional[str]` | The name of a work queue to use for this run. Defaults to the default work queue for the deployment. | `None` |
| `as_subflow` | `Optional[bool]` | Whether to link the flow run as a subflow of the current flow or task run. | `True` |
| `job_variables` | `Optional[dict]` | A dictionary of dot-delimited infrastructure overrides that will be applied at runtime; for example `env.CONFIG_KEY=config_value` or `namespace='prefect'`. | `None` |
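
As an illustration of parameter and infrastructure overrides, a hedged sketch (the deployment name, parameter names, and values are hypothetical; `job_variables` is shown in the nested-dict form accepted by the Python API, while the dot-delimited form in the description mirrors CLI usage):

```python
from prefect.deployments import run_deployment

run_deployment(
    name="my-flow/my-deployment",
    parameters={"batch_size": 64},  # merged with the deployment's defaults
    job_variables={"env": {"CONFIG_KEY": "config_value"}},  # applied at runtime
    idempotency_key="nightly-2024-06-01",  # repeat calls reuse the same run
    timeout=0,  # do not wait for the run to finish
)
```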
Source code in src/prefect/deployments/flow_runs.py
@sync_compatible
@inject_client
async def run_deployment(
    name: Union[str, UUID],
    client: Optional["PrefectClient"] = None,
    parameters: Optional[dict] = None,
    scheduled_time: Optional[datetime] = None,
    flow_run_name: Optional[str] = None,
    timeout: Optional[float] = None,
    poll_interval: Optional[float] = 5,
    tags: Optional[Iterable[str]] = None,
    idempotency_key: Optional[str] = None,
    work_queue_name: Optional[str] = None,
    as_subflow: Optional[bool] = True,
    job_variables: Optional[dict] = None,
) -> "FlowRun":
    """
    Create a flow run for a deployment and return it after completion or a timeout.

    By default, this function blocks until the flow run finishes executing.
    Specify a timeout (in seconds) to wait for the flow run to execute before
    returning flow run metadata. To return immediately, without waiting for the
    flow run to execute, set `timeout=0`.

    Note that if you specify a timeout, this function will return the flow run
    metadata whether or not the flow run finished executing.

    If called within a flow or task, the flow run this function creates will
    be linked to the current flow run as a subflow. Disable this behavior by
    passing `as_subflow=False`.

    Args:
        name: The deployment id or deployment name in the form:
            `"flow name/deployment name"`
        parameters: Parameter overrides for this flow run. Merged with the deployment
            defaults.
        scheduled_time: The time to schedule the flow run for, defaults to scheduling
            the flow run to start now.
        flow_run_name: A name for the created flow run
        timeout: The amount of time to wait (in seconds) for the flow run to
            complete before returning. Setting `timeout` to 0 will return the flow
            run metadata immediately. Setting `timeout` to None will allow this
            function to poll indefinitely. Defaults to None.
        poll_interval: The number of seconds between polls
        tags: A list of tags to associate with this flow run; tags can be used in
            automations and for organizational purposes.
        idempotency_key: A unique value to recognize retries of the same run, and
            prevent creating multiple flow runs.
        work_queue_name: The name of a work queue to use for this run. Defaults to
            the default work queue for the deployment.
        as_subflow: Whether to link the flow run as a subflow of the current
            flow or task run.
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`
    """
    if timeout is not None and timeout < 0:
        raise ValueError("`timeout` cannot be negative")

    if scheduled_time is None:
        scheduled_time = pendulum.now("UTC")

    parameters = parameters or {}

    deployment_id = None

    if isinstance(name, UUID):
        deployment_id = name
    else:
        try:
            deployment_id = UUID(name)
        except ValueError:
            pass

    if deployment_id:
        deployment = await client.read_deployment(deployment_id=deployment_id)
    else:
        deployment = await client.read_deployment_by_name(name)

    flow_run_ctx = FlowRunContext.get()
    task_run_ctx = TaskRunContext.get()
    if as_subflow and (flow_run_ctx or task_run_ctx):
        # TODO: this logic can likely be simplified by using `Task.create_run`
        from prefect.utilities.engine import (
            _dynamic_key_for_task_run,
            collect_task_run_inputs,
        )

        # This was called from a flow. Link the flow run as a subflow.
        task_inputs = {
            k: await collect_task_run_inputs(v) for k, v in parameters.items()
        }

        if deployment_id:
            flow = await client.read_flow(deployment.flow_id)
            deployment_name = f"{flow.name}/{deployment.name}"
        else:
            deployment_name = name

        # Generate a task in the parent flow run to represent the result of the subflow
        dummy_task = Task(
            name=deployment_name,
            fn=lambda: None,
            version=deployment.version,
        )
        # Override the default task key to include the deployment name
        dummy_task.task_key = f"{__name__}.run_deployment.{slugify(deployment_name)}"
        flow_run_id = (
            flow_run_ctx.flow_run.id
            if flow_run_ctx
            else task_run_ctx.task_run.flow_run_id
        )
        dynamic_key = (
            _dynamic_key_for_task_run(flow_run_ctx, dummy_task)
            if flow_run_ctx
            else task_run_ctx.task_run.dynamic_key
        )
        parent_task_run = await client.create_task_run(
            task=dummy_task,
            flow_run_id=flow_run_id,
            dynamic_key=dynamic_key,
            task_inputs=task_inputs,
            state=Pending(),
        )
        parent_task_run_id = parent_task_run.id
    else:
        parent_task_run_id = None

    flow_run = await client.create_flow_run_from_deployment(
        deployment.id,
        parameters=parameters,
        state=Scheduled(scheduled_time=scheduled_time),
        name=flow_run_name,
        tags=tags,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        work_queue_name=work_queue_name,
        job_variables=job_variables,
    )

    flow_run_id = flow_run.id

    if timeout == 0:
        return flow_run

    with anyio.move_on_after(timeout):
        while True:
            flow_run = await client.read_flow_run(flow_run_id)
            flow_state = flow_run.state
            if flow_state and flow_state.is_final():
                return flow_run
            await anyio.sleep(poll_interval)

    return flow_run