Skip to content

prefect.server.models.work_queues

Functions for interacting with work queue ORM objects. Intended for internal use by the Prefect REST API.

create_work_queue(session, work_queue) async

Inserts a WorkQueue.

If a WorkQueue with the same name exists, an error will be thrown.

Parameters:

Name Type Description Default
session AsyncSession

a database session

required
work_queue WorkQueue

a WorkQueue model

required

Returns:

Type Description
WorkQueue

orm_models.WorkQueue: the newly-created WorkQueue

Source code in src/prefect/server/models/work_queues.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
async def create_work_queue(
    session: AsyncSession,
    work_queue: Union[schemas.core.WorkQueue, schemas.actions.WorkQueueCreate],
) -> orm_models.WorkQueue:
    """
    Insert a new WorkQueue row.

    When the incoming model carries no `work_pool_id`, the queue is attached
    to the default agent work pool, which is created on demand. If that pool
    had to be created and the requested queue name is "default", the pool's
    own default queue is returned instead of inserting a second row.

    When the incoming model carries no priority, the lowest unused positive
    priority within the target work pool is assigned.

    No uniqueness check is performed here; a duplicate name is presumably
    rejected by a database constraint when the row is flushed.

    Args:
        session (AsyncSession): a database session
        work_queue (schemas.core.WorkQueue): a WorkQueue model

    Returns:
        orm_models.WorkQueue: the newly-created WorkQueue

    """
    fields = work_queue.model_dump()

    if fields.get("work_pool_id") is None:
        # No pool requested: fall back to the default agent work pool,
        # creating it first if it does not exist yet.
        agent_pool = await models.workers.read_work_pool_by_name(
            session=session, work_pool_name=DEFAULT_AGENT_WORK_POOL_NAME
        )
        if not agent_pool:
            agent_pool = await models.workers.create_work_pool(
                session=session,
                work_pool=schemas.actions.WorkPoolCreate(
                    name=DEFAULT_AGENT_WORK_POOL_NAME, type="prefect-agent"
                ),
            )
            if work_queue.name == "default":
                # The "default" queue was created together with the work
                # pool, so hand that one back rather than inserting again.
                pool_default_queue = await models.workers.read_work_queue(
                    session=session,
                    work_queue_id=agent_pool.default_queue_id,
                )
                assert pool_default_queue
                return pool_default_queue
        fields["work_pool_id"] = agent_pool.id

    if fields["priority"] is None:
        # Collect the priorities already used in the target pool.
        taken_stmt = sa.select(orm_models.WorkQueue.priority).where(
            orm_models.WorkQueue.work_pool_id == fields["work_pool_id"]
        )
        taken_result = await session.execute(taken_stmt)
        taken = taken_result.scalars().all()

        # Walk the sorted priorities against the ranks 1, 2, 3, ...; the first
        # rank that does not line up with the stored value is a free slot.
        for rank, occupied in enumerate(sorted(taken), start=1):
            if rank != occupied:
                chosen = rank
                break
        else:
            # No gap found: append after the current maximum.
            chosen = max(taken, default=0) + 1

        fields["priority"] = chosen

    row = orm_models.WorkQueue(**fields)
    session.add(row)
    await session.flush()
    await session.refresh(row)

    if work_queue.priority:
        # The caller asked for an explicit priority; let the bulk updater
        # apply it for this queue within the pool.
        await bulk_update_work_queue_priorities(
            session=session,
            work_pool_id=fields["work_pool_id"],
            new_priorities={row.id: work_queue.priority},
        )

    return row

delete_work_queue(session, work_queue_id) async

Delete a WorkQueue by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
work_queue_id UUID

a WorkQueue id

required

Returns:

Name Type Description
bool bool

whether or not the WorkQueue was deleted

Source code in src/prefect/server/models/work_queues.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def delete_work_queue(session: AsyncSession, work_queue_id: UUID) -> bool:
    """
    Remove the WorkQueue with the given id.

    Args:
        session (AsyncSession): A database session
        work_queue_id (UUID): a WorkQueue id

    Returns:
        bool: True when the DELETE statement affected at least one row
    """
    removal = delete(orm_models.WorkQueue).where(
        orm_models.WorkQueue.id == work_queue_id
    )
    outcome = await session.execute(removal)
    return outcome.rowcount > 0

ensure_work_queue_exists(session, name) async

Checks if a work queue exists and creates it if it does not.

Useful when working with deployments, agents, and flow runs that automatically create work queues.

Will also create a work pool queue in the default agent pool to facilitate migration to work pools.

