Skip to content

prefect.cli.flow_run

Command line interface for working with flow runs

cancel(id) async

Cancel a flow run by ID.

Source code in src/prefect/cli/flow_run.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
@flow_run_app.command()
async def cancel(id: UUID):
    """Cancel a flow run by ID."""
    # NOTE: the docstring above doubles as the CLI help text, so extra
    # documentation lives in comments instead.
    #
    # Cancellation is requested by proposing a CANCELLING state for the run;
    # the response status tells us whether the proposal was accepted.
    async with get_client() as client:
        target_state = State(type=StateType.CANCELLING)
        try:
            result = await client.set_flow_run_state(
                flow_run_id=id,
                state=target_state,
            )
        except ObjectNotFound:
            exit_with_error(f"Flow run '{id}' not found!")

    # An ABORT status means the state change was refused; surface the reason
    # reported alongside it.
    if result.status == SetStateStatus.ABORT:
        reason = result.details.reason
        exit_with_error(
            f"Flow run '{id}' was unable to be cancelled. Reason: '{reason}'"
        )

    exit_with_success(f"Flow run '{id}' was successfully scheduled for cancellation.")

delete(id) async

Delete a flow run by ID.

Source code in src/prefect/cli/flow_run.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@flow_run_app.command()
async def delete(id: UUID):
    """
    Delete a flow run by ID.
    """
    # NOTE: the docstring above doubles as the CLI help text, so extra
    # documentation lives in comments instead.
    async with get_client() as client:
        try:
            # Only prompt when running interactively; a declined prompt
            # aborts before anything is sent to the API.
            if is_interactive():
                confirmed = typer.confirm(
                    f"Are you sure you want to delete flow run with id {id!r}?",
                    default=False,
                )
                if not confirmed:
                    exit_with_error("Deletion aborted.")

            await client.delete_flow_run(id)
        except ObjectNotFound:
            exit_with_error(f"Flow run '{id}' not found!")

    exit_with_success(f"Successfully deleted flow run '{id}'.")

inspect(id) async

View details about a flow run.

Source code in src/prefect/cli/flow_run.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@flow_run_app.command()
async def inspect(id: UUID):
    """
    View details about a flow run.
    """
    # NOTE: the docstring above doubles as the CLI help text, so extra
    # documentation lives in comments instead.
    #
    # Reads the flow run with the given ID and pretty-prints it to the console.
    # A missing flow run exits with an error message rather than a traceback.
    async with get_client() as client:
        try:
            flow_run = await client.read_flow_run(id)
        except ObjectNotFound:
            # The other commands in this module (e.g. `logs`) treat
            # `ObjectNotFound` as the "missing flow run" signal from
            # `read_flow_run`; handle it here as well.
            exit_with_error(f"Flow run '{id}' not found!")
        except httpx.HTTPStatusError as exc:
            # Kept for backward compatibility in case a raw HTTP error
            # reaches this point; anything other than a 404 is re-raised.
            if exc.response.status_code == status.HTTP_404_NOT_FOUND:
                exit_with_error(f"Flow run '{id}' not found!")
            else:
                raise

    app.console.print(Pretty(flow_run))

logs(id, head=typer.Option(False, '--head', '-h', help=f'Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of all logs.'), num_logs=typer.Option(None, '--num-logs', '-n', help=f'Number of logs to show when using the --head or --tail flag. If None, defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}.', min=1), reverse=typer.Option(False, '--reverse', '-r', help='Reverse the logs order to print the most recent logs first'), tail=typer.Option(False, '--tail', '-t', help=f'Show the last {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of all logs.')) async

View logs for a flow run.

