prefect.utilities.services

async critical_service_loop(workload, interval, memory=10, consecutive=3, backoff=1, printer=print, run_once=False, jitter_range=None)

Runs the given `workload` function on the specified `interval`, while being forgiving of intermittent issues like temporary HTTP errors. If more than a certain number of `consecutive` errors occur, print a summary of up to `memory` recent exceptions to `printer`, then begin backoff.

The loop will exit after reaching the consecutive error limit `backoff` times. On each backoff, the interval will be doubled; on a successful loop, the backoff is reset. For example, with interval=10 and backoff=3, the sleep grows to 20s after the first streak of consecutive errors and 40s after the second, and the third streak ends the loop.

Parameters:

workload (Callable[..., Coroutine]): the function to call. Required.
interval (float): how frequently to call it. Required.
memory (int): how many recent errors to remember. Default: 10.
consecutive (int): how many consecutive errors must we see before we begin backoff. Default: 3.
backoff (int): how many times we should allow consecutive errors before exiting. Default: 1.
printer (Callable[..., None]): a print-like function where errors will be reported. Default: print.
run_once (bool): if set, the loop will only run once then return. Default: False.
jitter_range (Optional[float]): if set, the interval will be a random variable (rv) drawn from a clamped Poisson distribution where lambda = interval and the rv is bound between interval * (1 - range) < rv < interval * (1 + range). Default: None.
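
Example — a minimal sketch of driving the loop with a workload; the `heartbeat` coroutine and the health-check URL below are illustrative, not part of Prefect:

import anyio
import httpx

from prefect.utilities.services import critical_service_loop


async def heartbeat():
    # Hypothetical workload: poll a health endpoint.  Transport errors and 5xx
    # responses are tolerated by the loop; other HTTP errors propagate.
    async with httpx.AsyncClient() as client:
        response = await client.get("https://example.com/health")
        response.raise_for_status()


anyio.run(critical_service_loop, heartbeat, 30)  # call `heartbeat` every 30 seconds
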
Source code in src/prefect/utilities/services.py
async def critical_service_loop(
    workload: Callable[..., Coroutine],
    interval: float,
    memory: int = 10,
    consecutive: int = 3,
    backoff: int = 1,
    printer: Callable[..., None] = print,
    run_once: bool = False,
    jitter_range: Optional[float] = None,
):
    """
    Runs the given `workload` function on the specified `interval`, while being
    forgiving of intermittent issues like temporary HTTP errors.  If more than a certain
    number of `consecutive` errors occur, print a summary of up to `memory` recent
    exceptions to `printer`, then begin backoff.

    The loop will exit after reaching the consecutive error limit `backoff` times.
    On each backoff, the interval will be doubled. On a successful loop, the backoff
    will be reset.

    Args:
        workload: the function to call
        interval: how frequently to call it
        memory: how many recent errors to remember
        consecutive: how many consecutive errors must we see before we begin backoff
        backoff: how many times we should allow consecutive errors before exiting
        printer: a `print`-like function where errors will be reported
        run_once: if set, the loop will only run once then return
        jitter_range: if set, the interval will be a random variable (rv) drawn from
            a clamped Poisson distribution where lambda = interval and the rv is bound
            between `interval * (1 - range) < rv < interval * (1 + range)`
    """

    track_record: Deque[bool] = deque([True] * consecutive, maxlen=consecutive)
    failures: Deque[Tuple[Exception, TracebackType]] = deque(maxlen=memory)
    backoff_count = 0

    while True:
        try:
            workload_display_name = (
                workload.__name__ if hasattr(workload, "__name__") else workload
            )
            logger.debug(f"Starting run of {workload_display_name!r}")
            await workload()

            # Reset the backoff count on success; we may want to consider resetting
            # this only if the track record is _all_ successful to avoid ending backoff
            # prematurely
            if backoff_count > 0:
                printer("Resetting backoff due to successful run.")
                backoff_count = 0

            track_record.append(True)
        except httpx.TransportError as exc:
            # httpx.TransportError is the base class for any kind of communications
            # error, like timeouts, connection failures, etc.  This does _not_ cover
            # routine HTTP error codes (even 5xx errors like 502/503) so this
            # handler should not be attempting to cover cases where the Prefect server
            # or Prefect Cloud is having an outage (which will be covered by the
            # exception clause below)
            track_record.append(False)
            failures.append((exc, sys.exc_info()[-1]))
            logger.debug(
                f"Run of {workload!r} failed with TransportError", exc_info=exc
            )
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code >= 500:
                # 5XX codes indicate a potential outage of the Prefect API which is
                # likely to be temporary and transient.  Don't quit over these unless
                # it is prolonged.
                track_record.append(False)
                failures.append((exc, sys.exc_info()[-1]))
                logger.debug(
                    f"Run of {workload!r} failed with HTTPStatusError", exc_info=exc
                )
            else:
                raise

        # Decide whether to exit now based on recent history.
        #
        # Given some typical background error rate of, say, 1%, we may still observe
        # quite a few errors in our recent samples, but this is not necessarily a cause
        # for concern.
        #
        # Imagine two distributions that could reflect our situation at any time: the
        # everything-is-fine distribution of errors, and the everything-is-on-fire
        # distribution of errors. We are trying to determine which of those two worlds
        # we are currently experiencing.  We compare the likelihood that we'd draw N
        # consecutive errors from each.  In the everything-is-fine distribution, that
        # would be a very low-probability occurrence, but in the everything-is-on-fire
        # distribution, that is a high-probability occurrence.
        #
        # Remarkably, we only need to look back for a small number of consecutive
        # errors to have reasonable confidence that this is indeed an anomaly.
        # @anticorrelator and @chrisguidry estimated that we should only need to look
        # back for 3 consecutive errors.
        if not any(track_record):
            # We've failed enough times to be sure something is wrong, the writing is
            # on the wall.  Let's explain what we've seen and exit.
            printer(
                f"\nFailed the last {consecutive} attempts. "
                "Please check your environment and configuration."
            )

            printer("Examples of recent errors:\n")

            failures_by_type = distinct(
                reversed(failures),
                key=lambda pair: type(pair[0]),  # Group by the type of exception
            )
            for exception, traceback in failures_by_type:
                printer("".join(format_exception(None, exception, traceback)))
                printer()

            backoff_count += 1

            if backoff_count >= backoff:
                raise RuntimeError("Service exceeded error threshold.")

            # Reset the track record
            track_record.extend([True] * consecutive)
            failures.clear()
            printer(
                "Backing off due to consecutive errors, using increased interval of "
                f" {interval * 2**backoff_count}s."
            )

        if run_once:
            return

        if jitter_range is not None:
            sleep = clamped_poisson_interval(interval, clamping_factor=jitter_range)
        else:
            sleep = interval * 2**backoff_count

        await anyio.sleep(sleep)

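Because the loop raises RuntimeError("Service exceeded error threshold.") once the consecutive-error limit has been hit `backoff` times, callers may wrap it to turn that into an orderly shutdown. A minimal sketch, where `sync_state` is a hypothetical workload and the parameter values are only illustrative:

import anyio

from prefect.utilities.services import critical_service_loop


async def sync_state():
    ...  # hypothetical workload that talks to the Prefect API


async def main():
    try:
        await critical_service_loop(
            workload=sync_state,
            interval=10,
            consecutive=3,     # a streak of 3 errors counts as one backoff
            backoff=2,         # after 2 such streaks, RuntimeError is raised
            jitter_range=0.3,  # each sleep is drawn between 7s and 13s
        )
    except RuntimeError:
        print("Service exceeded its error threshold; shutting down.")


anyio.run(main)
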
start_client_metrics_server()

Start the process-wide Prometheus metrics server for client metrics (if enabled with PREFECT_CLIENT_METRICS_ENABLED) on the port PREFECT_CLIENT_METRICS_PORT.

Source code in src/prefect/utilities/services.py
def start_client_metrics_server():
    """Start the process-wide Prometheus metrics server for client metrics (if enabled
    with `PREFECT_CLIENT_METRICS_ENABLED`) on the port `PREFECT_CLIENT_METRICS_PORT`."""
    if not PREFECT_CLIENT_METRICS_ENABLED:
        return

    global _metrics_server
    if _metrics_server:
        return

    from prometheus_client import start_http_server

    _metrics_server = start_http_server(port=PREFECT_CLIENT_METRICS_PORT.value())

stop_client_metrics_server()

Stop the process-wide Prometheus metrics server for client metrics, if it has previously been started

Source code in src/prefect/utilities/services.py
def stop_client_metrics_server():
    """Start the process-wide Prometheus metrics server for client metrics, if it has
    previously been started"""
    global _metrics_server
    if _metrics_server:
        server, thread = _metrics_server
        server.shutdown()
        thread.join()
        _metrics_server = None
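
The two helpers bracket a client process's lifetime: starting is a no-op when metrics are disabled or the server is already running, and stopping is a no-op when it was never started. A minimal usage sketch, assuming the setting PREFECT_CLIENT_METRICS_ENABLED=true is present in the environment:

from prefect.utilities.services import (
    start_client_metrics_server,
    stop_client_metrics_server,
)

start_client_metrics_server()  # exposes metrics on PREFECT_CLIENT_METRICS_PORT
try:
    ...  # run the client workload; Prometheus can scrape this process
finally:
    stop_client_metrics_server()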