Skip to content

prefect.server.task_queue

Implements an in-memory task queue for delivering background task runs to TaskWorkers.

MultiQueue

A queue that can pull tasks from any of a number of task queues

Source code in src/prefect/server/task_queue.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
class MultiQueue:
    """A queue that can pull tasks from any of a number of task queues.

    One `TaskQueue` is looked up (via `TaskQueue.for_key`) for every task key
    given at construction time; `get` then polls them in that order.
    """

    _queues: List[TaskQueue]

    def __init__(self, task_keys: List[str]):
        """Resolve the `TaskQueue` for each of the given task keys."""
        self._queues = [TaskQueue.for_key(key) for key in task_keys]

    async def get(self) -> schemas.core.TaskRun:
        """Return the next task run available on any of the underlying queues.

        The queues are scanned in order without blocking. If every queue is
        empty, the coroutine sleeps for 10 ms and scans again, repeating until
        a task run is found.
        """
        while True:
            for candidate in self._queues:
                try:
                    task_run = candidate.get_nowait()
                except asyncio.QueueEmpty:
                    # Nothing here right now; move on to the next queue.
                    pass
                else:
                    return task_run
            # Every queue was empty: yield to the event loop before retrying.
            await asyncio.sleep(0.01)

get() async

Gets the next task_run from any of the given queues

Source code in src/prefect/server/task_queue.py
 95
 96
 97
 98
 99
100
101
102
103
async def get(self) -> schemas.core.TaskRun:
    """Return the next task run available on any of the underlying queues.

    Each queue in `self._queues` is tried in order with a non-blocking read.
    When all of them are empty, the coroutine sleeps for 10 ms and starts
    another pass, repeating until a task run is found.
    """
    while True:
        for candidate in self._queues:
            try:
                task_run = candidate.get_nowait()
            except asyncio.QueueEmpty:
                # This queue has nothing to offer; try the next one.
                pass
            else:
                return task_run
        # Every queue was empty: yield to the event loop before retrying.
        await asyncio.sleep(0.01)

TaskQueue

Source code in src/prefect/server/task_queue.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class TaskQueue:
    """In-memory queues of task runs for a single task key.

    Each instance owns two `asyncio.Queue` objects: one for newly scheduled
    task runs and one for task runs being retried. Reads always try the retry
    queue first. Instances are created lazily by `for_key` and cached in a
    class-level registry keyed by task key.
    """

    # Registry of the queue instances created so far, keyed by task key.
    _task_queues: Dict[str, Self] = {}

    # Queue sizes used when a task key has no explicit configuration.
    default_scheduled_max_size: int = (
        PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE.value()
    )
    default_retry_max_size: int = PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE.value()

    # Per-task-key (scheduled_size, retry_size) overrides.
    _queue_size_configs: Dict[str, Tuple[int, int]] = {}

    task_key: str
    _scheduled_queue: asyncio.Queue
    _retry_queue: asyncio.Queue

    def __init__(self, task_key: str, scheduled_queue_size: int, retry_queue_size: int):
        """Create the scheduled and retry queues for `task_key` with the given max sizes."""
        self.task_key = task_key
        self._scheduled_queue = asyncio.Queue(maxsize=scheduled_queue_size)
        self._retry_queue = asyncio.Queue(maxsize=retry_queue_size)

    @classmethod
    def for_key(cls, task_key: str) -> Self:
        """Return the queue for `task_key`, creating and caching it on first use.

        A new queue is sized from the configuration stored for the key by
        `configure_task_key`, or from the class defaults if there is none.
        """
        if task_key not in cls._task_queues:
            fallback_sizes = (cls.default_scheduled_max_size, cls.default_retry_max_size)
            scheduled_size, retry_size = cls._queue_size_configs.get(
                task_key, fallback_sizes
            )
            cls._task_queues[task_key] = cls(task_key, scheduled_size, retry_size)
        return cls._task_queues[task_key]

    @classmethod
    def configure_task_key(
        cls,
        task_key: str,
        scheduled_size: Optional[int] = None,
        retry_size: Optional[int] = None,
    ):
        """Record the queue sizes to use when the queue for `task_key` is created.

        Any size that is omitted (or otherwise falsy) is replaced by the
        corresponding class default. The stored sizes are only consulted by
        `for_key` when it constructs a new queue.
        """
        if not scheduled_size:
            scheduled_size = cls.default_scheduled_max_size
        if not retry_size:
            retry_size = cls.default_retry_max_size
        cls._queue_size_configs[task_key] = (scheduled_size, retry_size)

    @classmethod
    async def enqueue(cls, task_run: schemas.core.TaskRun) -> None:
        """Put `task_run` on the scheduled queue that belongs to its task key."""
        target = cls.for_key(task_run.task_key)
        await target.put(task_run)

    @classmethod
    def reset(cls) -> None:
        """A unit testing utility to reset the state of the task queues subsystem"""
        cls._task_queues.clear()
        cls._scheduled_tasks_already_restored = False

    async def get(self) -> schemas.core.TaskRun:
        """Return the next task run, waiting on the scheduled queue if needed.

        A task run waiting in the retry queue is returned immediately;
        otherwise this awaits the next item of the scheduled queue.
        """
        try:
            # Retries take priority over newly scheduled task runs.
            task_run = self._retry_queue.get_nowait()
        except asyncio.QueueEmpty:
            task_run = await self._scheduled_queue.get()
        return task_run

    def get_nowait(self) -> schemas.core.TaskRun:
        """Return the next task run without waiting.

        The retry queue is tried first, then the scheduled queue.

        Raises:
            asyncio.QueueEmpty: if both queues are empty.
        """
        try:
            # Retries take priority over newly scheduled task runs.
            task_run = self._retry_queue.get_nowait()
        except asyncio.QueueEmpty:
            task_run = self._scheduled_queue.get_nowait()
        return task_run

    async def put(self, task_run: schemas.core.TaskRun) -> None:
        """Add `task_run` to the scheduled queue."""
        await self._scheduled_queue.put(task_run)

    async def retry(self, task_run: schemas.core.TaskRun) -> None:
        """Add `task_run` to the retry queue."""
        await self._retry_queue.put(task_run)

reset() classmethod

A unit testing utility to reset the state of the task queues subsystem

Source code in src/prefect/server/task_queue.py
55
56
57
58
59
@classmethod
def reset(cls) -> None:
    """Restore the task queues subsystem to a clean state (unit testing utility).

    Marks scheduled tasks as not yet restored and empties the registry of
    task queues.
    """
    cls._scheduled_tasks_already_restored = False
    cls._task_queues.clear()