Skip to content

prefect.server.services.task_run_recorder

TaskRunRecorder

A service that records task runs and task run states from events.

Source code in src/prefect/server/services/task_run_recorder.py
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class TaskRunRecorder:
    """A service that records task runs and task run states from events.

    ``start`` builds a consumer with ``create_consumer("events")``, runs it in
    a background ``asyncio.Task`` and waits on that task; ``stop`` cancels the
    task and waits for it to finish unwinding.
    """

    name: str = "TaskRunRecorder"

    # Task running the consumer loop. Assigned by ``start`` and reset to
    # ``None`` by ``stop``.
    consumer_task: Optional[asyncio.Task] = None

    def __init__(self) -> None:
        # Created lazily by the ``started_event`` property on first access.
        self._started_event: Optional[asyncio.Event] = None

    @property
    def started_event(self) -> asyncio.Event:
        """Event that ``start`` sets once the consumer task has been created.

        The event object is created on first access and reused afterwards.
        """
        if self._started_event is None:
            self._started_event = asyncio.Event()
        return self._started_event

    @started_event.setter
    def started_event(self, value: asyncio.Event) -> None:
        self._started_event = value

    async def start(self) -> None:
        """Run the consumer until its task completes or is cancelled.

        Raises:
            AssertionError: if a consumer task is already registered.
        """
        assert self.consumer_task is None, "TaskRunRecorder already started"
        self.consumer = create_consumer("events")

        async with consumer() as handler:
            self.consumer_task = asyncio.create_task(self.consumer.run(handler))
            logger.debug("TaskRunRecorder started")
            self.started_event.set()

            try:
                await self.consumer_task
            except asyncio.CancelledError:
                # Cancellation is the normal way this service is shut down
                # (see ``stop``), so it is not treated as an error.
                pass

    async def stop(self) -> None:
        """Cancel the consumer task and wait for it to finish.

        Raises:
            AssertionError: if the service has no consumer task to stop.
        """
        assert self.consumer_task is not None, "TaskRunRecorder not started"
        self.consumer_task.cancel()
        try:
            await self.consumer_task
        except asyncio.CancelledError:
            pass
        finally:
            # Always clear the handle so the service can be started again,
            # even if the task ended with an unexpected exception.
            self.consumer_task = None
        logger.debug("TaskRunRecorder stopped")

periodically_process_followers(periodic_granularity) async

Periodically process followers that are waiting on a leader event that never arrived.

Source code in src/prefect/server/services/task_run_recorder.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def periodically_process_followers(periodic_granularity: timedelta):
    """Periodically process followers that are waiting on a leader event that never arrived.

    Runs forever: each iteration awaits ``record_lost_follower_task_run_events()``
    and then sleeps for ``periodic_granularity``. Errors raised by a pass are
    logged and the loop carries on with the next iteration.

    Args:
        periodic_granularity: Delay between the end of one pass and the start
            of the next.

    Cancellation while a pass is running is logged and makes the coroutine
    return immediately. Cancellation during the sleep propagates as
    ``asyncio.CancelledError``.
    """
    interval_seconds = periodic_granularity.total_seconds()
    logger.debug(
        "Starting periodically process followers task every %s seconds",
        interval_seconds,
    )
    while True:
        try:
            await record_lost_follower_task_run_events()
        except asyncio.CancelledError:
            logger.debug("Periodically process followers task cancelled")
            return
        except Exception:
            logger.exception("Error while processing task-run-recorders followers.")
        # Sleep outside the try statement: a ``finally`` clause would also run
        # on the cancellation ``return`` above and delay shutdown by a full
        # interval.
        await asyncio.sleep(interval_seconds)