Skip to content

prefect.cli.global_concurrency_limit

create_global_concurrency_limit(name=typer.Argument(..., help='The name of the global concurrency limit to create.'), limit=typer.Option(..., '--limit', '-l', help='The limit of the global concurrency limit.'), disable=typer.Option(None, '--disable', help='Create an inactive global concurrency limit.'), active_slots=typer.Option(0, '--active-slots', help='The number of active slots.'), slot_decay_per_second=typer.Option(0.0, '--slot-decay-per-second', help='The slot decay per second.')) async

Create a global concurrency limit.

Arguments:

name (str): The name of the global concurrency limit to create.

limit (int): The limit of the global concurrency limit.

disable (Optional[bool]): Create an inactive global concurrency limit.

active_slots (Optional[int]): The number of active slots.

slot_decay_per_second (Optional[float]): The slot decay per second.

Examples:

$ prefect global-concurrency-limit create my-gcl --limit 10

$ prefect gcl create my-gcl --limit 5 --active-slots 3

$ prefect gcl create my-gcl --limit 5 --active-slots 3 --slot-decay-per-second 0.5

$ prefect gcl create my-gcl --limit 5 --inactive
Source code in src/prefect/cli/global_concurrency_limit.py
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
@global_concurrency_limit_app.command("create")
async def create_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to create."
    ),
    limit: int = typer.Option(
        ..., "--limit", "-l", help="The limit of the global concurrency limit."
    ),
    disable: Optional[bool] = typer.Option(
        None, "--disable", help="Create an inactive global concurrency limit."
    ),
    active_slots: Optional[int] = typer.Option(
        0, "--active-slots", help="The number of active slots."
    ),
    slot_decay_per_second: Optional[float] = typer.Option(
        0.0, "--slot-decay-per-second", help="The slot decay per second."
    ),
):
    """
    Create a global concurrency limit.

    Arguments:

        name (str): The name of the global concurrency limit to create.

        limit (int): The limit of the global concurrency limit.

        disable (Optional[bool]): Create an inactive global concurrency limit.

        active_slots (Optional[int]): The number of active slots.

        slot_decay_per_second (Optional[float]): The slot decay per second.

    Examples:

        $ prefect global-concurrency-limit create my-gcl --limit 10

        $ prefect gcl create my-gcl --limit 5 --active-slots 3

        $ prefect gcl create my-gcl --limit 5 --active-slots 3 --slot-decay-per-second 0.5

        $ prefect gcl create my-gcl --limit 5 --inactive
    """
    async with get_client() as client:
        try:
            await client.read_global_concurrency_limit_by_name(name=name)
        except ObjectNotFound:
            pass
        else:
            exit_with_error(
                f"Global concurrency limit {name!r} already exists. Please try creating with a different name."
            )

    try:
        gcl = GlobalConcurrencyLimitCreate(
            name=name,
            limit=limit,
            active=False if disable else True,
            active_slots=active_slots,
            slot_decay_per_second=slot_decay_per_second,
        )

    except ValidationError as exc:
        exit_with_error(f"Invalid arguments provided: {exc}")
    except Exception as exc:
        exit_with_error(f"Error creating global concurrency limit: {exc}")

    async with get_client() as client:
        try:
            gcl_id = await client.create_global_concurrency_limit(concurrency_limit=gcl)
        except PrefectHTTPStatusError as exc:
            parsed_response = exc.response.json()
            exc = parsed_response["exception_detail"][0]["msg"]

            exit_with_error(f"Error updating global concurrency limit: {exc}")

    exit_with_success(
        f"Created global concurrency limit with name {name!r} and ID '{gcl_id}'. Run `prefect gcl inspect {name}` to view details."
    )

delete_global_concurrency_limit(name=typer.Argument(..., help='The name of the global concurrency limit to delete.')) async

Delete a global concurrency limit.

Parameters:

Name Type Description Default
name str

The name of the global concurrency limit to delete.

