Skip to content

prefect.concurrency.v1.sync

concurrency(names, task_run_id, timeout_seconds=None)

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire.

required
task_run_id UUID

The task run ID acquiring the limits.

required
timeout_seconds Optional[float]

The number of seconds to wait to acquire the limits before raising a TimeoutError. A timeout of None will wait indefinitely.

None

Raises:

Type Description
TimeoutError

If the limits are not acquired within the given timeout.

A simple example of using the sync concurrency context manager:

from uuid import UUID

from prefect.concurrency.v1.sync import concurrency

def resource_heavy(task_run_id: UUID):
    with concurrency("test", task_run_id=task_run_id):
        print("Resource heavy task")

def main(task_run_id: UUID):
    resource_heavy(task_run_id)
Source code in src/prefect/concurrency/v1/sync.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@contextmanager
def concurrency(
    names: Union[str, List[str]],
    task_run_id: UUID,
    timeout_seconds: Optional[float] = None,
) -> Generator[None, None, None]:
    """
    Hold slots on the given concurrency limits for the duration of a `with` block.

    Slots are acquired before the body runs and released in a `finally`
    clause, so the release is attempted even when the body raises. The time
    between acquisition and release is reported, in seconds, when the slots
    are released. If `names` is falsy (for example `""` or `[]`), the body
    runs without acquiring or releasing anything.

    Args:
        names: The names of the concurrency limits to acquire. A single name
            is treated as a one-element list.
        task_run_id: The task run ID acquiring the limits.
        timeout_seconds: The number of seconds to wait to acquire the limits before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.

    Raises:
        TimeoutError: If the limits are not acquired within the given timeout.

    Example:
    A simple example of using the sync `concurrency` context manager:
    ```python
    from uuid import UUID

    from prefect.concurrency.v1.sync import concurrency

    def resource_heavy(task_run_id: UUID):
        with concurrency("test", task_run_id=task_run_id):
            print("Resource heavy task")

    def main(task_run_id: UUID):
        resource_heavy(task_run_id)
    ```
    """
    if not names:
        # No limits requested: run the body without touching any slots.
        yield
        return

    # Normalise to a list so the helpers always receive the same shape.
    if isinstance(names, list):
        limit_names = names
    else:
        limit_names = [names]

    acquired: List[MinimalConcurrencyLimitResponse] = _acquire_concurrency_slots(
        limit_names,
        timeout_seconds=timeout_seconds,
        task_run_id=task_run_id,
        _sync=True,
    )
    # Timestamp taken right after acquisition; used to compute occupancy on exit.
    started_at = pendulum.now("UTC")
    acquisition_events = _emit_concurrency_acquisition_events(acquired, task_run_id)

    try:
        yield
    finally:
        held_for = cast(Interval, pendulum.now("UTC") - started_at)
        occupancy_seconds = held_for.total_seconds()
        _release_concurrency_slots(
            limit_names,
            task_run_id,
            occupancy_seconds,
            _sync=True,
        )
        # Release events are emitted only after the release call returns.
        _emit_concurrency_release_events(acquired, acquisition_events, task_run_id)