Skip to content

prefect.server.events.services.actions

Actions

Runs actions triggered by Automations

Source code in src/prefect/server/events/services/actions.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class Actions:
    """Service that runs the actions triggered by Automations.

    ``start`` builds a consumer with ``create_consumer("actions")`` and runs
    it in an asyncio task until that task finishes or is cancelled; ``stop``
    cancels the task and waits for it to wind down.
    """

    name: str = "Actions"

    # Task running the consumer; None whenever no consumer task is active.
    consumer_task: Optional[asyncio.Task] = None

    async def start(self):
        """Run the consumer and block until its task completes or is cancelled.

        Cancellation of the consumer task is swallowed, so a cancelled run
        returns normally instead of raising ``asyncio.CancelledError``.

        Raises:
            AssertionError: if a consumer task is already set.
        """
        assert self.consumer_task is None, "Actions already started"
        self.consumer = create_consumer("actions")

        async with actions.consumer() as message_handler:
            running = asyncio.create_task(self.consumer.run(message_handler))
            # Publish the task on the instance so stop() can cancel it.
            self.consumer_task = running
            logger.debug("Actions started")

            try:
                await running
            except asyncio.CancelledError:
                pass

    async def stop(self):
        """Cancel the consumer task, wait for it to finish, and clear it.

        ``consumer_task`` is reset to ``None`` even if awaiting the task
        raises something other than ``asyncio.CancelledError``.

        Raises:
            AssertionError: if no consumer task is set.
        """
        task = self.consumer_task
        assert task is not None, "Actions not started"

        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        finally:
            self.consumer_task = None
        logger.debug("Actions stopped")