A service to record task run and task run states from events.
Source code in src/prefect/server/services/task_run_recorder.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239 | class TaskRunRecorder:
"""A service to record task run and task run states from events."""
name: str = "TaskRunRecorder"
consumer_task: Optional[asyncio.Task] = None
def __init__(self):
self._started_event: Optional[asyncio.Event] = None
@property
def started_event(self) -> asyncio.Event:
if self._started_event is None:
self._started_event = asyncio.Event()
return self._started_event
@started_event.setter
def started_event(self, value: asyncio.Event) -> None:
self._started_event = value
async def start(self):
assert self.consumer_task is None, "TaskRunRecorder already started"
self.consumer = create_consumer("events")
async with consumer() as handler:
self.consumer_task = asyncio.create_task(self.consumer.run(handler))
logger.debug("TaskRunRecorder started")
self.started_event.set()
try:
await self.consumer_task
except asyncio.CancelledError:
pass
async def stop(self):
assert self.consumer_task is not None, "Logger not started"
self.consumer_task.cancel()
try:
await self.consumer_task
except asyncio.CancelledError:
pass
finally:
self.consumer_task = None
logger.debug("TaskRunRecorder stopped")
|