Skip to content

prefect.server.models.concurrency_limits

Functions for interacting with concurrency limit ORM objects. Intended for internal use by the Prefect REST API.

filter_concurrency_limits_for_orchestration(session, tags) async

Filters concurrency limits by tag. This will apply a "select for update" lock on these rows to prevent simultaneous read race conditions from allowing the concurrency limit on these tags to be temporarily exceeded.

Source code in src/prefect/server/models/concurrency_limits.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def filter_concurrency_limits_for_orchestration(
    session: sa.orm.Session,
    tags: List[str],
):
    """
    Fetches the concurrency limits whose tag is in `tags`, ordered by tag.

    The rows are selected with a "select for update" lock so that simultaneous
    reads cannot race and allow the concurrency limit on these tags to be
    temporarily exceeded.

    Args:
        session: A database session
        tags: The tags whose concurrency limits should be read and locked

    Returns:
        All matching concurrency limit ORM objects
    """

    limit_model = orm_models.ConcurrencyLimit

    # Build the statement one clause at a time; the row lock is applied last.
    statement = sa.select(limit_model)
    statement = statement.where(limit_model.tag.in_(tags))
    statement = statement.order_by(limit_model.tag)
    statement = statement.with_for_update()

    rows = await session.execute(statement)
    return rows.scalars().all()

read_concurrency_limit(session, concurrency_limit_id) async

Reads a concurrency limit by id. If used for orchestration, simultaneous read race conditions might allow the concurrency limit to be temporarily exceeded.

Source code in src/prefect/server/models/concurrency_limits.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
async def read_concurrency_limit(
    session: sa.orm.Session,
    concurrency_limit_id: UUID,
):
    """
    Looks up a single concurrency limit by its id.

    No row lock is requested here, so if this is used for orchestration,
    simultaneous reads can race and allow the concurrency limit to be
    temporarily exceeded.

    Args:
        session: A database session
        concurrency_limit_id: The id of the concurrency limit to read

    Returns:
        The matching concurrency limit, or None if no row matches
    """

    limit_model = orm_models.ConcurrencyLimit
    matches_id = limit_model.id == concurrency_limit_id
    statement = sa.select(limit_model).where(matches_id)

    rows = await session.execute(statement)
    return rows.scalar()

read_concurrency_limit_by_tag(session, tag) async

Reads a concurrency limit by tag. If used for orchestration, simultaneous read race conditions might allow the concurrency limit to be temporarily exceeded.

Source code in src/prefect/server/models/concurrency_limits.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def read_concurrency_limit_by_tag(
    session: sa.orm.Session,
    tag: str,
):
    """
    Looks up a single concurrency limit by its tag.

    No row lock is requested here, so if this is used for orchestration,
    simultaneous reads can race and allow the concurrency limit to be
    temporarily exceeded.

    Args:
        session: A database session
        tag: The tag of the concurrency limit to read

    Returns:
        The matching concurrency limit, or None if no row matches
    """

    limit_model = orm_models.ConcurrencyLimit
    matches_tag = limit_model.tag == tag
    statement = sa.select(limit_model).where(matches_tag)

    rows = await session.execute(statement)
    return rows.scalar()

read_concurrency_limits(session, limit=None, offset=None) async

Reads concurrency limits. If used for orchestration, simultaneous read race conditions might allow the concurrency limit to be temporarily exceeded.

Parameters:

Name Type Description Default
session Session

A database session

required
offset Optional[int]

Query offset

None
limit Optional[int]

Query limit

None

Returns:

Type Description

List[orm_models.ConcurrencyLimit]: concurrency limits

Source code in src/prefect/server/models/concurrency_limits.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
async def read_concurrency_limits(
    session: sa.orm.Session,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
):
    """
    Reads concurrency limits, ordered by tag. If used for orchestration,
    simultaneous read race conditions might allow the concurrency limit to be
    temporarily exceeded.

    Args:
        session: A database session
        limit: Query limit; not applied when None
        offset: Query offset; not applied when None

    Returns:
        List[orm_models.ConcurrencyLimit]: concurrency limits
    """

    limit_model = orm_models.ConcurrencyLimit
    statement = sa.select(limit_model).order_by(limit_model.tag)

    # Pagination clauses are only attached when the caller supplied them.
    if offset is not None:
        statement = statement.offset(offset)
    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    unique_limits = rows.scalars().unique()
    return unique_limits.all()

reset_concurrency_limit_by_tag(session, tag, slot_override=None) async

Resets a concurrency limit by tag.

Source code in src/prefect/server/models/concurrency_limits.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
async def reset_concurrency_limit_by_tag(
    session: sa.orm.Session,
    tag: str,
    slot_override: Optional[List[UUID]] = None,
):
    """
    Resets the active slots of the concurrency limit with the given tag.

    Args:
        session: A database session
        tag: The tag of the concurrency limit to reset
        slot_override: If given, the active slots are set to these ids
            (stored as strings); otherwise the active slots are emptied

    Returns:
        The concurrency limit that was reset, or the falsy lookup result
        unchanged when no concurrency limit matches the tag
    """
    limit_model = orm_models.ConcurrencyLimit
    statement = sa.select(limit_model).where(limit_model.tag == tag)
    rows = await session.execute(statement)
    found = rows.scalar()

    # Nothing to reset: hand back the lookup result as-is.
    if not found:
        return found

    found.active_slots = (
        [str(slot_id) for slot_id in slot_override]
        if slot_override is not None
        else []
    )
    return found