Skip to content

prefect.workers.server

build_healthcheck_server(worker, query_interval_seconds, log_level='error')

Build a healthcheck FastAPI server for a worker.

Parameters:

Name Type Description Default
worker BaseWorker | ProcessWorker

the worker whose health we will check

required
log_level str

the log level to use for the server

'error'
Source code in src/prefect/workers/server.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def build_healthcheck_server(
    worker: Union[BaseWorker, ProcessWorker],
    query_interval_seconds: float,
    log_level: str = "error",
):
    """
    Build a healthcheck FastAPI server for a worker.

    The returned server exposes a single `GET /health` route. Each request
    asks the worker whether it is still polling and answers 200 when it is,
    or 503 when it is not. The server is only constructed here, not run.

    Args:
        worker (BaseWorker | ProcessWorker): the worker whose health we will check
        query_interval_seconds (float): forwarded to
            `worker.is_worker_still_polling` on every health check
        log_level (str): the log level to use for the server

    Returns:
        A `uvicorn.Server` bound to the host and port taken from the
        `PREFECT_WORKER_WEBSERVER_HOST` and `PREFECT_WORKER_WEBSERVER_PORT`
        settings.
    """

    # NOTE: the handler is documented with comments rather than a docstring,
    # and keeps its original name, because FastAPI may derive route metadata
    # from both.
    def perform_health_check():
        # Healthy path first: the worker reports that it is still polling.
        if worker.is_worker_still_polling(
            query_interval_seconds=query_interval_seconds
        ):
            return JSONResponse(
                status_code=status.HTTP_200_OK, content={"message": "OK"}
            )

        # Otherwise signal that the worker may be stuck.
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"message": "Worker may be unresponsive at this time"},
        )

    health_router = APIRouter()
    health_router.add_api_route("/health", perform_health_check, methods=["GET"])

    health_app = FastAPI()
    health_app.include_router(health_router)

    return uvicorn.Server(
        config=uvicorn.Config(
            app=health_app,
            host=PREFECT_WORKER_WEBSERVER_HOST.value(),
            port=PREFECT_WORKER_WEBSERVER_PORT.value(),
            log_level=log_level,
        )
    )

start_healthcheck_server(worker, query_interval_seconds, log_level='error')

Run a healthcheck FastAPI server for a worker.

Parameters:

Name Type Description Default
worker BaseWorker | ProcessWorker

the worker whose health we will check

required
log_level str

the log level to use for the server

'error'
Source code in src/prefect/workers/server.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def start_healthcheck_server(
    worker: Union[BaseWorker, ProcessWorker],
    query_interval_seconds: float,
    log_level: str = "error",
) -> None:
    """
    Run a healthcheck FastAPI server for a worker.

    Builds the server with `build_healthcheck_server` and immediately calls
    its `run()` method.

    Args:
        worker (BaseWorker | ProcessWorker): the worker whose health we will check
        query_interval_seconds (float): passed through unchanged to
            `build_healthcheck_server`
        log_level (str): the log level to use for the server
    """
    # NOTE(review): run() presumably blocks until the server stops; confirm
    # before calling this from a thread that must stay responsive.
    build_healthcheck_server(
        worker=worker,
        query_interval_seconds=query_interval_seconds,
        log_level=log_level,
    ).run()