Skip to content

prefect.server.services.task_scheduling

The TaskSchedulingTimeouts service reschedules background task runs that have been stuck in the PENDING state for too long.

TaskSchedulingTimeouts

Bases: LoopService

Source code in src/prefect/server/services/task_scheduling.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class TaskSchedulingTimeouts(LoopService):
    """
    Loop service that keeps background task runs (task runs with no flow run)
    from getting stuck.

    On its first pass it restores SCHEDULED background task runs from the
    database into the in-memory ``TaskQueue``s. On every pass it transitions
    background task runs that have been PENDING for longer than
    ``PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT`` back to SCHEDULED.
    """

    # True until restore_scheduled_tasks_if_necessary has completed once.
    _first_run: bool

    def __init__(self, loop_seconds: Optional[float] = None, **kwargs):
        """
        Args:
            loop_seconds: Seconds between passes. When falsy (``None`` or ``0``),
                the pending-task timeout setting, converted to seconds, is used.
            **kwargs: Forwarded unchanged to ``LoopService.__init__``.
        """
        self._first_run = True
        super().__init__(
            loop_seconds=loop_seconds
            or PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT.value().total_seconds(),
            **kwargs,
        )

    @inject_db
    async def run_once(self, db: PrefectDBInterface):
        """
        Periodically reschedules pending task runs that have been pending for too long.

        Does nothing when the pending-task timeout setting's value is falsy.
        Otherwise, inside a single transaction, restores scheduled background
        task runs on the first pass and then reschedules stale PENDING ones.
        """
        if not PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT.value():
            return

        async with db.session_context(begin_transaction=True) as session:
            if self._first_run:
                await self.restore_scheduled_tasks_if_necessary(session)
                # Only cleared after a successful restore, so a failed restore
                # is attempted again on the next pass.
                self._first_run = False

            await self.reschedule_pending_runs(session)

    async def restore_scheduled_tasks_if_necessary(self, session: AsyncSession):
        """
        Restores scheduled task runs from the database to the in-memory queues.

        Every task run with no flow run whose state type is SCHEDULED is
        validated into a ``schemas.core.TaskRun`` and passed to
        ``TaskQueue.for_key(task_key).retry``.
        """
        task_runs = await models.task_runs.read_task_runs(
            session=session,
            task_run_filter=filters.TaskRunFilter(
                flow_run_id=filters.TaskRunFilterFlowRunId(is_null_=True),
                state=filters.TaskRunFilterState(
                    type=filters.TaskRunFilterStateType(
                        any_=[states.StateType.SCHEDULED]
                    )
                ),
            ),
        )

        for task_run_model in task_runs:
            task_run: schemas.core.TaskRun = schemas.core.TaskRun.model_validate(
                task_run_model
            )
            await TaskQueue.for_key(task_run.task_key).retry(task_run)

        self.logger.info("Restored %s scheduled task runs", len(task_runs))

    async def reschedule_pending_runs(self, session: AsyncSession):
        """
        Transitions any background task runs that have been PENDING too long into
        SCHEDULED, and re-enqueues them.

        A task run is rescheduled only when all of the following hold:
        - it has no flow run;
        - its current PENDING state is at least
          ``PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT`` old;
        - its state history contains a SCHEDULED state.

        Runs with no prior SCHEDULED state are skipped and excluded from the
        logged count. NOTE(review): this method only forces the state change;
        re-enqueueing presumably happens as a consequence of
        ``set_task_run_state`` — confirm.
        """
        task_runs = await models.task_runs.read_task_runs(
            session=session,
            task_run_filter=filters.TaskRunFilter(
                flow_run_id=filters.TaskRunFilterFlowRunId(is_null_=True),
                state=filters.TaskRunFilterState(
                    type=filters.TaskRunFilterStateType(any_=[states.StateType.PENDING])
                ),
            ),
        )

        # The age cutoff is applied in Python to the fetched PENDING runs.
        older_than = (
            pendulum.now("UTC") - PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT.value()
        )
        stale_runs = [t for t in task_runs if t.state.timestamp <= older_than]

        orchestration_parameters = (
            await orchestration_dependencies.provide_task_orchestration_parameters()
        )

        rescheduled_count = 0
        for task_run in stale_runs:
            previous_states = await models.task_run_states.read_task_run_states(
                session=session, task_run_id=task_run.id
            )
            # Earliest SCHEDULED state in the run's history; its
            # task_parameters_id is copied onto the new SCHEDULED state.
            prior_scheduled_state = next(
                (
                    candidate
                    for candidate in sorted(previous_states, key=lambda s: s.timestamp)
                    if candidate.type == states.StateType.SCHEDULED
                ),
                None,
            )
            if prior_scheduled_state is None:
                # This wasn't originally a SCHEDULED background task, so we won't
                # attempt to reschedule it (and it is not counted below).
                continue

            self.logger.info("Rescheduling task run %s", task_run.id)
            rescheduled = states.Scheduled(
                state_details={
                    "deferred": True,
                    "task_parameters_id": prior_scheduled_state.state_details.task_parameters_id,
                }
            )

            # NOTE(review): force=True presumably lets the PENDING -> SCHEDULED
            # transition skip orchestration rules that would reject it — confirm.
            await models.task_runs.set_task_run_state(
                session=session,
                task_run_id=task_run.id,
                state=rescheduled,
                force=True,
                orchestration_parameters=orchestration_parameters,
            )
            rescheduled_count += 1

        self.logger.info("Rescheduled %s pending task runs", rescheduled_count)

reschedule_pending_runs(session) async

Transitions any background task runs that have been PENDING for too long into the SCHEDULED state, and re-enqueues them.

Source code in src/prefect/server/services/task_scheduling.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
async def reschedule_pending_runs(self, session: AsyncSession):
    """
    Transitions any background task runs that have been PENDING too long into
    SCHEDULED, and re-enqueues them.

    A task run is rescheduled only when all of the following hold:
    - it has no flow run;
    - its current PENDING state is at least
      ``PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT`` old;
    - its state history contains a SCHEDULED state.

    Runs with no prior SCHEDULED state are skipped and excluded from the logged
    count. NOTE(review): this function only forces the state change;
    re-enqueueing presumably happens as a consequence of ``set_task_run_state``
    — confirm.
    """
    task_runs = await models.task_runs.read_task_runs(
        session=session,
        task_run_filter=filters.TaskRunFilter(
            flow_run_id=filters.TaskRunFilterFlowRunId(is_null_=True),
            state=filters.TaskRunFilterState(
                type=filters.TaskRunFilterStateType(any_=[states.StateType.PENDING])
            ),
        ),
    )

    # The age cutoff is applied in Python to the fetched PENDING runs.
    older_than = (
        pendulum.now("UTC") - PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT.value()
    )
    stale_runs = [t for t in task_runs if t.state.timestamp <= older_than]

    orchestration_parameters = (
        await orchestration_dependencies.provide_task_orchestration_parameters()
    )

    rescheduled_count = 0
    for task_run in stale_runs:
        previous_states = await models.task_run_states.read_task_run_states(
            session=session, task_run_id=task_run.id
        )
        # Earliest SCHEDULED state in the run's history; its task_parameters_id
        # is copied onto the new SCHEDULED state.
        prior_scheduled_state = next(
            (
                candidate
                for candidate in sorted(previous_states, key=lambda s: s.timestamp)
                if candidate.type == states.StateType.SCHEDULED
            ),
            None,
        )
        if prior_scheduled_state is None:
            # This wasn't originally a SCHEDULED background task, so we won't
            # attempt to reschedule it (and it is not counted below).
            continue

        self.logger.info("Rescheduling task run %s", task_run.id)
        rescheduled = states.Scheduled(
            state_details={
                "deferred": True,
                "task_parameters_id": prior_scheduled_state.state_details.task_parameters_id,
            }
        )

        # NOTE(review): force=True presumably lets the PENDING -> SCHEDULED
        # transition skip orchestration rules that would reject it — confirm.
        await models.task_runs.set_task_run_state(
            session=session,
            task_run_id=task_run.id,
            state=rescheduled,
            force=True,
            orchestration_parameters=orchestration_parameters,
        )
        rescheduled_count += 1

    self.logger.info("Rescheduled %s pending task runs", rescheduled_count)

restore_scheduled_tasks_if_necessary(session) async

Restores scheduled background task runs (task runs that have no flow run) from the database to the in-memory queues.

Source code in src/prefect/server/services/task_scheduling.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def restore_scheduled_tasks_if_necessary(self, session: AsyncSession):
    """
    Re-populate the in-memory task queues from the database.

    Every task run that has no flow run and whose state type is SCHEDULED is
    validated into a ``schemas.core.TaskRun`` and handed to the ``retry``
    method of the ``TaskQueue`` for its task key. The number of restored runs
    is logged at INFO level.
    """
    scheduled_only = filters.TaskRunFilterState(
        type=filters.TaskRunFilterStateType(any_=[states.StateType.SCHEDULED])
    )
    background_scheduled = filters.TaskRunFilter(
        flow_run_id=filters.TaskRunFilterFlowRunId(is_null_=True),
        state=scheduled_only,
    )
    orm_runs = await models.task_runs.read_task_runs(
        session=session,
        task_run_filter=background_scheduled,
    )

    for orm_run in orm_runs:
        core_run: schemas.core.TaskRun = schemas.core.TaskRun.model_validate(orm_run)
        queue = TaskQueue.for_key(core_run.task_key)
        await queue.retry(core_run)

    self.logger.info("Restored %s scheduled task runs", len(orm_runs))

run_once(db) async

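Periodically reschedules task runs that have been pending for too long. On the service's first run, it also restores scheduled task runs from the database to the in-memory queues.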

Source code in src/prefect/server/services/task_scheduling.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
@inject_db
async def run_once(self, db: PrefectDBInterface):
    """
    Run a single pass of the service.

    Does nothing when the pending-task timeout setting is falsy. Otherwise,
    within one database transaction, it first restores scheduled task runs
    (only on the very first pass) and then reschedules task runs that have
    been pending for too long.
    """
    service_enabled = bool(PREFECT_TASK_SCHEDULING_PENDING_TASK_TIMEOUT)
    if service_enabled:
        async with db.session_context(begin_transaction=True) as session:
            if self._first_run:
                # The restore step runs once per service instance; the flag is
                # only cleared after it completes without raising.
                await self.restore_scheduled_tasks_if_necessary(session)
                self._first_run = False
            await self.reschedule_pending_runs(session)