prefect.cli.shell
Provides a set of tools for executing shell commands as Prefect flows. Includes functionalities for running shell commands ad-hoc or serving them as Prefect flows, with options for logging output, scheduling, and deployment customization.
output_collect(pipe, container)
Collects output from a subprocess pipe and stores it in a container list.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipe |
The output pipe of the subprocess, either stdout or stderr. |
required | |
container |
A list to store the collected output lines. |
required |
Source code in src/prefect/cli/shell.py
47 48 49 50 51 52 53 54 55 56 |
|
output_stream(pipe, logger_function)
Read from a pipe line by line and log using the provided logging function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipe |
IO
|
A file-like object for reading process output. |
required |
logger_function |
function
|
A logging function from the logger. |
required |
Source code in src/prefect/cli/shell.py
34 35 36 37 38 39 40 41 42 43 44 |
|
run_shell_process(command, log_output=True, stream_stdout=False, log_stderr=False)
Asynchronously executes the specified shell command and logs its output.
This function is designed to be used within Prefect flows to run shell commands as part of task execution. It handles both the execution of the command and the collection of its output for logging purposes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
str
|
The shell command to execute. |
required |
log_output |
bool
|
If True, the output of the command (both stdout and stderr) is logged to Prefect. Defaults to True |
True
|
stream_stdout |
bool
|
If True, the stdout of the command is streamed to Prefect logs. Defaults to False. |
False
|
log_stderr |
bool
|
If True, the stderr of the command is logged to Prefect logs. Defaults to False. |
False
|
Source code in src/prefect/cli/shell.py
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
|
serve(command, flow_name=typer.Option(..., help='Name of the flow'), deployment_name=typer.Option('CLI Runner Deployment', help='Name of the deployment'), deployment_tags=typer.Option(None, '--tag', help='Tag for the deployment (can be provided multiple times)'), log_output=typer.Option(True, help='Stream the output of the command', hidden=True), stream_stdout=typer.Option(True, help='Stream the output of the command'), cron_schedule=typer.Option(None, help='Cron schedule for the flow'), timezone=typer.Option(None, help='Timezone for the schedule'), concurrency_limit=typer.Option(None, min=1, help='The maximum number of flow runs that can execute at the same time'), run_once=typer.Option(False, help='Run the agent loop once, instead of forever.'))
async
Creates and serves a Prefect deployment that runs a specified shell command according to a cron schedule or ad hoc.
This function allows users to integrate shell command execution into Prefect workflows seamlessly. It provides options for scheduled execution via cron expressions, flow and deployment naming for better management, and the application of tags for easier categorization and filtering within the Prefect UI. Additionally, it supports streaming command output to Prefect logs, setting concurrency limits to control flow execution, and optionally running the deployment once for ad-hoc tasks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
str
|
The shell command the flow will execute. |
required |
name |
str
|
The name assigned to the flow. This is required.. |
required |
deployment_tags |
List[str]
|
Optional tags for the deployment to facilitate filtering and organization. |
Option(None, '--tag', help='Tag for the deployment (can be provided multiple times)')
|
log_output |
bool
|
If True, streams the output of the shell command to the Prefect logs. Defaults to True. |
Option(True, help='Stream the output of the command', hidden=True)
|
cron_schedule |
str
|
A cron expression that defines when the flow will run. If not provided, the flow can be triggered manually. |
Option(None, help='Cron schedule for the flow')
|
timezone |
str
|
The timezone for the cron schedule. This is important if the schedule should align with local time. |
Option(None, help='Timezone for the schedule')
|
concurrency_limit |
int
|
The maximum number of instances of the flow that can run simultaneously. |
Option(None, min=1, help='The maximum number of flow runs that can execute at the same time')
|
deployment_name |
str
|
The name of the deployment. This helps distinguish deployments within the Prefect platform. |
Option('CLI Runner Deployment', help='Name of the deployment')
|
run_once |
bool
|
When True, the flow will only run once upon deployment initiation, rather than continuously. |
Option(False, help='Run the agent loop once, instead of forever.')
|
Source code in src/prefect/cli/shell.py
165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
|
watch(command, log_output=typer.Option(True, help='Log the output of the command to Prefect logs.'), flow_run_name=typer.Option(None, help='Name of the flow run.'), flow_name=typer.Option('Shell Command', help='Name of the flow.'), stream_stdout=typer.Option(True, help='Stream the output of the command.'), tag=None)
async
Executes a shell command and observes it as Prefect flow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
command |
str
|
The shell command to be executed. |
required |
log_output |
bool
|
If True, logs the command's output. Defaults to True. |
Option(True, help='Log the output of the command to Prefect logs.')
|
flow_run_name |
str
|
An optional name for the flow run. |
Option(None, help='Name of the flow run.')
|
flow_name |
str
|
An optional name for the flow. Useful for identification in the Prefect UI. |
Option('Shell Command', help='Name of the flow.')
|
tag |
List[str]
|
An optional list of tags for categorizing and filtering flows in the Prefect UI. |
None
|
Source code in src/prefect/cli/shell.py
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
|