
prefect.server.events.services.event_persister

The event persister moves event messages from the event bus to storage as fast as it can. Never gets tired.

EventPersister

A service that persists events to the database as they arrive.

Source code in src/prefect/server/events/services/event_persister.py
class EventPersister:
    """A service that persists events to the database as they arrive."""

    name: str = "EventLogger"

    consumer_task: Optional[asyncio.Task] = None

    def __init__(self):
        self._started_event: Optional[asyncio.Event] = None

    @property
    def started_event(self) -> asyncio.Event:
        if self._started_event is None:
            self._started_event = asyncio.Event()
        return self._started_event

    @started_event.setter
    def started_event(self, value: asyncio.Event) -> None:
        self._started_event = value

    async def start(self):
        assert self.consumer_task is None, "Event persister already started"
        self.consumer = create_consumer("events")

        async with create_handler(
            batch_size=PREFECT_API_SERVICES_EVENT_PERSISTER_BATCH_SIZE.value(),
            flush_every=timedelta(
                seconds=PREFECT_API_SERVICES_EVENT_PERSISTER_FLUSH_INTERVAL.value()
            ),
        ) as handler:
            self.consumer_task = asyncio.create_task(self.consumer.run(handler))
            logger.debug("Event persister started")
            self.started_event.set()

            try:
                await self.consumer_task
            except asyncio.CancelledError:
                pass

    async def stop(self):
        assert self.consumer_task is not None, "Event persister not started"
        self.consumer_task.cancel()
        try:
            await self.consumer_task
        except asyncio.CancelledError:
            pass
        finally:
            self.consumer_task = None
            if self.started_event:
                self.started_event.clear()
        logger.debug("Event persister stopped")

create_handler(batch_size=20, flush_every=timedelta(seconds=5), trim_every=timedelta(minutes=15)) async

Set up a message handler that accumulates incoming events and writes them to the database once batch_size messages are queued, or at every flush_every interval, whichever comes first. Events older than the configured retention period are trimmed from the database every trim_every interval.

Source code in src/prefect/server/events/services/event_persister.py
@asynccontextmanager
async def create_handler(
    batch_size: int = 20,
    flush_every: timedelta = timedelta(seconds=5),
    trim_every: timedelta = timedelta(minutes=15),
) -> AsyncGenerator[MessageHandler, None]:
    """
    Set up a message handler that will accumulate and send events to
    the database every `batch_size` messages, or every `flush_every` interval to flush
    any remaining messages
    """
    db = provide_database_interface()

    queue: asyncio.Queue[ReceivedEvent] = asyncio.Queue()

    async def flush() -> None:
        logger.debug(f"Persisting {queue.qsize()} events...")

        batch: List[ReceivedEvent] = []

        while queue.qsize() > 0:
            batch.append(await queue.get())

        try:
            async with db.session_context() as session:
                await write_events(session=session, events=batch)
                await session.commit()
                logger.debug("Finished persisting events.")
        except Exception:
            logger.debug("Error flushing events, restoring to queue", exc_info=True)
            for event in batch:
                queue.put_nowait(event)

    async def trim() -> None:
        older_than = pendulum.now("UTC") - PREFECT_EVENTS_RETENTION_PERIOD.value()

        try:
            async with db.session_context() as session:
                result = await session.execute(
                    sa.delete(db.Event).where(db.Event.occurred < older_than)
                )
                await session.commit()
                if result.rowcount:
                    logger.debug(
                        "Trimmed %s events older than %s.", result.rowcount, older_than
                    )
        except Exception:
            logger.exception("Error trimming events", exc_info=True)

    async def flush_periodically():
        try:
            while True:
                await asyncio.sleep(flush_every.total_seconds())
                if queue.qsize():
                    await flush()
        except asyncio.CancelledError:
            return

    async def trim_periodically():
        try:
            while True:
                await asyncio.sleep(trim_every.total_seconds())
                await trim()
        except asyncio.CancelledError:
            return

    async def message_handler(message: Message):
        if not message.data:
            return

        event = ReceivedEvent.model_validate_json(message.data)

        logger.debug(
            "Received event: %s with id: %s for resource: %s",
            event.event,
            event.id,
            event.resource.get("prefect.resource.id"),
        )

        await queue.put(event)

        if queue.qsize() >= batch_size:
            await flush()

    periodic_flush = asyncio.create_task(flush_periodically())
    periodic_trim = asyncio.create_task(trim_periodically())

    try:
        yield message_handler
    finally:
        periodic_flush.cancel()
        periodic_trim.cancel()
        if queue.qsize():
            await flush()
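
For reference, a hedged sketch of wiring this handler to a consumer by hand with custom batching, mirroring what EventPersister.start does above; the import path for create_consumer is an assumption, since the service's own imports are not shown in this listing:

import asyncio
from datetime import timedelta

from prefect.server.events.services.event_persister import create_handler
from prefect.server.utilities.messaging import create_consumer  # assumed import path


async def persist_events_with_custom_batching() -> None:
    # consume from the same "events" topic the EventPersister service uses
    consumer = create_consumer("events")

    async with create_handler(
        batch_size=50,                     # flush once 50 events are queued ...
        flush_every=timedelta(seconds=2),  # ... or every 2 seconds, whichever comes first
        trim_every=timedelta(hours=1),     # delete events past the retention period hourly
    ) as handler:
        consumer_task = asyncio.create_task(consumer.run(handler))
        try:
            await consumer_task
        except asyncio.CancelledError:
            pass  # cancelled on shutdown; the handler flushes any queued events on exit


asyncio.run(persist_events_with_custom_batching())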