Skip to content

prefect.cli.shell

Provides a set of tools for executing shell commands as Prefect flows. Includes functionalities for running shell commands ad-hoc or serving them as Prefect flows, with options for logging output, scheduling, and deployment customization.

output_collect(pipe, container)

Collects output from a subprocess pipe and stores it in a container list.

Parameters:

Name Type Description Default
pipe

The output pipe of the subprocess, either stdout or stderr.

required
container

A list to store the collected output lines.

required
Source code in src/prefect/cli/shell.py
47
48
49
50
51
52
53
54
55
56
def output_collect(pipe, container):
    """
    Drain a subprocess pipe into a list, one entry per line.

    Lines are appended exactly as read (trailing newline included). Reading
    stops when `readline()` returns an empty string. The pipe is not closed here.

    Args:
        pipe: The output pipe of the subprocess, either stdout or stderr.
        container: A list that receives every line read from the pipe.
    """
    read_line = pipe.readline
    while True:
        chunk = read_line()
        if chunk == "":
            # An empty string from readline() marks the end of the stream.
            break
        container.append(chunk)

output_stream(pipe, logger_function)

Read from a pipe line by line and log using the provided logging function.

Parameters:

Name Type Description Default
pipe IO

A file-like object for reading process output.

required
logger_function function

A logging function from the logger.

required
Source code in src/prefect/cli/shell.py
34
35
36
37
38
39
40
41
42
43
44
def output_stream(pipe, logger_function):
    """
    Forward each line of a pipe to a logging callable as soon as it is read.

    Every line is stripped of surrounding whitespace before being passed on.
    The pipe is used as a context manager, so it is closed once reading ends.

    Args:
        pipe (IO): A file-like object for reading process output.
        logger_function (function): A logging function from the logger.
    """
    with pipe:
        next_line = pipe.readline
        text = next_line()
        # readline() yields "" only at end of stream; blank lines arrive as "\n".
        while text != "":
            logger_function(text.strip())
            text = next_line()

run_shell_process(command, log_output=True, stream_stdout=False, log_stderr=False)

Executes the specified shell command in a subprocess and logs its output.

This function is designed to be used within Prefect flows to run shell commands as part of task execution. It handles both the execution of the command and the collection of its output for logging purposes.

Parameters:

Name Type Description Default
command str

The shell command to execute.

required
log_output bool

If True, the output of the command (both stdout and stderr) is logged to Prefect. Defaults to True.

True
stream_stdout bool

If True, the stdout of the command is streamed to Prefect logs. Defaults to False.

False
log_stderr bool

If True, the stderr of the command is logged to Prefect logs. Defaults to False.

False
Source code in src/prefect/cli/shell.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
@flow
def run_shell_process(
    command: str,
    log_output: bool = True,
    stream_stdout: bool = False,
    log_stderr: bool = False,
):
    """
    Executes the specified shell command in a subprocess and logs its output.

    This function is designed to be used within Prefect flows to run shell commands as part of task execution.
    It blocks until both output pipes have been drained and the command has exited.

    The command is run with `shell=True`, so it is interpreted by the system shell; only pass trusted input.

    Args:
        command (str): The shell command to execute.
        log_output (bool, optional): If True, messages are sent to the Prefect run logger (`get_run_logger()`);
                                     otherwise they are sent to the standard `logging` logger named "prefect".
                                     Defaults to True.
        stream_stdout (bool, optional): If True, each stdout line is logged at INFO level as soon as it is read.
                                        If False, stdout is collected and logged as a single INFO message once the
                                        command has finished. Defaults to False.
        log_stderr (bool, optional): If True, collected stderr is logged at ERROR level even when the command
                                     succeeds. Collected stderr is always logged when the command fails.
                                     Defaults to False.

    Raises:
        FailedRun: If the command exits with a non-zero return code.
    """

    logger = get_run_logger() if log_output else logging.getLogger("prefect")

    # Containers for log batching
    stdout_container, stderr_container = [], []
    with subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=True,
        # `text=True` already implies universal newlines; line-buffer the pipes.
        text=True,
        bufsize=1,
    ) as proc:
        # stdout is either streamed line by line to the logger or collected for
        # a single batched message; stderr is always collected.
        if stream_stdout:
            stdout_target = output_stream
            stdout_sink = logger.info
        else:
            stdout_target = output_collect
            stdout_sink = stdout_container

        # Drain both pipes concurrently so a full pipe buffer on one stream
        # cannot stall the child while the other stream is being read.
        stdout_thread = threading.Thread(
            target=stdout_target, args=(proc.stdout, stdout_sink)
        )
        stderr_thread = threading.Thread(
            target=output_collect, args=(proc.stderr, stderr_container)
        )

        stdout_thread.start()
        stderr_thread.start()

        stdout_thread.join()
        stderr_thread.join()

        proc.wait()
        failed = proc.returncode != 0

        if stdout_container:
            logger.info("".join(stdout_container).strip())

        # Log stderr exactly once: when requested, or unconditionally on failure.
        # Skipping an empty container avoids emitting a blank error record.
        if stderr_container and (log_stderr or failed):
            logger.error("".join(stderr_container).strip())

        if failed:
            # Suppress traceback. NOTE: this changes interpreter-wide state.
            sys.tracebacklimit = 0
            raise FailedRun(f"Command failed with exit code {proc.returncode}")

