Skip to content

prefect.server.models.task_workers

InMemoryTaskWorkerTracker

Source code in src/prefect/server/models/task_workers.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class InMemoryTaskWorkerTracker:
    """Track, in process memory, which task workers serve which task keys.

    Three structures are kept in sync:

    * ``workers``: worker id -> set of task keys that worker has reported.
    * ``task_keys``: task key -> set of worker ids serving that key.
    * ``worker_timestamps``: worker id -> ``time.monotonic()`` reading taken
      the last time the worker was observed.
    """

    def __init__(self):
        self.workers: Dict[WorkerId, Set[TaskKey]] = {}
        self.task_keys: Dict[TaskKey, Set[WorkerId]] = defaultdict(set)
        self.worker_timestamps: Dict[WorkerId, float] = {}

    async def observe_worker(
        self,
        task_keys: List[TaskKey],
        worker_id: WorkerId,
    ) -> None:
        """Record that ``worker_id`` serves ``task_keys`` and refresh its last-seen time.

        Task keys accumulate: keys reported by earlier calls for the same
        worker are kept and the new ones are added to them.
        """
        self.workers[worker_id] = self.workers.get(worker_id, set()) | set(task_keys)
        self.worker_timestamps[worker_id] = time.monotonic()

        for task_key in task_keys:
            self.task_keys[task_key].add(worker_id)

    async def forget_worker(
        self,
        worker_id: WorkerId,
    ) -> None:
        """Remove ``worker_id`` from every structure; unknown ids are ignored."""
        for task_key in self.workers.pop(worker_id, ()):
            # Use .get() so the defaultdict is never populated as a side
            # effect of cleaning up.
            serving = self.task_keys.get(task_key)
            if serving is None:
                continue
            serving.discard(worker_id)
            if not serving:
                # Drop task keys that no longer have any worker.
                del self.task_keys[task_key]
        self.worker_timestamps.pop(worker_id, None)

    async def get_workers_for_task_keys(
        self,
        task_keys: List[TaskKey],
    ) -> List[TaskWorkerResponse]:
        """Return responses for workers serving at least one of ``task_keys``.

        An empty ``task_keys`` list returns every known worker. Task keys
        that no worker serves contribute nothing to the result.
        """
        if not task_keys:
            return await self.get_all_workers()
        # ``self.task_keys`` is a defaultdict: indexing it with an unknown key
        # would insert an empty set that is never cleaned up, so a read-only
        # query must go through .get() instead.
        active_workers = set().union(
            *(self.task_keys.get(key, ()) for key in task_keys)
        )
        return [self._create_worker_response(worker_id) for worker_id in active_workers]

    async def get_all_workers(self) -> List[TaskWorkerResponse]:
        """Return a response for every worker that has a recorded timestamp."""
        return [
            self._create_worker_response(worker_id)
            for worker_id in self.worker_timestamps
        ]

    def _create_worker_response(self, worker_id: WorkerId) -> TaskWorkerResponse:
        """Build the response object for ``worker_id``.

        Last-seen times are stored as monotonic clock readings, so the
        reported timestamp is derived by subtracting the elapsed seconds from
        ``DateTime.utcnow()``.

        Raises:
            KeyError: if ``worker_id`` has no recorded timestamp.
        """
        elapsed_seconds = time.monotonic() - self.worker_timestamps[worker_id]
        return TaskWorkerResponse(
            identifier=worker_id,
            task_keys=list(self.workers.get(worker_id, set())),
            timestamp=DateTime.utcnow().subtract(seconds=elapsed_seconds),
        )

    def reset(self):
        """Testing utility to reset the state of the task worker tracker"""
        self.workers.clear()
        self.task_keys.clear()
        self.worker_timestamps.clear()

reset()

Testing utility that resets the state of the task worker tracker.

Source code in src/prefect/server/models/task_workers.py
71
72
73
74
75
def reset(self):
    """Empty all tracker state; intended for use in tests.

    Clears the worker registry, the task-key index, and the last-seen
    timestamps in place, in that order.
    """
    for attribute in ("workers", "task_keys", "worker_timestamps"):
        getattr(self, attribute).clear()