Skip to content

prefect.concurrency.asyncio

AcquireConcurrencySlotTimeoutError

Bases: TimeoutError

Raised when acquiring a concurrency slot times out.

Source code in src/prefect/concurrency/asyncio.py
30
31
class AcquireConcurrencySlotTimeoutError(TimeoutError):
    """Signal that acquiring a concurrency slot timed out.

    Subclasses the built-in `TimeoutError`, so an `except TimeoutError`
    clause also catches it.
    """

ConcurrencySlotAcquisitionError

Bases: Exception

Raised when an unhandlable error occurs while acquiring concurrency slots.

Source code in src/prefect/concurrency/asyncio.py
26
27
class ConcurrencySlotAcquisitionError(Exception):
    """Signal an unhandlable error that occurred while acquiring concurrency
    slots.
    """

concurrency(names, occupy=1, timeout_seconds=None, create_if_missing=True, max_retries=None) async

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None
create_if_missing bool

Whether to create the concurrency limits if they do not exist.

True
max_retries Optional[int]

The maximum number of retries to acquire the concurrency slots.

None

Raises:

Type Description
TimeoutError

If the slots are not acquired within the given timeout.

A simple example of using the async concurrency context manager:

# Import the async context manager that guards a block with concurrency limits.
from prefect.concurrency.asyncio import concurrency

async def resource_heavy():
    # Hold one slot of the "test" concurrency limit while the body runs.
    async with concurrency("test", occupy=1):
        print("Resource heavy task")

async def main():
    # Entry point for the example: run the limited task once.
    await resource_heavy()
Source code in src/prefect/concurrency/asyncio.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@asynccontextmanager
async def concurrency(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    create_if_missing: bool = True,
    max_retries: Optional[int] = None,
) -> AsyncGenerator[None, None]:
    """Hold slots on the given concurrency limits for the duration of an
    `async with` block.

    Slots are acquired on entry and released on exit, whether the body
    completes normally or raises. When `names` is empty the body runs
    without acquiring anything.

    Args:
        names: The names of the concurrency limits to acquire slots from. A
            single name is treated as a one-element list.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be acquired before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
        create_if_missing: Whether to create the concurrency limits if they do not exist.
        max_retries: The maximum number of retries to acquire the concurrency slots.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.

    Example:
    A simple example of using the async `concurrency` context manager:
    ```python
    from prefect.concurrency.asyncio import concurrency

    async def resource_heavy():
        async with concurrency("test", occupy=1):
            print("Resource heavy task")

    async def main():
        await resource_heavy()
    ```
    """
    # Nothing to limit: run the body without touching any concurrency limit.
    if not names:
        yield
        return

    # Accept a bare name as shorthand for a one-element list.
    if not isinstance(names, list):
        names = [names]

    held_limits = await _acquire_concurrency_slots(
        names,
        occupy,
        timeout_seconds=timeout_seconds,
        create_if_missing=create_if_missing,
        max_retries=max_retries,
    )
    acquired_at = pendulum.now("UTC")
    acquire_events = _emit_concurrency_acquisition_events(held_limits, occupy)

    try:
        yield
    finally:
        # How long the slots were held, reported when releasing them.
        held_for = cast(Interval, pendulum.now("UTC") - acquired_at)
        held_seconds = held_for.total_seconds()
        try:
            await _release_concurrency_slots(names, occupy, held_seconds)
        except anyio.get_cancelled_exc_class():
            # The task was cancelled before the release completed. Queue the
            # slots on the active concurrency context (if there is one) so
            # they can be released when that context is exited.
            # NOTE(review): the cancellation exception is not re-raised
            # here -- confirm that this is intended.
            active_context = ConcurrencyContext.get()
            if active_context:
                active_context.cleanup_slots.append((names, occupy, held_seconds))

        # Release events are emitted on both the normal and the cancelled path.
        _emit_concurrency_release_events(held_limits, occupy, acquire_events)

rate_limit(names, occupy=1, timeout_seconds=None, create_if_missing=True) async

Block execution until `occupy` slots have been acquired from each of the concurrency limits given in `names`. Requires that all given concurrency limits have a slot decay.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None
create_if_missing Optional[bool]

Whether to create the concurrency limits if they do not exist.

True
Source code in src/prefect/concurrency/asyncio.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
async def rate_limit(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    create_if_missing: Optional[bool] = True,
) -> None:
    """Wait until `occupy` slots have been acquired from each of the
    concurrency limits given in `names`.

    This function does not release the slots it acquires; all given
    concurrency limits are required to have a slot decay. Returns immediately
    when `names` is empty.

    Args:
        names: The names of the concurrency limits to acquire slots from. A
            single name is treated as a one-element list.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be acquired before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
        create_if_missing: Whether to create the concurrency limits if they do not exist.
    """
    # Nothing to rate-limit against.
    if not names:
        return

    # Accept a bare name as shorthand for a one-element list.
    if not isinstance(names, list):
        names = [names]

    acquired_limits = await _acquire_concurrency_slots(
        names,
        occupy,
        mode="rate_limit",
        timeout_seconds=timeout_seconds,
        create_if_missing=create_if_missing,
    )
    _emit_concurrency_acquisition_events(acquired_limits, occupy)