Argument(..., help='The name of the global concurrency limit to delete.')
Source code in src/prefect/cli/global_concurrency_limit.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
@global_concurrency_limit_app.command("delete")
async def delete_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to delete."
    ),
):
    """
    Delete a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to delete.
    """
    if is_interactive() and not typer.confirm(
        f"Are you sure you want to delete global concurrency limit with name {name!r}?",
        default=False,
    ):
        exit_with_error("Deletion aborted.")

    async with get_client() as client:
        try:
            await client.delete_global_concurrency_limit_by_name(name=name)
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    exit_with_success(f"Deleted global concurrency limit with name {name!r}.")

disable_global_concurrency_limit(name=typer.Argument(..., help='The name of the global concurrency limit to disable.')) async

Disable a global concurrency limit.

Parameters:

Name Type Description Default
name str

The name of the global concurrency limit to disable.

Argument(..., help='The name of the global concurrency limit to disable.')
Source code in src/prefect/cli/global_concurrency_limit.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
@global_concurrency_limit_app.command("disable")
async def disable_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to disable."
    ),
):
    """
    Disable a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to disable.
    """
    async with get_client() as client:
        try:
            gcl_limit = await client.read_global_concurrency_limit_by_name(name=name)
            if not gcl_limit.active:
                exit_with_error(
                    f"Global concurrency limit with name {name!r} is already disabled."
                )
            await client.update_global_concurrency_limit(
                name=name,
                concurrency_limit=GlobalConcurrencyLimitUpdate(active=False),
            )
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    exit_with_success(f"Disabled global concurrency limit with name {name!r}.")

enable_global_concurrency_limit(name=typer.Argument(..., help='The name of the global concurrency limit to enable.')) async

Enable a global concurrency limit.

Parameters:

Name Type Description Default
name str

The name of the global concurrency limit to enable.

Argument(..., help='The name of the global concurrency limit to enable.')
Source code in src/prefect/cli/global_concurrency_limit.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@global_concurrency_limit_app.command("enable")
async def enable_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to enable."
    ),
):
    """
    Enable a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to enable.
    """
    async with get_client() as client:
        try:
            gcl_limit = await client.read_global_concurrency_limit_by_name(name=name)
            if gcl_limit.active:
                exit_with_error(
                    f"Global concurrency limit with name {name!r} is already enabled."
                )
            await client.update_global_concurrency_limit(
                name=name,
                concurrency_limit=GlobalConcurrencyLimitUpdate(active=True),
            )
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    exit_with_success(f"Enabled global concurrency limit with name {name!r}.")

inspect_global_concurrency_limit(name=typer.Argument(..., help='The name of the global concurrency limit to inspect.'), output=typer.Option(None, '--output', '-o', help='Output format for the command.', case_sensitive=False), file_path=typer.Option(None, '--file', '-f', help='Path to .json file to write the global concurrency limit output to.')) async

Inspect a global concurrency limit.

Parameters:

Name Type Description Default
name str

The name of the global concurrency limit to inspect.

Argument(..., help='The name of the global concurrency limit to inspect.')
output Optional[OutputFormat]

An output format for the command. Currently only supports JSON. Required if --file/-f is set.

Option(None, '--output', '-o', help='Output format for the command.', case_sensitive=False)
file_path Optional[Path]

A path to .json file to write the global concurrent limit output to.

Option(None, '--file', '-f', help='Path to .json file to write the global concurrency limit output to.')

Returns:

Name Type Description
id str

The ID of the global concurrency limit.

created str

The created date of the global concurrency limit.

updated str

The updated date of the global concurrency limit.

name str

The name of the global concurrency limit.

limit int

The limit of the global concurrency limit.

active_slots int

The number of active slots.

slot_decay_per_second float

The slot decay per second.