serve(command, flow_name=typer.Option(..., help='Name of the flow'), deployment_name=typer.Option('CLI Runner Deployment', help='Name of the deployment'), deployment_tags=typer.Option(None, '--tag', help='Tag for the deployment (can be provided multiple times)'), log_output=typer.Option(True, help='Stream the output of the command', hidden=True), stream_stdout=typer.Option(True, help='Stream the output of the command'), cron_schedule=typer.Option(None, help='Cron schedule for the flow'), timezone=typer.Option(None, help='Timezone for the schedule'), concurrency_limit=typer.Option(None, min=1, help='The maximum number of flow runs that can execute at the same time'), run_once=typer.Option(False, help='Run the agent loop once, instead of forever.')) async

Creates and serves a Prefect deployment that runs a specified shell command according to a cron schedule or ad hoc.

This function allows users to integrate shell command execution into Prefect workflows seamlessly. It provides options for scheduled execution via cron expressions, flow and deployment naming for better management, and the application of tags for easier categorization and filtering within the Prefect UI. Additionally, it supports streaming command output to Prefect logs, setting concurrency limits to control flow execution, and optionally running the deployment once for ad-hoc tasks.

Parameters:

Name Type Description Default
command str

The shell command the flow will execute.

required
flow_name str

The name assigned to the flow. This is required.

required
deployment_tags List[str]

Optional tags for the deployment to facilitate filtering and organization.

Option(None, '--tag', help='Tag for the deployment (can be provided multiple times)')
log_output bool

If True, streams the output of the shell command to the Prefect logs. Defaults to True.

Option(True, help='Stream the output of the command', hidden=True)
cron_schedule str

A cron expression that defines when the flow will run. If not provided, the flow can be triggered manually.

Option(None, help='Cron schedule for the flow')
timezone str

The timezone for the cron schedule. This is important if the schedule should align with local time.

Option(None, help='Timezone for the schedule')
concurrency_limit int

The maximum number of instances of the flow that can run simultaneously.

Option(None, min=1, help='The maximum number of flow runs that can execute at the same time')
deployment_name str

The name of the deployment. This helps distinguish deployments within the Prefect platform.

Option('CLI Runner Deployment', help='Name of the deployment')
run_once bool

When True, the flow will only run once upon deployment initiation, rather than continuously.

