Runs actions triggered by Automatinos
Source code in src/prefect/server/events/services/actions.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40 | class Actions:
"""Runs actions triggered by Automatinos"""
name: str = "Actions"
consumer_task: Optional[asyncio.Task] = None
async def start(self):
assert self.consumer_task is None, "Actions already started"
self.consumer = create_consumer("actions")
async with actions.consumer() as handler:
self.consumer_task = asyncio.create_task(self.consumer.run(handler))
logger.debug("Actions started")
try:
await self.consumer_task
except asyncio.CancelledError:
pass
async def stop(self):
assert self.consumer_task is not None, "Actions not started"
self.consumer_task.cancel()
try:
await self.consumer_task
except asyncio.CancelledError:
pass
finally:
self.consumer_task = None
logger.debug("Actions stopped")
|