Skip to content

prefect.server.events.ordering

Manages the partial causal ordering of events for a particular consumer. This module maintains a buffer of events to be processed, aiming to process them in the order they occurred causally.

CausalOrdering

Source code in src/prefect/server/events/ordering.py
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
class CausalOrdering:
    """Tracks the partial causal ordering of events within a single scope.

    Two pieces of state are kept:

    * an in-memory record of the event IDs that have already been processed
      (``_seen_events``), and
    * ``AutomationEventFollower`` rows for events that arrived before the event
      they declare they follow.
    """

    # Class-level, so shared by every instance: scope name -> cache of the event
    # IDs processed in that scope.  Each per-scope cache is created lazily on
    # first access and holds at most 10000 entries, each expiring after
    # SEEN_EXPIRATION.
    _seen_events: Mapping[str, MutableMapping[UUID, bool]] = defaultdict(
        lambda: TTLCache(maxsize=10000, ttl=SEEN_EXPIRATION.total_seconds())
    )

    # Partitions both the seen-event cache and the follower rows.
    scope: str

    def __init__(self, scope: str):
        self.scope = scope

    async def event_has_been_seen(self, event: Union[UUID, Event]) -> bool:
        """Return True if the event (or event ID) was recorded as seen in this scope."""
        event_id = event.id if isinstance(event, Event) else event
        return self._seen_events[self.scope].get(event_id, False)

    async def record_event_as_seen(self, event: ReceivedEvent) -> None:
        """Mark this event's ID as seen in this scope's cache."""
        self._seen_events[self.scope][event.id] = True

    @db_injector
    async def record_follower(db: PrefectDBInterface, self: Self, event: ReceivedEvent):
        """Remember that this event is waiting on another event to arrive"""
        assert event.follows

        async with db.session_context(begin_transaction=True) as session:
            await session.execute(
                sa.insert(AutomationEventFollower).values(
                    scope=self.scope,
                    leader_event_id=event.follows,
                    follower_event_id=event.id,
                    received=event.received,
                    follower=event,
                )
            )

    @db_injector
    async def forget_follower(
        db: PrefectDBInterface, self: Self, follower: ReceivedEvent
    ):
        """Forget that this event is waiting on another event to arrive"""
        assert follower.follows

        async with db.session_context(begin_transaction=True) as session:
            await session.execute(
                sa.delete(AutomationEventFollower).where(
                    AutomationEventFollower.scope == self.scope,
                    AutomationEventFollower.follower_event_id == follower.id,
                )
            )

    @db_injector
    async def get_followers(
        db: PrefectDBInterface, self: Self, leader: ReceivedEvent
    ) -> List[ReceivedEvent]:
        """Returns events that were waiting on this leader event to arrive,
        sorted by their ``occurred`` time"""
        async with db.session_context() as session:
            query = sa.select(AutomationEventFollower.follower).where(
                AutomationEventFollower.scope == self.scope,
                AutomationEventFollower.leader_event_id == leader.id,
            )
            result = await session.execute(query)
            followers = result.scalars().all()
            return sorted(followers, key=lambda e: e.occurred)

    @db_injector
    async def get_lost_followers(db: PrefectDBInterface, self) -> List[ReceivedEvent]:
        """Returns events that were waiting on a leader event that never arrived.

        A follower counts as lost once it was received more than
        PRECEDING_EVENT_LOOKBACK ago.  The matching rows are deleted in the same
        session, and the followers are returned sorted by their ``occurred`` time.
        """
        earlier = pendulum.now("UTC") - PRECEDING_EVENT_LOOKBACK

        async with db.session_context(begin_transaction=True) as session:
            query = sa.select(AutomationEventFollower.follower).where(
                AutomationEventFollower.scope == self.scope,
                AutomationEventFollower.received < earlier,
            )
            result = await session.execute(query)
            followers = result.scalars().all()

            # forget these followers, since they are never going to see their leader event

            await session.execute(
                sa.delete(AutomationEventFollower).where(
                    AutomationEventFollower.scope == self.scope,
                    AutomationEventFollower.received < earlier,
                )
            )

            return sorted(followers, key=lambda e: e.occurred)

    @asynccontextmanager
    async def preceding_event_confirmed(
        self, handler: event_handler, event: ReceivedEvent, depth: int = 0
    ):
        """Events may optionally declare that they logically follow another event, so that
        we can preserve important event orderings in the face of unreliable delivery and
        ordering of messages from the queues.

        This function keeps track of the ID of each event that this shard has successfully
        processed going back to the PRECEDING_EVENT_LOOKBACK period.  If an event arrives
        that must follow another one, confirm that we have recently seen and processed that
        event before proceeding.

        Args:
            handler (event_handler): Awaited as ``handler(follower, depth + 1)`` for each
                event that was waiting on ``event``, after the managed block completes.
            event (ReceivedEvent): The event to be processed. This object should include
                metadata indicating if and what event it follows.
            depth (int, optional): The current recursion depth, used to prevent infinite
                recursion due to cyclic dependencies between events. Defaults to 0.

        Raises:
            MaxDepthExceeded: if ``depth`` is greater than MAX_DEPTH_OF_PRECEDING_EVENT.
            EventArrivedEarly: if the current event shouldn't be processed yet.
        """

        if depth > MAX_DEPTH_OF_PRECEDING_EVENT:
            # Not inside an exception handler, so there is no traceback to attach:
            # log at error level rather than with logger.exception().
            logger.error(
                "Event %r (%s) for %r has exceeded the maximum recursion depth of %s",
                event.event,
                event.id,
                event.resource.id,
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )
            raise MaxDepthExceeded(event)

        if event.follows and not await self.event_has_been_seen(event.follows):
            age = pendulum.now("UTC") - event.received
            # Only hold back events that are still inside the lookback window; an
            # older event falls through and is processed without its leader.
            if age < PRECEDING_EVENT_LOOKBACK:
                logger.debug(
                    "Event %r (%s) for %r arrived before the event it follows %s",
                    event.event,
                    event.id,
                    event.resource.id,
                    event.follows,
                )

                # record this follower for safe-keeping
                await self.record_follower(event)
                raise EventArrivedEarly(event)

        # Hand control to the caller's ``async with`` block.  If that block raises,
        # the exception surfaces here and none of the bookkeeping below runs.
        yield

        await self.record_event_as_seen(event)

        # we have just processed an event that other events were waiting on, so let's
        # react to them now in the order they occurred
        for waiter in await self.get_followers(event):
            await handler(waiter, depth + 1)

        # if this event was itself waiting on something, let's consider it as resolved now
        # that it has been processed
        if event.follows:
            await self.forget_follower(event)