Option(False, help='Run the agent loop once, instead of forever.')
Source code in src/prefect/cli/shell.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
@shell_app.command("serve")
async def serve(
    command: str,
    flow_name: str = typer.Option(..., help="Name of the flow"),
    deployment_name: str = typer.Option(
        "CLI Runner Deployment", help="Name of the deployment"
    ),
    deployment_tags: List[str] = typer.Option(
        None, "--tag", help="Tag for the deployment (can be provided multiple times)"
    ),
    log_output: bool = typer.Option(
        True, help="Stream the output of the command", hidden=True
    ),
    stream_stdout: bool = typer.Option(True, help="Stream the output of the command"),
    cron_schedule: str = typer.Option(None, help="Cron schedule for the flow"),
    timezone: str = typer.Option(None, help="Timezone for the schedule"),
    concurrency_limit: int = typer.Option(
        None,
        min=1,
        help="The maximum number of flow runs that can execute at the same time",
    ),
    run_once: bool = typer.Option(
        False, help="Run the agent loop once, instead of forever."
    ),
):
    """
    Creates and serves a Prefect deployment that runs a specified shell command according to a cron schedule or ad hoc.

    This function allows users to integrate shell command execution into Prefect workflows seamlessly. It provides options for
    scheduled execution via cron expressions, flow and deployment naming for better management, and the application of tags for
    easier categorization and filtering within the Prefect UI. Additionally, it supports streaming command output to Prefect logs,
    setting concurrency limits to control flow execution, and optionally running the deployment once for ad-hoc tasks.

    Args:
        command (str): The shell command the flow will execute.
        flow_name (str): The name assigned to the flow. This is required.
        deployment_name (str, optional): The name of the deployment. This helps distinguish deployments within the Prefect platform.
        deployment_tags (List[str], optional): Optional tags for the deployment to facilitate filtering and organization.
            The tag "shell" is always appended.
        log_output (bool, optional): Forwarded to the flow as its `log_output` parameter. Defaults to True.
        stream_stdout (bool, optional): Forwarded to the flow as its `stream_stdout` parameter. Defaults to True.
        cron_schedule (str, optional): A cron expression that defines when the flow will run. If not provided, the flow can be triggered manually.
        timezone (str, optional): The timezone for the cron schedule. This is important if the schedule should align with local time.
        concurrency_limit (int, optional): The maximum number of instances of the flow that can run simultaneously.
            When omitted, the runner is created without an explicit limit.
        run_once (bool, optional): When True, the flow will only run once upon deployment initiation, rather than continuously.
    """
    schedule = (
        CronSchedule(cron=cron_schedule, timezone=timezone) if cron_schedule else None
    )
    defined_flow = run_shell_process.with_options(name=flow_name)

    runner_deployment = await defined_flow.to_deployment(
        name=deployment_name,
        parameters={
            "command": command,
            "log_output": log_output,
            "stream_stdout": stream_stdout,
        },
        entrypoint_type=EntrypointType.MODULE_PATH,
        schedules=[DeploymentScheduleCreate(schedule=schedule)] if schedule else [],
        tags=(deployment_tags or []) + ["shell"],
    )

    # Honor --concurrency-limit by handing it to the runner. The keyword is
    # only passed when the option was given, so the runner's own default
    # still applies otherwise.
    runner_options = {"name": flow_name}
    if concurrency_limit is not None:
        runner_options["limit"] = concurrency_limit
    runner = Runner(**runner_options)

    deployment_id = await runner.add_deployment(runner_deployment)
    help_message = (
        f"[green]Your flow {runner_deployment.flow_name!r} is being served and polling"
        " for scheduled runs!\n[/]\nTo trigger a run for this flow, use the following"
        " command:\n[blue]\n\t$ prefect deployment run"
        f" '{runner_deployment.flow_name}/{deployment_name}'\n[/]"
    )
    if PREFECT_UI_URL:
        help_message += (
            "\nYou can also run your flow via the Prefect UI:"
            f" [blue]{PREFECT_UI_URL.value()}/deployments/deployment/{deployment_id}[/]\n"
        )

    app.console.print(help_message, soft_wrap=True)
    await runner.start(run_once=run_once)

watch(command, log_output=typer.Option(True, help='Log the output of the command to Prefect logs.'), flow_run_name=typer.Option(None, help='Name of the flow run.'), flow_name=typer.Option('Shell Command', help='Name of the flow.'), stream_stdout=typer.Option(True, help='Stream the output of the command.'), tag=None) async

Executes a shell command and observes it as a Prefect flow.

Parameters:

Name Type Description Default
command str

The shell command to be executed.

required
log_output bool

If True, logs the command's output. Defaults to True.

Option(True, help='Log the output of the command to Prefect logs.')
flow_run_name str

An optional name for the flow run.

Option(None, help='Name of the flow run.')
flow_name str

An optional name for the flow. Useful for identification in the Prefect UI.

Option('Shell Command', help='Name of the flow.')
tag List[str]

An optional list of tags for categorizing and filtering flows in the Prefect UI.

None
Source code in src/prefect/cli/shell.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@shell_app.command("watch")
async def watch(
    command: str,
    log_output: bool = typer.Option(
        True, help="Log the output of the command to Prefect logs."
    ),
    flow_run_name: str = typer.Option(None, help="Name of the flow run."),
    flow_name: str = typer.Option("Shell Command", help="Name of the flow."),
    stream_stdout: bool = typer.Option(True, help="Stream the output of the command."),
    tag: Annotated[
        Optional[List[str]], typer.Option(help="Optional tags for the flow run.")
    ] = None,
):
    """
    Runs a shell command once, tracked as a Prefect flow run.

    Args:
        command (str): The shell command to be executed.
        log_output (bool, optional): Forwarded to the flow as its `log_output` parameter. Defaults to True.
        flow_run_name (str, optional): An optional name for the flow run.
        flow_name (str, optional): An optional name for the flow. Useful for identification in the Prefect UI.
        stream_stdout (bool, optional): Forwarded to the flow as its `stream_stdout` parameter. Defaults to True.
        tag (List[str], optional): An optional list of tags for categorizing and filtering flows in the Prefect UI.
            The tag "shell" is always appended.
    """
    # Every run started from this command carries the "shell" tag in
    # addition to whatever the caller supplied.
    run_tags = (tag or []) + ["shell"]

    # Configure the run_shell_process flow with the requested names
    shell_flow = run_shell_process.with_options(
        name=flow_name,
        flow_run_name=flow_run_name,
    )

    flow_arguments = {
        "command": command,
        "log_output": log_output,
        "stream_stdout": stream_stdout,
    }
    with tags(*run_tags):
        shell_flow(**flow_arguments)