Source code in src/prefect/server/models/work_queues.py
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
async def ensure_work_queue_exists(
    session: AsyncSession, name: str
) -> orm_models.WorkQueue:
    """
    Return the work queue called `name`, creating it when it is missing.

    Useful when working with deployments, agents, and flow runs that
    automatically create work queues.

    Creation rules when no queue with that name is found:

    - if the default agent work pool does not exist, creation is delegated to
      `models.work_queues.create_work_queue` without a work pool id;
    - if the pool exists and `name` is not "default", a queue with priority 1
      is created inside that pool;
    - if the pool exists and `name` is "default", the pool's own default
      queue is read and returned.
    """
    existing = await models.work_queues.read_work_queue_by_name(
        session=session, name=name
    )
    if existing:
        return existing

    agent_pool = await models.workers.read_work_pool_by_name(
        session=session, work_pool_name=DEFAULT_AGENT_WORK_POOL_NAME
    )

    if agent_pool is None:
        return await models.work_queues.create_work_queue(
            session=session,
            work_queue=schemas.actions.WorkQueueCreate(name=name, priority=1),
        )

    if name != "default":
        return await models.workers.create_work_queue(
            session=session,
            work_pool_id=agent_pool.id,
            work_queue=schemas.actions.WorkQueueCreate(name=name, priority=1),
        )

    # The pool exists and the caller wants its "default" queue.
    pool_default_queue = await models.work_queues.read_work_queue(
        session=session, work_queue_id=agent_pool.default_queue_id
    )
    assert pool_default_queue, "Default work queue not found"
    return pool_default_queue

get_runs_in_work_queue(db, session, work_queue_id, limit=None, scheduled_before=None) async

Get runs from a work queue.

Parameters:

Name Type Description Default
session AsyncSession

A database session.

required
work_queue_id UUID

The work queue id.

required
scheduled_before Optional[datetime]

Only return runs scheduled to start before this time.

None
limit Optional[int]

An optional limit for the number of runs to return from the queue. This limit applies to the request only. It does not affect the work queue's concurrency limit. If limit exceeds the work queue's concurrency limit, it will be ignored.

None
Source code in src/prefect/server/models/work_queues.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
@db_injector
async def get_runs_in_work_queue(
    db: PrefectDBInterface,
    session: AsyncSession,
    work_queue_id: UUID,
    limit: Optional[int] = None,
    scheduled_before: Optional[datetime.datetime] = None,
) -> Tuple[orm_models.WorkQueue, Sequence[orm_models.FlowRun]]:
    """
    Get runs from a work queue.

    Args:
        db: The database interface used to build the scheduled-runs query.
        session: A database session.
        work_queue_id: The work queue id.
        limit: An optional limit for the number of runs to return from the
            queue. It is forwarded to the underlying query and applies to
            this request only; nothing here changes the work queue's
            concurrency limit. (The original documentation states that a
            `limit` above the queue's concurrency limit is ignored; that
            logic is not visible in this function.)
        scheduled_before: Only return runs scheduled to start before this time.

    Returns:
        A `(work_queue, flow_runs)` tuple.

    Raises:
        ObjectNotFoundError: if no work queue with the given id exists.

    """
    queue = await read_work_queue(session=session, work_queue_id=work_queue_id)
    if not queue:
        raise ObjectNotFoundError(f"Work queue with id {work_queue_id} not found.")

    # A queue that still carries a filter is a deprecated tag-based work
    # queue and is served by the legacy code path.
    if queue.filter is not None:
        legacy_runs = await _legacy_get_runs_in_work_queue(
            session=session,
            work_queue_id=work_queue_id,
            scheduled_before=scheduled_before,
            limit=limit,
        )
        return queue, legacy_runs

    scheduled_runs_query = db.queries.get_scheduled_flow_runs_from_work_queues(
        limit_per_queue=limit,
        work_queue_ids=[work_queue_id],
        scheduled_before=scheduled_before,
    )
    rows = await session.execute(scheduled_runs_query)
    return queue, rows.scalars().unique().all()

read_work_queue(session, work_queue_id) async

Reads a WorkQueue by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
work_queue_id Union[UUID, PrefectUUID]

a WorkQueue id

required

Returns:

Type Description
Optional[WorkQueue]

orm_models.WorkQueue: the WorkQueue

Source code in src/prefect/server/models/work_queues.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
async def read_work_queue(
    session: AsyncSession, work_queue_id: Union[UUID, PrefectUUID]
) -> Optional[orm_models.WorkQueue]:
    """
    Look up a single WorkQueue by its primary key.

    Args:
        session (AsyncSession): A database session
        work_queue_id (UUID): a WorkQueue id

    Returns:
        Optional[orm_models.WorkQueue]: whatever `session.get` yields for
            that id (per the return annotation, possibly `None`)
    """
    queue = await session.get(orm_models.WorkQueue, work_queue_id)
    return queue

