Skip to content

prefect.server.events.services.triggers

ReactiveTriggers

Runs the reactive triggers consumer

Source code in src/prefect/server/events/services/triggers.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class ReactiveTriggers:
    """Runs the reactive triggers consumer"""

    name: str = "ReactiveTriggers"

    consumer_task: Optional[asyncio.Task] = None

    async def start(self):
        assert self.consumer_task is None, "Reactive triggers already started"
        self.consumer = create_consumer("events")

        async with triggers.consumer() as handler:
            self.consumer_task = asyncio.create_task(self.consumer.run(handler))
            logger.debug("Reactive triggers started")

            try:
                await self.consumer_task
            except asyncio.CancelledError:
                pass

    async def stop(self):
        assert self.consumer_task is not None, "Reactive triggers not started"
        self.consumer_task.cancel()
        try:
            await self.consumer_task
        except asyncio.CancelledError:
            pass
        finally:
            self.consumer_task = None
        logger.debug("Reactive triggers stopped")