Skip to content

prefect.cli.task

serve(entrypoints=typer.Argument(..., help='The paths to one or more tasks, in the form of `./path/to/file.py:task_func_name`.'), limit=typer.Option(10, help='The maximum number of tasks that can be run concurrently. Defaults to 10.')) async

Serve the provided tasks so that their runs may be submitted to and executed in the engine.

Parameters:

Name Type Description Default
entrypoints List[str]

List of strings representing the paths to one or more tasks. Each path should be in the format ./path/to/file.py:task_func_name.

Argument(..., help='The paths to one or more tasks, in the form of `./path/to/file.py:task_func_name`.')
limit int

The maximum number of tasks that can be run concurrently.

Option(10, help='The maximum number of tasks that can be run concurrently. Defaults to 10.')
Source code in src/prefect/cli/task.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@task_app.command()
async def serve(
    entrypoints: List[str] = typer.Argument(
        ...,
        help="The paths to one or more tasks, in the form of `./path/to/file.py:task_func_name`.",
    ),
    limit: int = typer.Option(
        10,
        help="The maximum number of tasks that can be run concurrently. Defaults to 10.",
    ),
):
    """
    Serve the provided tasks so that their runs may be submitted to and
    executed in the engine.

    Args:
        entrypoints: List of strings representing the paths to one or more
            tasks. Each path should be in the format
            `./path/to/file.py:task_func_name`.
        limit: The maximum number of tasks that can be run concurrently.
    """
    tasks = []

    for entrypoint in entrypoints:
        if ".py:" not in entrypoint:
            exit_with_error(
                (
                    f"Error: Invalid entrypoint format {entrypoint!r}. It "
                    "must be of the form `./path/to/file.py:task_func_name`."
                )
            )

        try:
            tasks.append(import_object(entrypoint))
        except Exception:
            module, task_name = entrypoint.split(":")
            exit_with_error(
                f"Error: {module!r} has no function {task_name!r}.", style="red"
            )

    await task_serve(*tasks, limit=limit)