Implements an in-memory task queue for delivering background task runs to TaskWorkers.
MultiQueue
A queue that can pull tasks from any of a number of task queues
Source code in src/prefect/server/task_queue.py
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
class MultiQueue:
    """A queue that can pull tasks from any of a number of task queues"""

    # One TaskQueue per task key, in the order the keys were supplied.
    _queues: List[TaskQueue]

    def __init__(self, task_keys: List[str]):
        """Look up (or lazily create) the TaskQueue for every given task key."""
        self._queues = list(map(TaskQueue.for_key, task_keys))

    async def get(self) -> schemas.core.TaskRun:
        """Gets the next task_run from any of the given queues

        Every queue is polled without blocking, in list order, so earlier
        queues win when several have work during the same pass.  When a full
        pass finds nothing, the coroutine sleeps for 0.01 seconds and tries
        again; it only returns once some queue yields a task run.
        """
        while True:
            for candidate in self._queues:
                try:
                    task_run = candidate.get_nowait()
                except asyncio.QueueEmpty:
                    # Nothing here right now; move on to the next queue.
                    pass
                else:
                    return task_run
            # All queues were empty: yield to the event loop before re-polling.
            await asyncio.sleep(0.01)
|
get()
async
Gets the next task_run from any of the given queues
Source code in src/prefect/server/task_queue.py
95
96
97
98
99
100
101
102
async def get(self) -> schemas.core.TaskRun:
    """Gets the next task_run from any of the given queues

    Each queue in ``self._queues`` is polled without blocking, in order.  If
    a complete pass finds every queue empty, the coroutine sleeps for 0.01
    seconds and polls again; it returns as soon as any queue yields a run.
    """
    while True:
        for candidate in self._queues:
            try:
                task_run = candidate.get_nowait()
            except asyncio.QueueEmpty:
                # Nothing here right now; move on to the next queue.
                pass
            else:
                return task_run
        # All queues were empty: yield to the event loop before re-polling.
        await asyncio.sleep(0.01)
|
TaskQueue
Source code in src/prefect/server/task_queue.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
class TaskQueue:
    """In-memory queue of task runs for a single task key.

    Each instance owns two ``asyncio.Queue`` objects: one for newly scheduled
    task runs and one for retries.  Retries are always handed out before
    scheduled runs.  Instances are created on first use and cached per task
    key on the class, so obtain them through ``TaskQueue.for_key`` rather
    than calling the constructor directly.
    """

    # Class-wide registry of the queues created so far, keyed by task key.
    _task_queues: Dict[str, Self] = {}

    # Fallback capacities, read from settings once when the class is defined.
    default_scheduled_max_size: int = (
        PREFECT_TASK_SCHEDULING_MAX_SCHEDULED_QUEUE_SIZE.value()
    )
    default_retry_max_size: int = PREFECT_TASK_SCHEDULING_MAX_RETRY_QUEUE_SIZE.value()

    # Per-key ``(scheduled_size, retry_size)`` overrides consulted by ``for_key``.
    _queue_size_configs: Dict[str, Tuple[int, int]] = {}

    # Declared here so the attribute exists before the first ``reset()`` call.
    # NOTE(review): nothing in this class reads it; confirm whether other code
    # still depends on this flag.
    _scheduled_tasks_already_restored: bool = False

    task_key: str
    _scheduled_queue: asyncio.Queue
    _retry_queue: asyncio.Queue

    @classmethod
    async def enqueue(cls, task_run: schemas.core.TaskRun) -> None:
        """Put ``task_run`` on the scheduled queue for its ``task_key``."""
        await cls.for_key(task_run.task_key).put(task_run)

    @classmethod
    def configure_task_key(
        cls,
        task_key: str,
        scheduled_size: Optional[int] = None,
        retry_size: Optional[int] = None,
    ) -> None:
        """Record the queue capacities to use for ``task_key``.

        A size of ``None`` (or any other falsy value, such as ``0``) falls
        back to the corresponding class default.  The recorded sizes are only
        read when ``for_key`` creates a queue, so a queue that already exists
        for ``task_key`` keeps the capacities it was created with.
        """
        scheduled_size = scheduled_size or cls.default_scheduled_max_size
        retry_size = retry_size or cls.default_retry_max_size
        cls._queue_size_configs[task_key] = (scheduled_size, retry_size)

    @classmethod
    def for_key(cls, task_key: str) -> Self:
        """Return the queue for ``task_key``, creating and caching it if needed."""
        if task_key not in cls._task_queues:
            sizes = cls._queue_size_configs.get(
                task_key, (cls.default_scheduled_max_size, cls.default_retry_max_size)
            )
            cls._task_queues[task_key] = cls(task_key, *sizes)
        return cls._task_queues[task_key]

    @classmethod
    def reset(cls) -> None:
        """A unit testing utility to reset the state of the task queues subsystem"""
        cls._task_queues.clear()
        # Also drop per-key size overrides; otherwise a configuration made in
        # one test would silently shape the queues created in the next one.
        cls._queue_size_configs.clear()
        cls._scheduled_tasks_already_restored = False

    def __init__(
        self, task_key: str, scheduled_queue_size: int, retry_queue_size: int
    ):
        """Create the scheduled and retry queues with the given capacities."""
        self.task_key = task_key
        self._scheduled_queue = asyncio.Queue(maxsize=scheduled_queue_size)
        self._retry_queue = asyncio.Queue(maxsize=retry_queue_size)

    async def get(self) -> schemas.core.TaskRun:
        """Return the next task run, waiting for one if none is available.

        The retry queue is checked once without blocking.  If it is empty the
        call waits on the scheduled queue only, so a retry enqueued during
        that wait is not returned by this call.
        """
        # First, check if there's anything in the retry queue
        try:
            return self._retry_queue.get_nowait()
        except asyncio.QueueEmpty:
            return await self._scheduled_queue.get()

    def get_nowait(self) -> schemas.core.TaskRun:
        """Return the next task run without waiting.

        Raises:
            asyncio.QueueEmpty: if both the retry and scheduled queues are empty.
        """
        # First, check if there's anything in the retry queue
        try:
            return self._retry_queue.get_nowait()
        except asyncio.QueueEmpty:
            return self._scheduled_queue.get_nowait()

    async def put(self, task_run: schemas.core.TaskRun) -> None:
        """Add ``task_run`` to the scheduled queue, waiting while it is full."""
        await self._scheduled_queue.put(task_run)

    async def retry(self, task_run: schemas.core.TaskRun) -> None:
        """Add ``task_run`` to the retry queue, waiting while it is full."""
        await self._retry_queue.put(task_run)
|
reset()
classmethod
A unit testing utility to reset the state of the task queues subsystem
Source code in src/prefect/server/task_queue.py
@classmethod
def reset(cls) -> None:
    """A unit testing utility to reset the state of the task queues subsystem"""
    cls._task_queues.clear()
    # Also drop per-key size overrides; otherwise a configuration made in one
    # test would silently shape the queues created in the next one.
    cls._queue_size_configs.clear()
    cls._scheduled_tasks_already_restored = False
|