Skip to content

prefect.cli.events

stream(format=typer.Option(StreamFormat.json, '--format', help='Output format (json or text)'), output_file=typer.Option(None, '--output-file', help='File to write events to'), account=typer.Option(False, '--account', help='Stream events for entire account, including audit logs'), run_once=typer.Option(False, '--run-once', help='Stream only one event')) async

Subscribes to the event stream of a workspace, printing each event as it is received. By default, events are printed as JSON, but can be printed as text by passing --format text.

Source code in src/prefect/cli/events.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
@events_app.command()
async def stream(
    format: StreamFormat = typer.Option(
        StreamFormat.json, "--format", help="Output format (json or text)"
    ),
    output_file: str = typer.Option(
        None, "--output-file", help="File to write events to"
    ),
    account: bool = typer.Option(
        False,
        "--account",
        help="Stream events for entire account, including audit logs",
    ),
    run_once: bool = typer.Option(False, "--run-once", help="Stream only one event"),
):
    """Subscribes to the event stream of a workspace, printing each event
    as it is received. By default, events are printed as JSON, but can be
    printed as text by passing `--format text`.
    """

    try:
        if account:
            events_subscriber = PrefectCloudAccountEventSubscriber()
        else:
            events_subscriber = get_events_subscriber()

        app.console.print("Subscribing to event stream...")
        async with events_subscriber as subscriber:
            async for event in subscriber:
                await handle_event(event, format, output_file)
                if run_once:
                    typer.Exit(0)
    except Exception as exc:
        handle_error(exc)