Source code in src/prefect/cli/global_concurrency_limit.py
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@global_concurrency_limit_app.command("inspect")
async def inspect_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to inspect."
    ),
    output: Optional[OutputFormat] = typer.Option(
        None,
        "--output",
        "-o",
        help="Output format for the command.",
        case_sensitive=False,
    ),
    file_path: Optional[Path] = typer.Option(
        None,
        "--file",
        "-f",
        help="Path to .json file to write the global concurrency limit output to.",
    ),
):
    """
    Inspect a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to inspect.
        output (Optional[OutputFormat]): An output format for the command. Currently only supports JSON.
            Required if --file/-f is set.
        file_path (Optional[Path]): A path to .json file to write the global concurrent limit output to.

    Returns:
        id (str): The ID of the global concurrency limit.
        created (str): The created date of the global concurrency limit.
        updated (str): The updated date of the global concurrency limit.
        name (str): The name of the global concurrency limit.
        limit (int): The limit of the global concurrency limit.
        active_slots (int): The number of active slots.
        slot_decay_per_second (float): The slot decay per second.

    """
    if file_path and not output:
        exit_with_error("The --file/-f option requires the --output option to be set.")

    async with get_client() as client:
        try:
            gcl_limit = await client.read_global_concurrency_limit_by_name(name=name)
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")

    if output:
        gcl_limit = gcl_limit.model_dump(mode="json")
        json_output = orjson.dumps(gcl_limit, option=orjson.OPT_INDENT_2).decode()
        if not file_path:
            app.console.print(json_output)

        else:
            with open(file_path, "w") as f:
                f.write(json_output)
                exit_with_success(
                    f"Global concurrency limit {name!r} written to {file_path}"
                )

    else:
        app.console.print(Pretty(gcl_limit))

list_global_concurrency_limits() async

List all global concurrency limits.

Source code in src/prefect/cli/global_concurrency_limit.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@global_concurrency_limit_app.command("ls")
async def list_global_concurrency_limits():
    """
    List all global concurrency limits.
    """
    async with get_client() as client:
        gcl_limits = await client.read_global_concurrency_limits(limit=100, offset=0)
        if not gcl_limits:
            exit_with_success("No global concurrency limits found.")

    table = Table(
        title="Global Concurrency Limits",
        caption="List Global Concurrency Limits using `prefect global-concurrency-limit ls`",
        show_header=True,
    )

    table.add_column("ID", justify="right", style="cyan", no_wrap=True, overflow="fold")
    table.add_column("Name", style="blue", no_wrap=True, overflow="fold")
    table.add_column("Active", style="blue", no_wrap=True)
    table.add_column("Limit", style="blue", no_wrap=True)
    table.add_column("Active Slots", style="blue", no_wrap=True)
    table.add_column("Slot Decay Per Second", style="blue", no_wrap=True)
    table.add_column("Created", style="blue", no_wrap=True)
    table.add_column("Updated", style="blue", no_wrap=True)

    for gcl_limit in sorted(gcl_limits, key=lambda x: f"{x.name}"):
        table.add_row(
            str(gcl_limit.id),
            gcl_limit.name,
            str(gcl_limit.active),
            str(gcl_limit.limit),
            str(gcl_limit.active_slots),
            str(gcl_limit.slot_decay_per_second),
            pendulum.instance(gcl_limit.created).isoformat(),
            pendulum.instance(gcl_limit.updated).diff_for_humans(),
        )

    app.console.print(table)

update_global_concurrency_limit(name=typer.Argument(..., help='The name of the global concurrency limit to update.'), enable=typer.Option(None, '--enable', help='Enable the global concurrency limit.'), disable=typer.Option(None, '--disable', help='Disable the global concurrency limit.'), limit=typer.Option(None, '--limit', '-l', help='The limit of the global concurrency limit.'), active_slots=typer.Option(None, '--active-slots', help='The number of active slots.'), slot_decay_per_second=typer.Option(None, '--slot-decay-per-second', help='The slot decay per second.')) async

Update a global concurrency limit.

Parameters:

Name Type Description Default
name str

The name of the global concurrency limit to update.

Argument(..., help='The name of the global concurrency limit to update.')
enable Optional[bool]