Source code in src/prefect/cli/flow_run.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
@flow_run_app.command()
async def logs(
    id: UUID,
    head: bool = typer.Option(
        False,
        "--head",
        "-h",
        help=(
            f"Show the first {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
    num_logs: int = typer.Option(
        None,
        "--num-logs",
        "-n",
        help=(
            "Number of logs to show when using the --head or --tail flag. If None,"
            f" defaults to {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS}."
        ),
        min=1,
    ),
    reverse: bool = typer.Option(
        False,
        "--reverse",
        "-r",
        help="Reverse the logs order to print the most recent logs first",
    ),
    tail: bool = typer.Option(
        False,
        "--tail",
        "-t",
        help=(
            f"Show the last {LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS} logs instead of"
            " all logs."
        ),
    ),
):
    """
    View logs for a flow run.
    """
    # NOTE: the docstring above doubles as the CLI help text, so extra
    # documentation lives in comments instead.
    #
    # Overview:
    #   * no flags            -> every log, oldest first
    #   * --reverse           -> every log, newest first
    #   * --head / --num-logs -> the first N logs (newest first with --reverse)
    #   * --tail              -> the last N logs, oldest first
    #   * --tail --reverse    -> the last N logs, newest first
    # `--head` and `--tail` are mutually exclusive.

    # if head and tail flags are being used together
    if head and tail:
        exit_with_error("Please provide either a `head` or `tail` option but not both.")

    # None means "no limit"; otherwise the explicit --num-logs value or the
    # default count used by --head / --tail.
    user_specified_num_logs = (
        num_logs or LOGS_WITH_LIMIT_FLAG_DEFAULT_NUM_LOGS
        if head or tail or num_logs
        else None
    )

    # Pagination - API returns max 200 (LOGS_DEFAULT_PAGE_SIZE) logs at a time
    offset = 0
    more_logs = True
    num_logs_returned = 0

    # A tail printed oldest-first has to start at the far end of the
    # newest-first window and walk the offset back down to 0, reversing each
    # page as it goes. A tail printed newest-first (`--tail --reverse`) is just
    # the first N entries of the newest-first listing, so it uses ordinary
    # forward paging; walking backwards there would emit pages out of order
    # as soon as more than one page is needed.
    page_backwards = tail and not reverse
    if page_backwards:
        offset = max(0, user_specified_num_logs - LOGS_DEFAULT_PAGE_SIZE)

    log_filter = LogFilter(flow_run_id={"any_": [id]})
    sort_order = LogSort.TIMESTAMP_DESC if reverse or tail else LogSort.TIMESTAMP_ASC

    async with get_client() as client:
        # Get the flow run
        try:
            flow_run = await client.read_flow_run(id)
        except ObjectNotFound:
            exit_with_error(f"Flow run {str(id)!r} not found!")

        while more_logs:
            # Never ask for more than a page, nor for more than what is still
            # owed to the user when a limit is in effect.
            num_logs_to_return_from_page = (
                LOGS_DEFAULT_PAGE_SIZE
                if user_specified_num_logs is None
                else min(
                    LOGS_DEFAULT_PAGE_SIZE, user_specified_num_logs - num_logs_returned
                )
            )

            # Get the next page of logs
            page_logs = await client.read_logs(
                log_filter=log_filter,
                limit=num_logs_to_return_from_page,
                offset=offset,
                sort=sort_order,
            )

            for log in reversed(page_logs) if page_backwards else page_logs:
                # Print following the flow run format (declared in logging.yml)
                timestamp = f"{log.timestamp:%Y-%m-%d %H:%M:%S.%f}"[:-3]
                log_level = f"{logging.getLevelName(log.level):7s}"
                flow_run_info = f"Flow run {flow_run.name!r} - {escape(log.message)}"

                log_message = f"{timestamp} | {log_level} | {flow_run_info}"
                app.console.print(
                    log_message,
                    soft_wrap=True,
                )

            # Update the number of logs retrieved
            num_logs_returned += num_logs_to_return_from_page

            if page_backwards:
                # If the current offset is not 0, update the offset for the next page
                if offset != 0:
                    offset = (
                        0
                        # Reset the offset to 0 if there are less logs than the LOGS_DEFAULT_PAGE_SIZE to get the remaining log
                        if offset < LOGS_DEFAULT_PAGE_SIZE
                        else offset - LOGS_DEFAULT_PAGE_SIZE
                    )
                else:
                    more_logs = False
            else:
                # Stop as soon as the requested number of logs has been
                # fetched, which avoids a trailing request with `limit=0`.
                limit_reached = (
                    user_specified_num_logs is not None
                    and num_logs_returned >= user_specified_num_logs
                )
                if len(page_logs) == LOGS_DEFAULT_PAGE_SIZE and not limit_reached:
                    offset += LOGS_DEFAULT_PAGE_SIZE
                else:
                    # No more logs to show, exit
                    more_logs = False

ls(flow_name=typer.Option(None, help='Name of the flow'), limit=typer.Option(15, help='Maximum number of flow runs to list'), state=typer.Option(None, help="Name of the flow run's state"), state_type=typer.Option(None, help="Type of the flow run's state")) async

View recent flow runs or flow runs for specific flows.

Arguments:

flow_name: Name of the flow

limit: Maximum number of flow runs to list. Defaults to 15.

state: Name of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', 'PAUSED', 'SUSPENDED', 'AWAITINGRETRY', 'RETRYING', and 'LATE'.

state_type: Type of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', and 'PAUSED'.

Examples:

$ prefect flow-runs ls --state Running

$ prefect flow-runs ls --state Running --state late

$ prefect flow-runs ls --state-type RUNNING

$ prefect flow-runs ls --state-type RUNNING --state-type FAILED

Source code in src/prefect/cli/flow_run.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
@flow_run_app.command()
async def ls(
    flow_name: List[str] = typer.Option(None, help="Name of the flow"),
    limit: int = typer.Option(15, help="Maximum number of flow runs to list"),
    state: List[str] = typer.Option(None, help="Name of the flow run's state"),
    state_type: List[str] = typer.Option(None, help="Type of the flow run's state"),
):
    """
    View recent flow runs or flow runs for specific flows.

    Arguments:

        flow_name: Name of the flow

        limit: Maximum number of flow runs to list. Defaults to 15.

        state: Name of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', 'PAUSED', 'SUSPENDED', 'AWAITINGRETRY', 'RETRYING', and 'LATE'.

        state_type: Type of the flow run's state. Can be provided multiple times. Options are 'SCHEDULED', 'PENDING', 'RUNNING', 'COMPLETED', 'FAILED', 'CRASHED', 'CANCELLING', 'CANCELLED', and 'PAUSED'.

    Examples:

    $ prefect flow-runs ls --state Running

    $ prefect flow-runs ls --state Running --state late

    $ prefect flow-runs ls --state-type RUNNING

    $ prefect flow-runs ls --state-type RUNNING --state-type FAILED
    """

    # Handling `state` and `state_type` argument validity in the function instead of by specifying
    # List[StateType] and List[StateName] in the type hints, allows users to provide
    # case-insensitive arguments for `state` and `state_type`.

    # Upper-cased user input -> canonical spelling used when filtering by name.
    prefect_state_names = {
        "SCHEDULED": "Scheduled",
        "PENDING": "Pending",
        "RUNNING": "Running",
        "COMPLETED": "Completed",
        "FAILED": "Failed",
        "CANCELLED": "Cancelled",
        "CRASHED": "Crashed",
        "PAUSED": "Paused",
        "CANCELLING": "Cancelling",
        "SUSPENDED": "Suspended",
        "AWAITINGRETRY": "AwaitingRetry",
        "RETRYING": "Retrying",
        "LATE": "Late",
    }

    state_filter = {}
    formatted_states = []

    if state:
        for s in state:
            uppercased_state = s.upper()
            if uppercased_state in prefect_state_names:
                capitalized_state = prefect_state_names[uppercased_state]
                formatted_states.append(capitalized_state)
            else:
                # Do not change the case of the state name if it is not one of the official Prefect state names
                formatted_states.append(s)
                logger.warning(
                    f"State name {repr(s)} is not one of the official Prefect state names."
                )

        state_filter["name"] = {"any_": formatted_states}

    if state_type:
        # State types must match a `StateType` member (case-insensitively);
        # unlike state names, unknown values are rejected outright.
        upper_cased_states = [s.upper() for s in state_type]
        if not all(s in StateType.__members__ for s in upper_cased_states):
            exit_with_error(
                f"Invalid state type. Options are {', '.join(StateType.__members__)}."
            )

        state_filter["type"] = {
            "any_": [StateType[s].value for s in upper_cased_states]
        }

    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_filter=FlowFilter(name={"any_": flow_name}) if flow_name else None,
            flow_run_filter=FlowRunFilter(state=state_filter) if state_filter else None,
            limit=limit,
            sort=FlowRunSort.EXPECTED_START_TIME_DESC,
        )

        # Exit before looking up flows: with no runs there is nothing to
        # resolve, so the second request would be wasted.
        if not flow_runs:
            exit_with_success("No flow runs found.")

        # Many runs usually share a flow; send each flow ID only once.
        unique_flow_ids = list({run.flow_id for run in flow_runs})
        flows_by_id = {
            flow.id: flow
            for flow in await client.read_flows(
                flow_filter=FlowFilter(id={"any_": unique_flow_ids})
            )
        }

    table = Table(title="Flow Runs")
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Flow", style="blue", no_wrap=True)
    table.add_column("Name", style="green", no_wrap=True)
    table.add_column("State", no_wrap=True)
    table.add_column("When", style="bold", no_wrap=True)

    # Rows are displayed newest-created first.
    for flow_run in sorted(flow_runs, key=lambda d: d.created, reverse=True):
        flow = flows_by_id[flow_run.flow_id]
        # Scheduled runs show when they are due; every other state shows
        # when that state was entered.
        timestamp = (
            flow_run.state.state_details.scheduled_time
            if flow_run.state.is_scheduled()
            else flow_run.state.timestamp
        )
        table.add_row(
            str(flow_run.id),
            str(flow.name),
            str(flow_run.name),
            str(flow_run.state.type.value),
            pendulum.instance(timestamp).diff_for_humans(),
        )

    app.console.print(table)