Runs the reactive triggers consumer
Source code in src/prefect/server/events/services/triggers.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class ReactiveTriggers:
    """Runs the reactive triggers consumer.

    The service owns a single asyncio task that runs the consumer created
    for the ``"events"`` topic. ``start()`` creates that task and waits on
    it; ``stop()`` cancels it and clears the reference.
    """

    name: str = "ReactiveTriggers"

    # Task running the consumer; ``None`` whenever the service is not started.
    consumer_task: Optional[asyncio.Task] = None

    async def start(self) -> None:
        """Create the consumer, run it in a task, and wait for it to end.

        Returns normally when the consumer task is cancelled. Any other
        exception raised by the consumer task propagates to the caller, and
        in that case ``consumer_task`` is left pointing at the finished task.

        Raises:
            AssertionError: if a consumer task is already registered.
        """
        assert self.consumer_task is None, "Reactive triggers already started"
        self.consumer = create_consumer("events")

        # Keep the handler's context open for as long as the consumer runs.
        async with triggers.consumer() as handler:
            self.consumer_task = asyncio.create_task(self.consumer.run(handler))
            logger.debug("Reactive triggers started")

            try:
                await self.consumer_task
            except asyncio.CancelledError:
                # Cancellation is how stop() ends the consumer, so it is not
                # treated as an error here.
                pass

    async def stop(self) -> None:
        """Cancel the consumer task and wait for it to finish.

        ``consumer_task`` is reset to ``None`` even if awaiting the task
        raises. If the task had already failed with an exception other than
        ``CancelledError``, that exception is re-raised from here.

        Raises:
            AssertionError: if no consumer task is registered.
        """
        assert self.consumer_task is not None, "Reactive triggers not started"
        self.consumer_task.cancel()
        try:
            await self.consumer_task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() call above.
            pass
        finally:
            self.consumer_task = None
        logger.debug("Reactive triggers stopped")
|