Enable the global concurrency limit.

Option(None, '--enable', help='Enable the global concurrency limit.')
disable Optional[bool]

Disable the global concurrency limit.

Option(None, '--disable', help='Disable the global concurrency limit.')
limit Optional[int]

The limit of the global concurrency limit.

Option(None, '--limit', '-l', help='The limit of the global concurrency limit.')
active_slots Optional[int]

The number of active slots.

Option(None, '--active-slots', help='The number of active slots.')
slot_decay_per_second Optional[float]

The slot decay per second.

Option(None, '--slot-decay-per-second', help='The slot decay per second.')

Examples:

$ prefect global-concurrency-limit update my-gcl --limit 10 $ prefect gcl update my-gcl --active-slots 5 $ prefect gcl update my-gcl --slot-decay-per-second 0.5 $ prefect gcl update my-gcl --enable $ prefect gcl update my-gcl --disable --limit 5

Source code in src/prefect/cli/global_concurrency_limit.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
@global_concurrency_limit_app.command("update")
async def update_global_concurrency_limit(
    name: str = typer.Argument(
        ..., help="The name of the global concurrency limit to update."
    ),
    enable: Optional[bool] = typer.Option(
        None, "--enable", help="Enable the global concurrency limit."
    ),
    disable: Optional[bool] = typer.Option(
        None, "--disable", help="Disable the global concurrency limit."
    ),
    limit: Optional[int] = typer.Option(
        None, "--limit", "-l", help="The limit of the global concurrency limit."
    ),
    active_slots: Optional[int] = typer.Option(
        None, "--active-slots", help="The number of active slots."
    ),
    slot_decay_per_second: Optional[float] = typer.Option(
        None, "--slot-decay-per-second", help="The slot decay per second."
    ),
):
    """
    Update a global concurrency limit.

    Arguments:
        name (str): The name of the global concurrency limit to update.
        enable (Optional[bool]): Enable the global concurrency limit.
        disable (Optional[bool]): Disable the global concurrency limit.
        limit (Optional[int]): The limit of the global concurrency limit.
        active_slots (Optional[int]): The number of active slots.
        slot_decay_per_second (Optional[float]): The slot decay per second.

    Examples:
        $ prefect global-concurrency-limit update my-gcl --limit 10
        $ prefect gcl update my-gcl --active-slots 5
        $ prefect gcl update my-gcl --slot-decay-per-second 0.5
        $ prefect gcl update my-gcl --enable
        $ prefect gcl update my-gcl --disable --limit 5
    """
    gcl = GlobalConcurrencyLimitUpdate()

    if enable and disable:
        exit_with_error(
            "Cannot enable and disable a global concurrency limit at the same time."
        )

    if enable:
        gcl.active = True
    if disable:
        gcl.active = False

    if limit is not None:
        gcl.limit = limit

    if active_slots is not None:
        gcl.active_slots = active_slots

    if slot_decay_per_second is not None:
        gcl.slot_decay_per_second = slot_decay_per_second

    if not gcl.model_dump(exclude_unset=True):
        exit_with_error("No update arguments provided.")

    try:
        GlobalConcurrencyLimitUpdate(**gcl.model_dump())
    except ValidationError as exc:
        exit_with_error(f"Invalid arguments provided: {exc}")
    except Exception as exc:
        exit_with_error(f"Error creating global concurrency limit: {exc}")

    async with get_client() as client:
        try:
            await client.update_global_concurrency_limit(
                name=name, concurrency_limit=gcl
            )
        except ObjectNotFound:
            exit_with_error(f"Global concurrency limit {name!r} not found.")
        except PrefectHTTPStatusError as exc:
            if exc.response.status_code == 422:
                parsed_response = exc.response.json()

                error_message = parsed_response["exception_detail"][0]["msg"]

                exit_with_error(
                    f"Error updating global concurrency limit: {error_message}"
                )

    exit_with_success(f"Updated global concurrency limit with name {name!r}.")