The FailExpiredPauses service. Responsible for putting Paused flow runs in a Failed state if they are not resumed on time.
FailExpiredPauses
Bases: LoopService
A simple loop service responsible for identifying Paused flow runs that can no longer be resumed.
Source code in src/prefect/server/services/pause_expirations.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class FailExpiredPauses(LoopService):
    """
    A simple loop service responsible for identifying Paused flow runs that can no
    longer be resumed, and moving them to a `Failed` state.
    """

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs):
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS.value(),
            **kwargs,
        )

        # query for this many runs to mark failed at once
        self.batch_size = 200

    @inject_db
    async def run_once(self, db: PrefectDBInterface):
        """
        Mark flow runs as failed by:

        - Querying for flow runs in a Paused state, `self.batch_size` at a time
        - For any runs past their pause timeout, setting the flow run state to a
          new `Failed` state

        Batches are paged by flow run id. Runs whose pause has not expired stay
        Paused, so re-issuing the same unpaged query would return them again and
        never yield a short batch; advancing past the last seen id guarantees the
        loop terminates.
        """
        last_id = None
        while True:
            async with db.session_context(begin_transaction=True) as session:
                filters = [db.FlowRun.state_type == states.StateType.PAUSED]
                if last_id is not None:
                    # resume after the last run inspected in the previous batch
                    filters.append(db.FlowRun.id > last_id)

                query = (
                    sa.select(db.FlowRun)
                    .where(*filters)
                    .order_by(db.FlowRun.id)
                    .limit(self.batch_size)
                )

                result = await session.execute(query)
                runs = result.scalars().all()

                if runs:
                    # capture the cursor before any state changes touch these rows
                    last_id = runs[-1].id

                # mark each expired run as failed
                for run in runs:
                    await self._mark_flow_run_as_failed(session=session, flow_run=run)

                # a short batch means every Paused run has now been inspected
                if len(runs) < self.batch_size:
                    break

        self.logger.info("Finished monitoring for expired pauses.")

    async def _mark_flow_run_as_failed(
        self, session: AsyncSession, flow_run: PrefectDBInterface.FlowRun
    ) -> None:
        """
        Mark a flow run as failed if its pause timeout has passed.

        Runs without a current state, or whose state has no `pause_timeout`, are
        left untouched.

        Pass-through method for overrides.
        """
        state = flow_run.state
        if state is None:
            return

        pause_timeout = state.state_details.pause_timeout
        # a pause without a timeout never expires; comparing None to a datetime
        # would raise TypeError and abort the rest of the batch
        if pause_timeout is not None and pause_timeout < pendulum.now("UTC"):
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=states.Failed(message="The flow was paused and never resumed."),
                force=True,
            )
|
run_once(db)
async
Mark flow runs as failed by:
- Querying for flow runs in a Paused state that have timed out
- For any runs past the "expiration" threshold, setting the flow run state to a new `Failed` state
Source code in src/prefect/server/services/pause_expirations.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@inject_db
async def run_once(self, db: PrefectDBInterface):
    """
    Mark flow runs as failed by:

    - Querying for flow runs in a Paused state, `self.batch_size` at a time
    - For any runs past their pause timeout, setting the flow run state to a
      new `Failed` state

    Batches are paged by flow run id. Runs whose pause has not expired stay
    Paused, so re-issuing the same unpaged query would return them again and
    never yield a short batch; advancing past the last seen id guarantees the
    loop terminates.
    """
    last_id = None
    while True:
        async with db.session_context(begin_transaction=True) as session:
            filters = [db.FlowRun.state_type == states.StateType.PAUSED]
            if last_id is not None:
                # resume after the last run inspected in the previous batch
                filters.append(db.FlowRun.id > last_id)

            query = (
                sa.select(db.FlowRun)
                .where(*filters)
                .order_by(db.FlowRun.id)
                .limit(self.batch_size)
            )

            result = await session.execute(query)
            runs = result.scalars().all()

            if runs:
                # capture the cursor before any state changes touch these rows
                last_id = runs[-1].id

            # mark each expired run as failed
            for run in runs:
                await self._mark_flow_run_as_failed(session=session, flow_run=run)

            # a short batch means every Paused run has now been inspected
            if len(runs) < self.batch_size:
                break

    self.logger.info("Finished monitoring for expired pauses.")
|