Skip to content

prefect.concurrency.sync

concurrency(names, occupy=1, timeout_seconds=None, max_retries=None, strict=False, create_if_missing=None)

A context manager that acquires and releases concurrency slots from the given concurrency limits.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None
max_retries Optional[int]

The maximum number of retries to acquire the concurrency slots.

None
strict bool

A boolean specifying whether to raise an error if the concurrency limit does not exist. Defaults to False.

False

Raises:

Type Description
TimeoutError

If the slots are not acquired within the given timeout.

ConcurrencySlotAcquisitionError

If the concurrency limit does not exist and strict is True.

A simple example of using the sync concurrency context manager:

from prefect.concurrency.sync import concurrency

def resource_heavy():
    with concurrency("test", occupy=1):
        print("Resource heavy task")

def main():
    resource_heavy()
Source code in src/prefect/concurrency/sync.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@contextmanager
def concurrency(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    strict: bool = False,
    create_if_missing: Optional[bool] = None,
) -> Generator[None, None, None]:
    """Hold slots on the named concurrency limits for the duration of a `with` block.

    Slots are acquired before the body of the `with` statement runs and are
    released when the body finishes, whether it returns normally or raises.
    If `names` is empty (or otherwise falsy), the body runs without any
    acquisition or release taking place.

    Args:
        names: A single limit name, or a list of limit names, to take slots from.
        occupy: How many slots to acquire and hold from each limit.
        timeout_seconds: How long, in seconds, to wait for the slots before a
            `TimeoutError` is raised. `None` means wait indefinitely.
        max_retries: The maximum number of retries to acquire the concurrency slots.
        strict: Whether a missing concurrency limit should raise an error.
            Defaults to `False`.
        create_if_missing: Forwarded unchanged to the slot-acquisition helper;
            its exact effect is defined there rather than in this function.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not exist
            and `strict` is `True`.

    Example:
    Guarding a resource-heavy function with the sync `concurrency` context manager:
    ```python
    from prefect.concurrency.sync import concurrency

    def resource_heavy():
        with concurrency("test", occupy=1):
            print("Resource heavy task")

    def main():
        resource_heavy()
    ```
    """
    # Nothing to acquire: just run the wrapped block and finish.
    if not names:
        yield
        return

    # A bare name is normalised into a one-element list.
    if not isinstance(names, list):
        names = [names]

    held_limits: List[MinimalConcurrencyLimitResponse] = _acquire_concurrency_slots(
        names,
        occupy,
        max_retries=max_retries,
        strict=strict,
        create_if_missing=create_if_missing,
        timeout_seconds=timeout_seconds,
        _sync=True,
    )
    # Timestamp taken right after acquisition so the hold time can be
    # passed along with the release.
    acquired_at = pendulum.now("UTC")
    acquire_events = _emit_concurrency_acquisition_events(held_limits, occupy)

    try:
        yield
    finally:
        # Runs on both normal exit and exceptions raised inside the `with` body.
        held_for = cast(Interval, pendulum.now("UTC") - acquired_at)
        seconds_held = held_for.total_seconds()
        _release_concurrency_slots(names, occupy, seconds_held, _sync=True)
        _emit_concurrency_release_events(held_limits, occupy, acquire_events)

rate_limit(names, occupy=1, timeout_seconds=None, create_if_missing=None, strict=False)

Block execution until `occupy` slots have been acquired from each of the concurrency limits given in `names`. Requires that all given concurrency limits have a slot decay.

Parameters:

Name Type Description Default
names Union[str, List[str]]

The names of the concurrency limits to acquire slots from.

required
occupy int

The number of slots to acquire and hold from each limit.

1
timeout_seconds Optional[float]

The number of seconds to wait for the slots to be acquired before raising a TimeoutError. A timeout of None will wait indefinitely.

None
strict bool

A boolean specifying whether to raise an error if the concurrency limit does not exist. Defaults to False.

False

Raises:

Type Description
TimeoutError

If the slots are not acquired within the given timeout.

ConcurrencySlotAcquisitionError

If the concurrency limit does not exist and strict is True.

Source code in src/prefect/concurrency/sync.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def rate_limit(
    names: Union[str, List[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    create_if_missing: Optional[bool] = None,
    strict: bool = False,
) -> None:
    """Wait until `occupy` slots have been acquired from each limit in `names`.

    Every concurrency limit named here must be configured with a slot decay.
    This function only acquires slots and emits the acquisition events; it
    does not issue a release call itself. If `names` is empty (or otherwise
    falsy) the function returns immediately without acquiring anything.

    Args:
        names: A single limit name, or a list of limit names, to take slots from.
        occupy: How many slots to acquire from each limit.
        timeout_seconds: How long, in seconds, to wait for the slots before a
            `TimeoutError` is raised. `None` means wait indefinitely.
        create_if_missing: Forwarded unchanged to the slot-acquisition helper;
            its exact effect is defined there rather than in this function.
        strict: Whether a missing concurrency limit should raise an error.
            Defaults to `False`.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not exist
            and `strict` is `True`.
    """
    # Nothing to rate-limit against.
    if not names:
        return

    # A bare name is normalised into a one-element list.
    limit_names = names if isinstance(names, list) else [names]

    acquired = _acquire_concurrency_slots(
        limit_names,
        occupy,
        mode="rate_limit",
        strict=strict,
        create_if_missing=create_if_missing,
        timeout_seconds=timeout_seconds,
        _sync=True,
    )
    _emit_concurrency_acquisition_events(acquired, occupy)