forget_follower(db, self, follower) async

Forget that this event is waiting on another event to arrive

Source code in src/prefect/server/events/ordering.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@db_injector
async def forget_follower(
    db: PrefectDBInterface, self: Self, follower: ReceivedEvent
):
    """Drop the record saying that ``follower`` is waiting on its leader event.

    Removes the ``AutomationEventFollower`` rows in this ordering's scope whose
    ``follower_event_id`` equals ``follower.id``.
    """
    assert follower.follows

    removal = sa.delete(AutomationEventFollower).where(
        AutomationEventFollower.scope == self.scope,
        AutomationEventFollower.follower_event_id == follower.id,
    )

    async with db.session_context(begin_transaction=True) as session:
        await session.execute(removal)

get_followers(db, self, leader) async

Returns events that were waiting on this leader event to arrive

Source code in src/prefect/server/events/ordering.py
108
109
110
111
112
113
114
115
116
117
118
119
120
@db_injector
async def get_followers(
    db: PrefectDBInterface, self: Self, leader: ReceivedEvent
) -> List[ReceivedEvent]:
    """Fetch the events recorded as waiting on ``leader`` within this scope.

    The result is ordered by each follower's ``occurred`` time.
    """
    waiting_on_leader = sa.select(AutomationEventFollower.follower).where(
        AutomationEventFollower.scope == self.scope,
        AutomationEventFollower.leader_event_id == leader.id,
    )

    async with db.session_context() as session:
        rows = await session.execute(waiting_on_leader)
        return sorted(rows.scalars().all(), key=lambda waiter: waiter.occurred)

get_lost_followers(db, self) async

Returns events that were waiting on a leader event that never arrived

Source code in src/prefect/server/events/ordering.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@db_injector
async def get_lost_followers(db: PrefectDBInterface, self) -> List[ReceivedEvent]:
    """Collect, and stop tracking, followers whose leader event never arrived.

    A follower is treated as lost when it was received more than
    PRECEDING_EVENT_LOOKBACK ago.  The matching rows are deleted in the same
    session they are read in, and the followers come back ordered by their
    ``occurred`` time.
    """
    cutoff = pendulum.now("UTC") - PRECEDING_EVENT_LOOKBACK

    # One shared filter so the SELECT and the DELETE target the same rows.
    is_lost = (
        AutomationEventFollower.scope == self.scope,
        AutomationEventFollower.received < cutoff,
    )

    async with db.session_context(begin_transaction=True) as session:
        selected = await session.execute(
            sa.select(AutomationEventFollower.follower).where(*is_lost)
        )
        lost = selected.scalars().all()

        # these followers will never see their leader event, so forget them
        await session.execute(sa.delete(AutomationEventFollower).where(*is_lost))

        return sorted(lost, key=lambda follower: follower.occurred)

