Skip to content

prefect.concurrency.v1.asyncio

AcquireConcurrencySlotTimeoutError

Bases: TimeoutError

Raised when acquiring a concurrency slot times out.

Source code in src/prefect/concurrency/v1/asyncio.py
32
33
class AcquireConcurrencySlotTimeoutError(TimeoutError):
    """Raised when acquiring a concurrency slot times out."""

ConcurrencySlotAcquisitionError

Bases: Exception

Raised when an unhandlable occurs while acquiring concurrency slots.

Source code in src/prefect/concurrency/v1/asyncio.py
28
29
class ConcurrencySlotAcquisitionError(Exception):
    """Raised when an unhandlable occurs while acquiring concurrency slots."""

concurrency(names, task_run_id, timeout_seconds=None) async

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
task_run_id UUID

The name of the task_run_id that is incrementing the slots.

required
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None

Raises:

Type Description
TimeoutError

If the slots are not acquired within the given timeout.

A simple example of using the async concurrency context manager:

from prefect.concurrency.v1.asyncio import concurrency

async def resource_heavy():
    async with concurrency("test", task_run_id):
        print("Resource heavy task")

async def main():
    await resource_heavy()
Source code in src/prefect/concurrency/v1/asyncio.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@asynccontextmanager
async def concurrency(
    names: Union[str, List[str]],
    task_run_id: UUID,
    timeout_seconds: Optional[float] = None,
) -> AsyncGenerator[None, None]:
    """A context manager that acquires and releases concurrency slots from the
    given concurrency limits.

    Args:
        names: The names of the concurrency limits to acquire slots from.
        task_run_id: The name of the task_run_id that is incrementing the slots.
        timeout_seconds: The number of seconds to wait for the slots to be acquired before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.

    Example:
    A simple example of using the async `concurrency` context manager:
    ```python
    from prefect.concurrency.v1.asyncio import concurrency

    async def resource_heavy():
        async with concurrency("test", task_run_id):
            print("Resource heavy task")

    async def main():
        await resource_heavy()
    ```
    """
    if not names:
        yield
        return

    names_normalized: List[str] = names if isinstance(names, list) else [names]

    limits = await _acquire_concurrency_slots(
        names_normalized,
        task_run_id=task_run_id,
        timeout_seconds=timeout_seconds,
    )
    acquisition_time = pendulum.now("UTC")
    emitted_events = _emit_concurrency_acquisition_events(limits, task_run_id)

    try:
        yield
    finally:
        occupancy_period = cast(Interval, (pendulum.now("UTC") - acquisition_time))
        try:
            await _release_concurrency_slots(
                names_normalized, task_run_id, occupancy_period.total_seconds()
            )
        except anyio.get_cancelled_exc_class():
            # The task was cancelled before it could release the slots. Add the
            # slots to the cleanup list so they can be released when the
            # concurrency context is exited.
            if ctx := ConcurrencyContext.get():
                ctx.cleanup_slots.append(
                    (names_normalized, occupancy_period.total_seconds(), task_run_id)
                )

        _emit_concurrency_release_events(limits, emitted_events, task_run_id)