Skip to content

prefect.concurrency.asyncio

AcquireConcurrencySlotTimeoutError

Bases: TimeoutError

Raised when acquiring a concurrency slot times out.

Source code in src/prefect/concurrency/asyncio.py
34
35
class AcquireConcurrencySlotTimeoutError(TimeoutError):
    """Raised when acquiring a concurrency slot times out."""

ConcurrencySlotAcquisitionError

Bases: Exception

Raised when an unhandlable occurs while acquiring concurrency slots.

Source code in src/prefect/concurrency/asyncio.py
30
31
class ConcurrencySlotAcquisitionError(Exception):
    """Raised when an unhandlable occurs while acquiring concurrency slots."""

concurrency(names, occupy=1, timeout_seconds=None, max_retries=None, create_if_missing=None, strict=False) async

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None
max_retries Optional[int]

The maximum number of retries to acquire the concurrency slots.

None
strict bool

A boolean specifying whether to raise an error if the concurrency limit does not exist. Defaults to False.

False

Raises:

Type Description
TimeoutError

If the slots are not acquired within the given timeout.

ConcurrencySlotAcquisitionError

If the concurrency limit does not exist and strict is True.

A simple example of using the async concurrency context manager:

from prefect.concurrency.asyncio import concurrency

async def resource_heavy():
    async with concurrency("test", occupy=1):
        print("Resource heavy task")

async def main():
    await resource_heavy()
Source code in src/prefect/concurrency/asyncio.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@asynccontextmanager
async def concurrency(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    create_if_missing: Optional[bool] = None,
    strict: bool = False,
) -> AsyncGenerator[None, None]:
    """A context manager that acquires and releases concurrency slots from the
    given concurrency limits.

    Args:
        names: The names of the concurrency limits to acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be acquired before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
        max_retries: The maximum number of retries to acquire the concurrency slots.
        strict: A boolean specifying whether to raise an error if the concurrency limit does not exist.
            Defaults to `False`.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not exist and `strict` is `True`.

    Example:
    A simple example of using the async `concurrency` context manager:
    ```python
    from prefect.concurrency.asyncio import concurrency

    async def resource_heavy():
        async with concurrency("test", occupy=1):
            print("Resource heavy task")

    async def main():
        await resource_heavy()
    ```
    """
    if not names:
        yield
        return

    names = names if isinstance(names, list) else [names]

    limits = await _acquire_concurrency_slots(
        names,
        occupy,
        timeout_seconds=timeout_seconds,
        create_if_missing=create_if_missing,
        max_retries=max_retries,
        strict=strict,
    )
    acquisition_time = pendulum.now("UTC")
    emitted_events = _emit_concurrency_acquisition_events(limits, occupy)

    try:
        yield
    finally:
        occupancy_period = cast(Interval, (pendulum.now("UTC") - acquisition_time))
        try:
            await _release_concurrency_slots(
                names, occupy, occupancy_period.total_seconds()
            )
        except anyio.get_cancelled_exc_class():
            # The task was cancelled before it could release the slots. Add the
            # slots to the cleanup list so they can be released when the
            # concurrency context is exited.
            if ctx := ConcurrencyContext.get():
                ctx.cleanup_slots.append(
                    (names, occupy, occupancy_period.total_seconds())
                )

        _emit_concurrency_release_events(limits, occupy, emitted_events)

rate_limit(names, occupy=1, timeout_seconds=None, create_if_missing=None, strict=False) async

Block execution until an occupy number of slots of the concurrency limits given in names are acquired. Requires that all given concurrency limits have a slot decay.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None
strict bool

A boolean specifying whether to raise an error if the concurrency limit does not exist. Defaults to False.

False

Raises:

Type Description
TimeoutError

If the slots are not acquired within the given timeout.

ConcurrencySlotAcquisitionError

If the concurrency limit does not exist and strict is True.

Source code in src/prefect/concurrency/asyncio.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
async def rate_limit(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    create_if_missing: Optional[bool] = None,
    strict: bool = False,
) -> None:
    """Block execution until an `occupy` number of slots of the concurrency
    limits given in `names` are acquired. Requires that all given concurrency
    limits have a slot decay.

    Args:
        names: The names of the concurrency limits to acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be acquired before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
        strict: A boolean specifying whether to raise an error if the concurrency limit does not exist.
            Defaults to `False`.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not exist and `strict` is `True`.
    """
    if not names:
        return

    names = names if isinstance(names, list) else [names]

    limits = await _acquire_concurrency_slots(
        names,
        occupy,
        mode="rate_limit",
        timeout_seconds=timeout_seconds,
        create_if_missing=create_if_missing,
        strict=strict,
    )
    _emit_concurrency_acquisition_events(limits, occupy)