read_work_queue_by_name(session, name) async

Reads a WorkQueue by name.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
name str

a WorkQueue name

required

Returns:

Type Description
Optional[WorkQueue]

orm_models.WorkQueue: the WorkQueue

Source code in src/prefect/server/models/work_queues.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def read_work_queue_by_name(
    session: AsyncSession, name: str
) -> Optional[orm_models.WorkQueue]:
    """
    Reads a WorkQueue by name.

    If the default agent work pool exists, the lookup is restricted to queues
    belonging to that pool. If it does not exist, the lookup matches on name
    alone.

    Args:
        session (AsyncSession): A database session
        name (str): a WorkQueue name

    Returns:
        Optional[orm_models.WorkQueue]: the first matching WorkQueue, or
            `None` when the query returns no rows
    """
    default_work_pool = await models.workers.read_work_pool_by_name(
        session=session, work_pool_name=DEFAULT_AGENT_WORK_POOL_NAME
    )
    # Logic to make sure this functionality doesn't break during migration:
    # only scope by pool when the default agent pool has been created.
    if default_work_pool is not None:
        query = select(orm_models.WorkQueue).filter_by(
            name=name, work_pool_id=default_work_pool.id
        )
    else:
        query = select(orm_models.WorkQueue).filter_by(name=name)
    result = await session.execute(query)
    return result.scalar()

read_work_queue_status(session, work_queue_id) async

Get work queue status by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
work_queue_id UUID

a WorkQueue id

required

Returns:

Type Description
WorkQueueStatusDetail

Information about the status of the work queue.

Source code in src/prefect/server/models/work_queues.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
async def read_work_queue_status(
    session: AsyncSession, work_queue_id: UUID
) -> schemas.core.WorkQueueStatusDetail:
    """
    Build a status report for one work queue.

    The report combines the number of flow runs in a "Late" state for the
    queue with the queue's `last_polled` value, evaluated against a fixed
    health policy (0 late runs allowed, 60 seconds since last poll).

    Args:
        session (AsyncSession): A database session
        work_queue_id (UUID): a WorkQueue id

    Returns:
        Information about the status of the work queue.

    Raises:
        ObjectNotFoundError: if no work queue with the given id exists.
    """
    queue = await read_work_queue(session=session, work_queue_id=work_queue_id)
    if not queue:
        raise ObjectNotFoundError(f"Work queue with id {work_queue_id} not found")

    # Count only runs named "Late" that belong to this queue.
    late_state_filter = schemas.filters.FlowRunFilter(
        state=schemas.filters.FlowRunFilterState(name={"any_": ["Late"]}),
    )
    this_queue_filter = schemas.filters.WorkQueueFilter(
        id=schemas.filters.WorkQueueFilterId(any_=[work_queue_id])
    )
    late_count = await models.flow_runs.count_flow_runs(
        session=session,
        flow_run_filter=late_state_filter,
        work_queue_filter=this_queue_filter,
    )

    # All work queues use the default policy for now
    policy = schemas.core.WorkQueueHealthPolicy(
        maximum_late_runs=0, maximum_seconds_since_last_polled=60
    )
    is_healthy = policy.evaluate_health_status(
        late_runs_count=late_count,
        last_polled=queue.last_polled,  # type: ignore
    )

    return schemas.core.WorkQueueStatusDetail(
        healthy=is_healthy,
        late_runs_count=late_count,
        last_polled=queue.last_polled,
        health_check_policy=policy,
    )

read_work_queues(session, offset=None, limit=None, work_queue_filter=None) async

Read WorkQueues.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
offset Optional[int]

Query offset

None
limit Optional[int]

Query limit

None
work_queue_filter Optional[WorkQueueFilter]

only select work queues matching these filters

None
Source code in src/prefect/server/models/work_queues.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
async def read_work_queues(
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
) -> Sequence[orm_models.WorkQueue]:
    """
    List WorkQueues ordered by name.

    Args:
        session: A database session
        offset: Query offset; omitted from the query when `None`
        limit: Query limit; omitted from the query when `None`
        work_queue_filter: only select work queues matching these filters
    Returns:
        Sequence[orm_models.WorkQueue]: WorkQueues
    """
    stmt = select(orm_models.WorkQueue).order_by(orm_models.WorkQueue.name)

    # The clauses are independent of one another, so the order in which they
    # are attached to the statement does not affect the generated SQL.
    if work_queue_filter:
        stmt = stmt.where(work_queue_filter.as_sql_filter())
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()

