30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class Foreman(LoopService):
    """
    A loop service responsible for monitoring the status of workers.

    Handles updating the status of workers and their associated work pools,
    and marks deployments and work queues that have not been polled recently
    as NOT_READY.
    """

    def __init__(
        self,
        loop_seconds: Optional[float] = None,
        inactivity_heartbeat_multiple: Optional[int] = None,
        fallback_heartbeat_interval_seconds: Optional[int] = None,
        deployment_last_polled_timeout_seconds: Optional[int] = None,
        work_queue_last_polled_timeout_seconds: Optional[int] = None,
        **kwargs,
    ):
        """
        Args:
            loop_seconds: Seconds between loop iterations. Any falsy value
                (``None`` or ``0``) falls back to
                ``PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS``.
            inactivity_heartbeat_multiple: How many heartbeat intervals may
                elapse before an ONLINE worker is marked OFFLINE. ``None``
                falls back to
                ``PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE``.
            fallback_heartbeat_interval_seconds: Interval used for workers
                whose ``heartbeat_interval_seconds`` is NULL. ``None`` falls
                back to
                ``PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS``.
            deployment_last_polled_timeout_seconds: Age of ``last_polled``
                after which a READY deployment is marked NOT_READY. ``None``
                falls back to
                ``PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS``.
            work_queue_last_polled_timeout_seconds: Age of ``last_polled``
                after which a READY work queue is marked NOT_READY. ``None``
                falls back to
                ``PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS``.
            **kwargs: Forwarded to ``LoopService.__init__``.
        """
        super().__init__(
            # `or` (not `is None`): a falsy loop_seconds also uses the setting.
            loop_seconds=loop_seconds
            or PREFECT_API_SERVICES_FOREMAN_LOOP_SECONDS.value(),
            **kwargs,
        )
        # The remaining options use `is None` so that an explicit 0 is honored.
        self._inactivity_heartbeat_multiple = (
            PREFECT_API_SERVICES_FOREMAN_INACTIVITY_HEARTBEAT_MULTIPLE.value()
            if inactivity_heartbeat_multiple is None
            else inactivity_heartbeat_multiple
        )
        self._fallback_heartbeat_interval_seconds = (
            PREFECT_API_SERVICES_FOREMAN_FALLBACK_HEARTBEAT_INTERVAL_SECONDS.value()
            if fallback_heartbeat_interval_seconds is None
            else fallback_heartbeat_interval_seconds
        )
        self._deployment_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_DEPLOYMENT_LAST_POLLED_TIMEOUT_SECONDS.value()
            if deployment_last_polled_timeout_seconds is None
            else deployment_last_polled_timeout_seconds
        )
        self._work_queue_last_polled_timeout_seconds = (
            PREFECT_API_SERVICES_FOREMAN_WORK_QUEUE_LAST_POLLED_TIMEOUT_SECONDS.value()
            if work_queue_last_polled_timeout_seconds is None
            else work_queue_last_polled_timeout_seconds
        )

    @db_injector
    async def run_once(db: PrefectDBInterface, self: Self) -> None:
        """
        Run one monitoring pass.

        1. Mark workers currently ONLINE as OFFLINE if their
           last_heartbeat_time is too old.
        2. Mark work pools currently READY as NOT_READY if they have no
           ONLINE workers.
        3. Mark deployments as NOT_READY if their last_polled time is older
           than the configured deployment last-polled timeout.
        4. Mark work queues as NOT_READY if their last_polled time is older
           than the configured work-queue last-polled timeout.
        """
        # `db` is injected but unused here; each step gets its own injected
        # interface and opens its own session. Workers are processed first so
        # the work-pool check sees the freshly updated worker statuses.
        await self._mark_online_workers_without_a_recent_heartbeat_as_offline()
        await self._mark_work_pools_as_not_ready()
        await self._mark_deployments_as_not_ready()
        await self._mark_work_queues_as_not_ready()

    @db_injector
    async def _mark_online_workers_without_a_recent_heartbeat_as_offline(
        db: PrefectDBInterface,
        self: Self,
    ) -> None:
        """
        Set the status of stale ONLINE workers to OFFLINE.

        A worker is stale when its last_heartbeat_time is older than
        ``inactivity_heartbeat_multiple`` times its heartbeat interval
        (``heartbeat_interval_seconds``, or the fallback interval when that
        column is NULL), measured back from the database's current time.

        Raises:
            NotImplementedError: If the database dialect is neither
                PostgreSQL nor SQLite.
        """
        async with db.session_context(begin_transaction=True) as session:
            if db.dialect.name == "postgresql":
                worker_update_stmt = sa.text(
                    """
                    UPDATE worker
                    SET status = 'OFFLINE'
                    WHERE (
                        last_heartbeat_time <
                        CURRENT_TIMESTAMP - (
                            interval '1 second' * :multiplier *
                            COALESCE(heartbeat_interval_seconds, :default_interval)
                        )
                    )
                    AND status = 'ONLINE';
                    """
                )
            elif db.dialect.name == "sqlite":
                # julianday() is measured in days, so the threshold in seconds
                # is divided by 86400 to compare like units.
                worker_update_stmt = sa.text(
                    """
                    UPDATE worker
                    SET status = 'OFFLINE'
                    WHERE (
                        julianday(last_heartbeat_time) <
                        julianday('now') - ((
                            :multiplier *
                            COALESCE(heartbeat_interval_seconds, :default_interval)
                        ) / 86400.0)
                    )
                    AND status = 'ONLINE';
                    """
                )
            else:
                raise NotImplementedError(
                    f"No implementation for database dialect {db.dialect.name}"
                )

            result = await session.execute(
                worker_update_stmt,
                {
                    "multiplier": self._inactivity_heartbeat_multiple,
                    "default_interval": self._fallback_heartbeat_interval_seconds,
                },
            )

        if result.rowcount:
            self.logger.info(f"Marked {result.rowcount} workers as offline.")

    @db_injector
    async def _mark_work_pools_as_not_ready(
        db: PrefectDBInterface,
        self: Self,
    ) -> None:
        """
        Mark READY work pools that have no ONLINE workers as NOT_READY.

        Each pool is updated through ``models.workers.update_work_pool`` with
        ``emit_work_pool_status_event`` as the status-change callback, and the
        change is logged.
        """
        async with db.session_context(begin_transaction=True) as session:
            # The outer join keeps pools with zero matching workers; the
            # HAVING clause then selects exactly those pools.
            work_pools_select_stmt = (
                sa.select(db.WorkPool)
                .filter(db.WorkPool.status == "READY")
                .outerjoin(
                    db.Worker,
                    sa.and_(
                        db.Worker.work_pool_id == db.WorkPool.id,
                        db.Worker.status == "ONLINE",
                    ),
                )
                .group_by(db.WorkPool.id)
                .having(sa.func.count(db.Worker.id) == 0)
            )
            result = await session.execute(work_pools_select_stmt)
            work_pools = result.scalars().all()

            for work_pool in work_pools:
                await models.workers.update_work_pool(
                    session=session,
                    work_pool_id=work_pool.id,
                    work_pool=InternalWorkPoolUpdate(status=WorkPoolStatus.NOT_READY),
                    emit_status_change=emit_work_pool_status_event,
                )
                self.logger.info(f"Marked work pool {work_pool.id} as NOT_READY.")

    @db_injector
    async def _mark_deployments_as_not_ready(
        db: PrefectDBInterface,
        self: Self,
    ) -> None:
        """
        Mark stale READY deployments as NOT_READY.

        A deployment is stale when its last_polled time is set and older than
        the deployment last-polled timeout and, if its work queue has a
        last_polled time, that time is also older than the same threshold.
        The matching IDs are passed to ``mark_deployments_not_ready``.
        """
        status_timeout_threshold = pendulum.now("UTC") - timedelta(
            seconds=self._deployment_last_polled_timeout_seconds
        )
        deployment_id_select_stmt = (
            sa.select(db.Deployment.id)
            .outerjoin(db.WorkQueue, db.WorkQueue.id == db.Deployment.work_queue_id)
            .filter(db.Deployment.status == DeploymentStatus.READY)
            .filter(db.Deployment.last_polled.isnot(None))
            .filter(db.Deployment.last_polled < status_timeout_threshold)
            # If the work queue has never been polled (or there is none), the
            # deployment's own last_polled decides; otherwise the queue must be
            # stale as well.
            .filter(
                sa.or_(
                    db.WorkQueue.last_polled.is_(None),
                    db.WorkQueue.last_polled < status_timeout_threshold,
                )
            )
        )

        async with db.session_context(begin_transaction=True) as session:
            result = await session.execute(deployment_id_select_stmt)
            # .all() materializes the IDs so they outlive the session.
            deployment_ids_to_mark_unready = result.scalars().all()

        # The helper is not given this session (presumably it opens its own),
        # so it is called only after the read transaction above has ended.
        await mark_deployments_not_ready(
            deployment_ids=deployment_ids_to_mark_unready,
        )

    @db_injector
    async def _mark_work_queues_as_not_ready(
        db: PrefectDBInterface,
        self: Self,
    ) -> None:
        """
        Mark READY work queues whose last_polled time is older than the
        work-queue last-polled timeout as NOT_READY.

        The matching IDs, oldest last_polled first, are passed to
        ``mark_work_queues_not_ready``.
        """
        status_timeout_threshold = pendulum.now("UTC") - timedelta(
            seconds=self._work_queue_last_polled_timeout_seconds
        )
        id_select_stmt = (
            sa.select(db.WorkQueue.id)
            .filter(db.WorkQueue.status == "READY")
            .filter(db.WorkQueue.last_polled.isnot(None))
            .filter(db.WorkQueue.last_polled < status_timeout_threshold)
            .order_by(db.WorkQueue.last_polled.asc())
        )

        async with db.session_context(begin_transaction=True) as session:
            result = await session.execute(id_select_stmt)
            # .all() materializes the IDs so they outlive the session.
            unready_work_queue_ids = result.scalars().all()

        # The helper is not given this session (presumably it opens its own),
        # so it is called only after the read transaction above has ended.
        await mark_work_queues_not_ready(
            work_queue_ids=unready_work_queue_ids,
        )
|