preceding_event_confirmed(handler, event, depth=0) async

Events may optionally declare that they logically follow another event, so that we can preserve important event orderings in the face of unreliable delivery and ordering of messages from the queues.

This function keeps track of the ID of each event that this shard has successfully processed going back to the PRECEDING_EVENT_LOOKBACK period. If an event arrives that must follow another one, confirm that we have recently seen and processed that event before proceeding.

Args:

- event (ReceivedEvent): The event to be processed. This object should include metadata indicating if and what event it follows.
- depth (int, optional): The current recursion depth, used to prevent infinite recursion due to cyclic dependencies between events. Defaults to 0.

Raises EventArrivedEarly if the current event shouldn't be processed yet.

Source code in src/prefect/server/events/ordering.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
@asynccontextmanager
async def preceding_event_confirmed(
    self, handler: event_handler, event: ReceivedEvent, depth: int = 0
):
    """Events may optionally declare that they logically follow another event, so that
    we can preserve important event orderings in the face of unreliable delivery and
    ordering of messages from the queues.

    This function keeps track of the ID of each event that this shard has successfully
    processed going back to the PRECEDING_EVENT_LOOKBACK period.  If an event arrives
    that must follow another one, confirm that we have recently seen and processed that
    event before proceeding.

    Args:
        handler (event_handler): Awaited as ``handler(follower, depth + 1)`` for each
            event that was waiting on ``event``, after the managed block completes.
        event (ReceivedEvent): The event to be processed. This object should include
            metadata indicating if and what event it follows.
        depth (int, optional): The current recursion depth, used to prevent infinite
            recursion due to cyclic dependencies between events. Defaults to 0.

    Raises:
        MaxDepthExceeded: if ``depth`` is greater than MAX_DEPTH_OF_PRECEDING_EVENT.
        EventArrivedEarly: if the current event shouldn't be processed yet.
    """

    if depth > MAX_DEPTH_OF_PRECEDING_EVENT:
        # Not inside an exception handler, so there is no traceback to attach:
        # log at error level rather than with logger.exception().
        logger.error(
            "Event %r (%s) for %r has exceeded the maximum recursion depth of %s",
            event.event,
            event.id,
            event.resource.id,
            MAX_DEPTH_OF_PRECEDING_EVENT,
        )
        raise MaxDepthExceeded(event)

    if event.follows and not await self.event_has_been_seen(event.follows):
        age = pendulum.now("UTC") - event.received
        # Only hold back events that are still inside the lookback window; an
        # older event falls through and is processed without its leader.
        if age < PRECEDING_EVENT_LOOKBACK:
            logger.debug(
                "Event %r (%s) for %r arrived before the event it follows %s",
                event.event,
                event.id,
                event.resource.id,
                event.follows,
            )

            # record this follower for safe-keeping
            await self.record_follower(event)
            raise EventArrivedEarly(event)

    # Hand control to the caller's ``async with`` block.  If that block raises,
    # the exception surfaces here and none of the bookkeeping below runs.
    yield

    await self.record_event_as_seen(event)

    # we have just processed an event that other events were waiting on, so let's
    # react to them now in the order they occurred
    for waiter in await self.get_followers(event):
        await handler(waiter, depth + 1)

    # if this event was itself waiting on something, let's consider it as resolved now
    # that it has been processed
    if event.follows:
        await self.forget_follower(event)

record_follower(db, self, event) async

Remember that this event is waiting on another event to arrive

Source code in src/prefect/server/events/ordering.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
@db_injector
async def record_follower(db: PrefectDBInterface, self: Self, event: ReceivedEvent):
    """Store ``event`` as a follower that is waiting for its leader event.

    Inserts one ``AutomationEventFollower`` row keyed by this ordering's scope,
    with ``event.follows`` as the leader ID and the event itself kept in the
    ``follower`` column.
    """
    assert event.follows

    row = dict(
        scope=self.scope,
        leader_event_id=event.follows,
        follower_event_id=event.id,
        received=event.received,
        follower=event,
    )
    statement = sa.insert(AutomationEventFollower).values(**row)

    async with db.session_context(begin_transaction=True) as session:
        await session.execute(statement)