Skip to content

prefect.server.services.pause_expirations

The `FailExpiredPauses` service, responsible for moving Paused flow runs to a Failed state when they are not resumed before their pause timeout.

FailExpiredPauses

Bases: LoopService

A simple loop service responsible for identifying Paused flow runs that can no longer be resumed.

Source code in `src/prefect/server/services/pause_expirations.py` (lines 20–80):
class FailExpiredPauses(LoopService):
    """
    A simple loop service responsible for identifying Paused flow runs that can no
    longer be resumed and moving them to a `Failed` state.
    """

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs):
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_PAUSE_EXPIRATIONS_LOOP_SECONDS.value(),
            **kwargs,
        )

        # query for this many runs to mark failed at once
        self.batch_size = 200

    @inject_db
    async def run_once(self, db: PrefectDBInterface):
        """
        Mark flow runs as failed by:

        - Querying for flow runs in a Paused state, in batches ordered by flow run ID
        - For any runs past their pause timeout, setting the flow run state to a new `Failed` state

        Paused runs whose timeout has not yet passed are left untouched. Batches are
        paginated by flow run ID so those untouched runs are not fetched again.
        Re-issuing the same unfiltered query would never terminate once at least
        `batch_size` unexpired paused runs exist, because every batch would be full.
        """
        # ID of the last run examined; `None` until the first batch is read
        last_seen_id = None

        while True:
            async with db.session_context(begin_transaction=True) as session:
                conditions = [db.FlowRun.state_type == states.StateType.PAUSED]
                if last_seen_id is not None:
                    # skip runs already examined in an earlier batch
                    conditions.append(db.FlowRun.id > last_seen_id)

                query = (
                    sa.select(db.FlowRun)
                    .where(*conditions)
                    .order_by(db.FlowRun.id)
                    .limit(self.batch_size)
                )

                result = await session.execute(query)
                runs = result.scalars().all()
                if not runs:
                    break

                # advance the cursor before any state changes touch these rows
                last_seen_id = runs[-1].id

                # fail each run whose pause has expired
                for run in runs:
                    await self._mark_flow_run_as_failed(session=session, flow_run=run)

                # a partial batch means no paused runs remain past the cursor
                if len(runs) < self.batch_size:
                    break

        self.logger.info("Finished monitoring for expired pauses.")

    async def _mark_flow_run_as_failed(
        self, session: AsyncSession, flow_run: PrefectDBInterface.FlowRun
    ) -> None:
        """
        Mark a flow run as failed if its pause timeout has passed.

        Pass-through method for overrides.

        Runs whose state carries no `pause_timeout` are left unchanged: there is no
        deadline to compare against, and comparing `None` with a datetime would
        raise `TypeError`.
        """
        state = flow_run.state
        pause_timeout = (
            state.state_details.pause_timeout if state is not None else None
        )
        if pause_timeout is not None and pause_timeout < pendulum.now("UTC"):
            await models.flow_runs.set_flow_run_state(
                session=session,
                flow_run_id=flow_run.id,
                state=states.Failed(message="The flow was paused and never resumed."),
                force=True,
            )

`async run_once(db)`

Mark flow runs as failed by:

  • Querying for flow runs in a Paused state
  • For any runs past their pause timeout, setting the flow run state to a new Failed state
Source code in `src/prefect/server/services/pause_expirations.py` (lines 35–64):
@inject_db
async def run_once(self, db: PrefectDBInterface):
    """
    Mark flow runs as failed by:

    - Querying for flow runs in a Paused state, in batches ordered by flow run ID
    - For any runs past their pause timeout, setting the flow run state to a new `Failed` state

    Paused runs whose timeout has not yet passed are left untouched. Batches are
    paginated by flow run ID so those untouched runs are not fetched again.
    Re-issuing the same unfiltered query would never terminate once at least
    `batch_size` unexpired paused runs exist, because every batch would be full.
    """
    # ID of the last run examined; `None` until the first batch is read
    last_seen_id = None

    while True:
        async with db.session_context(begin_transaction=True) as session:
            conditions = [db.FlowRun.state_type == states.StateType.PAUSED]
            if last_seen_id is not None:
                # skip runs already examined in an earlier batch
                conditions.append(db.FlowRun.id > last_seen_id)

            query = (
                sa.select(db.FlowRun)
                .where(*conditions)
                .order_by(db.FlowRun.id)
                .limit(self.batch_size)
            )

            result = await session.execute(query)
            runs = result.scalars().all()
            if not runs:
                break

            # advance the cursor before any state changes touch these rows
            last_seen_id = runs[-1].id

            # fail each run whose pause has expired
            for run in runs:
                await self._mark_flow_run_as_failed(session=session, flow_run=run)

            # a partial batch means no paused runs remain past the cursor
            if len(runs) < self.batch_size:
                break

    self.logger.info("Finished monitoring for expired pauses.")