Skip to content

prefect.server.events.pipeline

EventsPipeline

Source code in src/prefect/server/events/pipeline.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class EventsPipeline:
    """Convert events into in-memory messages and hand them to the handlers.

    Each message is passed first to the handler yielded by
    ``task_run_recorder.consumer()`` and then to the handler yielded by
    ``event_persister.create_handler(batch_size=1)``.
    """

    @staticmethod
    def events_to_messages(events) -> List[MemoryMessage]:
        """Build one ``MemoryMessage`` per event, preserving input order.

        Every event is re-created as a ``ReceivedEvent`` from its
        ``model_dump()`` output; the encoded ``model_dump_json()`` of that
        object becomes the message data. The message attributes carry the
        event's ``id`` (as a string) and its ``event`` value.

        Args:
            events: An iterable of event objects exposing ``model_dump()``,
                ``id`` and ``event``.

        Returns:
            The list of messages, one per input event.
        """

        def _to_message(source_event) -> MemoryMessage:
            # Round-trip through ReceivedEvent before serializing to JSON.
            received = ReceivedEvent(**source_event.model_dump())
            payload = received.model_dump_json().encode()
            return MemoryMessage(
                data=payload,
                attributes={
                    "id": str(source_event.id),
                    "event": source_event.event,
                },
            )

        return [_to_message(source_event) for source_event in events]

    async def process_events(self, events: List[Event]):
        """Convert ``events`` to messages and process each of them in order."""
        await self.process_messages(self.events_to_messages(events))

    async def process_messages(self, messages: List[MemoryMessage]):
        """Process ``messages`` one at a time, awaiting each before the next."""
        for pending in messages:
            await self.process_message(pending)

    async def process_message(self, message: MemoryMessage):
        """Process a single event message.

        The message is awaited through the task run recorder's handler first;
        that context is exited before the event persister's handler is
        created and awaited with the same message.
        """

        # TODO: Investigate if we want to include triggers/actions etc.
        async with task_run_recorder.consumer() as record:
            await record(message)

        # A fresh persister handler is created per message with batch_size=1.
        async with event_persister.create_handler(batch_size=1) as persist:
            await persist(message)

process_message(message) async

Process a single event message

Source code in src/prefect/server/events/pipeline.py
30
31
32
33
34
35
36
37
38
async def process_message(self, message: MemoryMessage):
    """Process a single event message.

    The message is awaited through the task run recorder's handler first;
    that context is exited before the event persister's handler is created
    and awaited with the same message.
    """

    # TODO: Investigate if we want to include triggers/actions etc.
    async with task_run_recorder.consumer() as record:
        await record(message)

    # A fresh persister handler is created per message with batch_size=1.
    async with event_persister.create_handler(batch_size=1) as persist:
        await persist(message)