Bases: LoopService
A simple loop service responsible for identifying flow runs that are "late".
A flow run is defined as "late" if it has not started within a certain amount
of time after its scheduled start time. The exact amount is configurable in
Prefect REST API Settings.
Source code in src/prefect/server/services/late_runs.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class MarkLateRuns(LoopService):
    """
    Loop service that finds overdue scheduled flow runs and marks them `Late`.

    A flow run is treated as "late" when it is still in the `Scheduled` state
    once a grace period past its next scheduled start time has elapsed. The
    grace period is read from `PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS`
    and the loop interval defaults to
    `PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS`.
    """

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs):
        """
        Args:
            loop_seconds: interval passed to the base loop service; a falsy
                value falls back to the configured late-runs loop setting
            **kwargs: forwarded unchanged to `LoopService.__init__`
        """
        super().__init__(
            loop_seconds=(
                loop_seconds or PREFECT_API_SERVICES_LATE_RUNS_LOOP_SECONDS.value()
            ),
            **kwargs,
        )

        # grace period: how far past the expected start time a run may be
        # before it is considered late
        self.mark_late_after: datetime.timedelta = (
            PREFECT_API_SERVICES_LATE_RUNS_AFTER_SECONDS.value()
        )

        # upper bound on the number of runs fetched (and marked) per query
        self.batch_size = 400

    @inject_db
    async def run_once(self, db: PrefectDBInterface):
        """
        Perform one pass of late-run detection:

        - Select flow runs in the `Scheduled` state whose next scheduled start
          time is older than the "late" threshold
        - Move each selected flow run to a new `Late` state

        Runs are handled in batches of `self.batch_size`, each batch in its
        own transaction, until a batch comes back smaller than the batch size.
        """
        # The cutoff is computed once so every batch uses the same threshold.
        cutoff = pendulum.now("UTC").subtract(
            seconds=self.mark_late_after.total_seconds()
        )

        more_batches = True
        while more_batches:
            async with db.session_context(begin_transaction=True) as session:
                late_query = self._get_select_late_flow_runs_query(
                    scheduled_to_start_before=cutoff, db=db
                )
                rows = (await session.execute(late_query)).all()

                # transition every run in this batch to Late
                for row in rows:
                    await self._mark_flow_run_as_late(session=session, flow_run=row)

                # a short batch means nothing further matched; stop looping
                more_batches = len(rows) >= self.batch_size

        self.logger.info("Finished monitoring for late runs.")

    @inject_db
    def _get_select_late_flow_runs_query(
        self, scheduled_to_start_before: datetime.datetime, db: PrefectDBInterface
    ):
        """
        Build the sqlalchemy query that selects late flow runs.

        Args:
            scheduled_to_start_before: latest next scheduled start time a
                scheduled flow run may have and still be returned

        Returns:
            A select of `(id, next_scheduled_start_time)` limited to
            `self.batch_size` rows.
        """
        # Start time is in the past by at least the mark-late-after buffer.
        overdue = db.FlowRun.next_scheduled_start_time <= scheduled_to_start_before
        is_scheduled_type = db.FlowRun.state_type == states.StateType.SCHEDULED
        is_scheduled_name = db.FlowRun.state_name == "Scheduled"

        selected_columns = sa.select(
            db.FlowRun.id,
            db.FlowRun.next_scheduled_start_time,
        )
        return selected_columns.where(
            overdue,
            is_scheduled_type,
            is_scheduled_name,
        ).limit(self.batch_size)

    async def _mark_flow_run_as_late(
        self, session: AsyncSession, flow_run: PrefectDBInterface.FlowRun
    ) -> None:
        """
        Set a single flow run's state to `Late`.

        Kept as a separate method so subclasses can override it. A flow run
        that no longer exists is silently skipped.
        """
        try:
            late_state = states.Late(
                scheduled_time=flow_run.next_scheduled_start_time
            )
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=late_state,
                flow_policy=MarkLateRunsPolicy,  # type: ignore
            )
        except ObjectNotFoundError:
            # the flow run was deleted in the meantime; nothing to do
            pass
|
run_once(db)
async
Mark flow runs as late by:
- Querying for flow runs in a scheduled state that are Scheduled to start in the past
- For any runs past the "late" threshold, setting the flow run state to a new `Late` state
Source code in src/prefect/server/services/late_runs.py
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@inject_db
async def run_once(self, db: PrefectDBInterface):
    """
    Perform one pass of late-run detection:

    - Select flow runs in the `Scheduled` state whose next scheduled start
      time is older than the "late" threshold
    - Move each selected flow run to a new `Late` state

    Runs are handled in batches of `self.batch_size`, each batch in its own
    transaction, until a batch comes back smaller than the batch size.
    """
    # The cutoff is computed once so every batch uses the same threshold.
    cutoff = pendulum.now("UTC").subtract(
        seconds=self.mark_late_after.total_seconds()
    )

    more_batches = True
    while more_batches:
        async with db.session_context(begin_transaction=True) as session:
            late_query = self._get_select_late_flow_runs_query(
                scheduled_to_start_before=cutoff, db=db
            )
            rows = (await session.execute(late_query)).all()

            # transition every run in this batch to Late
            for row in rows:
                await self._mark_flow_run_as_late(session=session, flow_run=row)

            # a short batch means nothing further matched; stop looping
            more_batches = len(rows) >= self.batch_size

    self.logger.info("Finished monitoring for late runs.")
|