record_work_queue_polls(session, polled_work_queue_ids, ready_work_queue_ids) async

Record that the given work queues were polled, and also update the given ready_work_queue_ids to READY.

Source code in src/prefect/server/models/work_queues.py
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
async def record_work_queue_polls(
    session: AsyncSession,
    polled_work_queue_ids: Sequence[UUID],
    ready_work_queue_ids: Sequence[UUID],
) -> None:
    """Stamp work queues with the current poll time.

    Queues in `polled_work_queue_ids` get `last_polled` set to now (UTC).
    Queues in `ready_work_queue_ids` get the same timestamp and additionally
    have their status set to READY. An empty id sequence issues no statement.
    """
    # One timestamp shared by both statements.
    now = pendulum.now("UTC")

    if polled_work_queue_ids:
        mark_polled = (
            sa.update(orm_models.WorkQueue)
            .where(orm_models.WorkQueue.id.in_(polled_work_queue_ids))
            .values(last_polled=now)
        )
        await session.execute(mark_polled)

    if ready_work_queue_ids:
        mark_ready = (
            sa.update(orm_models.WorkQueue)
            .where(orm_models.WorkQueue.id.in_(ready_work_queue_ids))
            .values(last_polled=now, status=WorkQueueStatus.READY)
        )
        await session.execute(mark_ready)

update_work_queue(session, work_queue_id, work_queue, emit_status_change=None) async

Update a WorkQueue by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
work_queue WorkQueueUpdate

the work queue data

required
work_queue_id UUID

a WorkQueue id

required

Returns:

Name Type Description
bool bool

whether or not the WorkQueue was updated

Source code in src/prefect/server/models/work_queues.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
async def update_work_queue(
    session: AsyncSession,
    work_queue_id: UUID,
    work_queue: schemas.actions.WorkQueueUpdate,
    emit_status_change: Optional[
        Callable[[orm_models.WorkQueue], Awaitable[None]]
    ] = None,
) -> bool:
    """
    Apply a partial update to the WorkQueue with the given id.

    Only fields explicitly set on `work_queue` are written. When `is_paused`
    is among them, a matching `status` value is derived from the queue's
    current status before the UPDATE is issued:

    - pausing a queue that is not already PAUSED sets status to PAUSED;
    - unpausing a queue that is currently PAUSED sets status to READY if its
      last poll is recent, otherwise NOT_READY.

    Args:
        session (AsyncSession): A database session
        work_queue_id (UUID): a WorkQueue id
        work_queue: the work queue data
        emit_status_change: optional async callback; awaited with the freshly
            re-read queue when a row was updated and a status was written

    Returns:
        bool: whether or not the WorkQueue was updated (False as well when
            `is_paused` was supplied but the queue does not exist)
    """
    # exclude_unset=True keeps only the values the caller actually provided,
    # so model defaults never overwrite stored data.
    changes = work_queue.model_dump_for_orm(exclude_unset=True)

    if "is_paused" in changes:
        current = await read_work_queue(session=session, work_queue_id=work_queue_id)
        if current is None:
            return False

        pause_flag = changes.get("is_paused")
        if pause_flag and current.status != WorkQueueStatus.PAUSED:
            # Re-pausing an already paused queue must not produce a status
            # update, hence the check against the current status.
            changes["status"] = WorkQueueStatus.PAUSED
        elif pause_flag is False and current.status == WorkQueueStatus.PAUSED:
            # Likewise, unpausing only touches status when the queue is
            # actually paused right now.

            # Prefer a last_polled value from this update over the stored one.
            last_polled: Optional[pendulum.DateTime] = (
                cast(pendulum.DateTime, changes["last_polled"])
                if "last_polled" in changes
                else current.last_polled
            )

            # READY when recently polled, NOT_READY otherwise.
            changes["status"] = (
                schemas.statuses.WorkQueueStatus.READY
                if is_last_polled_recent(last_polled)
                else WorkQueueStatus.NOT_READY
            )

    outcome = await session.execute(
        sa.update(orm_models.WorkQueue)
        .where(orm_models.WorkQueue.id == work_queue_id)
        .values(**changes)
    )
    was_updated = outcome.rowcount > 0

    if was_updated and "status" in changes and emit_status_change:
        refreshed = await read_work_queue(session=session, work_queue_id=work_queue_id)
        assert refreshed
        await emit_status_change(refreshed)

    return was_updated