Skip to content

prefect.server.services.late_runs

The MarkLateRuns service, responsible for putting flow runs in a Late state if they have not started on time. The threshold for a late run can be configured by changing PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS.

MarkLateRuns

Bases: LoopService

A simple loop service responsible for identifying flow runs that are "late".

A flow run is defined as "late" if it has not started within a certain amount of time after its scheduled start time. The exact amount is configurable in the Prefect REST API settings (PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS).

Source code in src/prefect/server/services/late_runs.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class MarkLateRuns(LoopService):
    """
    A simple loop service responsible for identifying flow runs that are "late".

    A flow run is defined as "late" if it has not started within a certain amount
    of time after its scheduled start time, i.e. it is still in the `Scheduled`
    state once that threshold has passed. The exact amount is configurable in
    Prefect REST API Settings (`PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS`).
    """

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs):
        """
        Args:
            loop_seconds: seconds between iterations of the service. A falsy value
                (None, or 0) falls back to
                `PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS`.
            **kwargs: forwarded unchanged to `LoopService.__init__`.
        """
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS.value(),
            **kwargs,
        )

        # mark runs late if they are this far past their expected start time
        self.mark_late_after: datetime.timedelta = (
            PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS.value()
        )

        # query for this many runs to mark as late at once
        self.batch_size = 400

    @inject_db
    async def run_once(self, db: PrefectDBInterface):
        """
        Mark flow runs as late by:

        - Querying for flow runs in a scheduled state that are Scheduled to start
          earlier than now minus the `mark_late_after` threshold
        - Setting the flow run state of each such run to a new `Late` state

        Runs are processed in batches of `self.batch_size`, each batch in its own
        transaction, until the query is exhausted.
        """
        scheduled_to_start_before = pendulum.now("UTC").subtract(
            seconds=self.mark_late_after.total_seconds()
        )

        # IDs handed to `_mark_flow_run_as_late` during this pass. A successfully
        # marked run leaves the `Scheduled` state and drops out of the query, so
        # seeing an ID again means its transition did not take effect.
        attempted_ids = set()

        while True:
            async with db.session_context(begin_transaction=True) as session:
                query = self._get_select_late_flow_runs_query(
                    scheduled_to_start_before=scheduled_to_start_before, db=db
                )

                result = await session.execute(query)
                runs = result.all()

                # only attempt runs not already attempted during this pass
                new_runs = [run for run in runs if run.id not in attempted_ids]

                # mark each run as late
                for run in new_runs:
                    attempted_ids.add(run.id)
                    await self._mark_flow_run_as_late(session=session, flow_run=run)

                # A short batch means there are no more candidates. A batch made
                # up only of already-attempted runs means no progress can be
                # made; re-querying would return the same rows forever.
                if len(runs) < self.batch_size or not new_runs:
                    break

        self.logger.info("Finished monitoring for late runs.")

    @inject_db
    def _get_select_late_flow_runs_query(
        self, scheduled_to_start_before: datetime.datetime, db: PrefectDBInterface
    ):
        """
        Returns a sqlalchemy query for late flow runs.

        The query selects the `id` and `next_scheduled_start_time` columns of at
        most `self.batch_size` flow runs whose state type is SCHEDULED and state
        name is "Scheduled".

        Args:
            scheduled_to_start_before: the maximum next scheduled start time of
                scheduled flow runs to consider in the returned query
        """
        query = (
            sa.select(
                db.FlowRun.id,
                db.FlowRun.next_scheduled_start_time,
            )
            .where(
                # The next scheduled start time is in the past, including the mark late
                # after buffer
                (db.FlowRun.next_scheduled_start_time <= scheduled_to_start_before),
                db.FlowRun.state_type == states.StateType.SCHEDULED,
                # restrict to the plain "Scheduled" name within the SCHEDULED type
                db.FlowRun.state_name == "Scheduled",
            )
            .limit(self.batch_size)
        )
        return query

    async def _mark_flow_run_as_late(
        self, session: AsyncSession, flow_run: PrefectDBInterface.FlowRun
    ) -> None:
        """
        Mark a flow run as late.

        Pass-through method for overrides.

        Args:
            session: the session whose transaction the state change is made in.
            flow_run: a row as produced by `_get_select_late_flow_runs_query`;
                only its `id` and `next_scheduled_start_time` attributes are read.
        """
        try:
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=states.Late(scheduled_time=flow_run.next_scheduled_start_time),
                flow_policy=MarkLateRunsPolicy,  # type: ignore
            )
        except ObjectNotFoundError:
            return  # flow run was deleted, ignore it

run_once(db) async

Mark flow runs as late by:

  • Querying for flow runs in the Scheduled state whose scheduled start time is earlier than now minus the "late" threshold
  • Setting the state of each such run to a new Late state
Source code in src/prefect/server/services/late_runs.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
@inject_db
async def run_once(self, db: PrefectDBInterface):
    """
    Mark flow runs as late by:

    - Querying for flow runs in a scheduled state that are Scheduled to start
      earlier than now minus the `mark_late_after` threshold
    - Setting the flow run state of each such run to a new `Late` state

    Runs are processed in batches of `self.batch_size`, each batch in its own
    transaction, until the query is exhausted.
    """
    scheduled_to_start_before = pendulum.now("UTC").subtract(
        seconds=self.mark_late_after.total_seconds()
    )

    # IDs handed to `_mark_flow_run_as_late` during this pass. A successfully
    # marked run leaves the `Scheduled` state and drops out of the query, so
    # seeing an ID again means its transition did not take effect.
    attempted_ids = set()

    while True:
        async with db.session_context(begin_transaction=True) as session:
            query = self._get_select_late_flow_runs_query(
                scheduled_to_start_before=scheduled_to_start_before, db=db
            )

            result = await session.execute(query)
            runs = result.all()

            # only attempt runs not already attempted during this pass
            new_runs = [run for run in runs if run.id not in attempted_ids]

            # mark each run as late
            for run in new_runs:
                attempted_ids.add(run.id)
                await self._mark_flow_run_as_late(session=session, flow_run=run)

            # A short batch means there are no more candidates. A batch made
            # up only of already-attempted runs means no progress can be
            # made; re-querying would return the same rows forever.
            if len(runs) < self.batch_size or not new_runs:
                break

    self.logger.info("Finished monitoring for late runs.")