Skip to content

prefect.server.services.loop_service

The base class for all Prefect REST API loop services.

LoopService

Loop services are relatively lightweight maintenance routines that need to run periodically.

This class makes it straightforward to design and integrate them. Users only need to define the run_once coroutine to describe the behavior of the service on each loop.

Source code in src/prefect/server/services/loop_service.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
class LoopService:
    """
    Loop services are relatively lightweight maintenance routines that need to run periodically.

    This class makes it straightforward to design and integrate them. Users only need to
    define the `run_once` coroutine to describe the behavior of the service on each loop.
    """

    loop_seconds = 60

    def __init__(
        self, loop_seconds: Optional[float] = None, handle_signals: bool = False
    ):
        """
        Args:
            loop_seconds (float): if provided, overrides the loop interval
                otherwise specified as a class variable
            handle_signals (bool): if True, SIGINT and SIGTERM are
                gracefully intercepted and shut down the running service.
        """
        if loop_seconds:
            self.loop_seconds = loop_seconds  # seconds between runs
        self._should_stop = False  # flag for whether the service should stop running
        self._is_running = False  # flag for whether the service is running
        self.name = type(self).__name__
        self.logger = get_logger(f"server.services.{self.name.lower()}")

        if handle_signals:
            _register_signal(signal.SIGINT, self._stop)
            _register_signal(signal.SIGTERM, self._stop)

    @inject_db
    async def _on_start(self, db: PrefectDBInterface) -> None:
        """
        Called prior to running the service
        """
        # reset the _should_stop flag
        self._should_stop = False
        # set the _is_running flag
        self._is_running = True

    async def _on_stop(self) -> None:
        """
        Called after running the service
        """
        # reset the _is_running flag
        self._is_running = False

    async def start(self, loops=None) -> None:
        """
        Run the service `loops` time. Pass loops=None to run forever.

        Args:
            loops (int, optional): the number of loops to run before exiting.
        """

        await self._on_start()

        i = 0
        while not self._should_stop:
            start_time = pendulum.now("UTC")

            try:
                self.logger.debug(f"About to run {self.name}...")
                await self.run_once()

            except NotImplementedError as exc:
                raise exc from None

            # if an error is raised, log and continue
            except Exception as exc:
                # avoid circular import
                from prefect.server.api.server import is_client_retryable_exception

                retryable_error = is_client_retryable_exception(exc)
                if not retryable_error or (
                    retryable_error and PREFECT_API_LOG_RETRYABLE_ERRORS.value()
                ):
                    self.logger.error(
                        f"Unexpected error in: {repr(exc)}", exc_info=True
                    )

            end_time = pendulum.now("UTC")

            # if service took longer than its loop interval, log a warning
            # that the interval might be too short
            if (end_time - start_time).total_seconds() > self.loop_seconds:
                self.logger.warning(
                    f"{self.name} took {(end_time-start_time).total_seconds()} seconds"
                    " to run, which is longer than its loop interval of"
                    f" {self.loop_seconds} seconds."
                )

            # check if early stopping was requested
            i += 1
            if loops is not None and i == loops:
                self.logger.debug(f"{self.name} exiting after {loops} loop(s).")
                await self.stop(block=False)

            # next run is every "loop seconds" after each previous run *started*.
            # note that if the loop took unexpectedly long, the "next_run" time
            # might be in the past, which will result in an instant start
            next_run = max(
                start_time.add(seconds=self.loop_seconds), pendulum.now("UTC")
            )
            self.logger.debug(f"Finished running {self.name}. Next run at {next_run}")

            # check the `_should_stop` flag every 1 seconds until the next run time is reached
            while pendulum.now("UTC") < next_run and not self._should_stop:
                await asyncio.sleep(
                    min(1, (next_run - pendulum.now("UTC")).total_seconds())
                )

        await self._on_stop()

    async def stop(self, block=True) -> None:
        """
        Gracefully stops a running LoopService and optionally blocks until the
        service stops.

        Args:
            block (bool): if True, blocks until the service is
                finished running. Otherwise it requests a stop and returns but
                the service may still be running a final loop.

        """
        self._stop()

        if block:
            # if block=True, sleep until the service stops running,
            # but no more than `loop_seconds` to avoid a deadlock
            with anyio.move_on_after(self.loop_seconds):
                while self._is_running:
                    await asyncio.sleep(0.1)

            # if the service is still running after `loop_seconds`, something's wrong
            if self._is_running:
                self.logger.warning(
                    f"`stop(block=True)` was called on {self.name} but more than one"
                    f" loop interval ({self.loop_seconds} seconds) has passed. This"
                    " usually means something is wrong. If `stop()` was called from"
                    " inside the loop service, use `stop(block=False)` instead."
                )

    def _stop(self, *_) -> None:
        """
        Private, synchronous method for setting the `_should_stop` flag. Takes arbitrary
        arguments so it can be used as a signal handler.
        """
        self._should_stop = True

    async def run_once(self) -> None:
        """
        Represents one loop of the service.

        Users should override this method.

        To actually run the service once, call `LoopService().start(loops=1)`
        instead of `LoopService().run_once()`, because this method will not invoke setup
        and teardown methods properly.
        """
        raise NotImplementedError("LoopService subclasses must implement this method.")

__init__(loop_seconds=None, handle_signals=False)

Parameters:

Name Type Description Default
loop_seconds float

if provided, overrides the loop interval otherwise specified as a class variable

None
handle_signals bool

if True, SIGINT and SIGTERM are gracefully intercepted and shut down the running service.

False
Source code in src/prefect/server/services/loop_service.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(
    self, loop_seconds: Optional[float] = None, handle_signals: bool = False
):
    """
    Args:
        loop_seconds (float): if provided, overrides the loop interval
            otherwise specified as a class variable
        handle_signals (bool): if True, SIGINT and SIGTERM are
            gracefully intercepted and shut down the running service.
    """
    if loop_seconds:
        self.loop_seconds = loop_seconds  # seconds between runs
    self._should_stop = False  # flag for whether the service should stop running
    self._is_running = False  # flag for whether the service is running
    self.name = type(self).__name__
    self.logger = get_logger(f"server.services.{self.name.lower()}")

    if handle_signals:
        _register_signal(signal.SIGINT, self._stop)
        _register_signal(signal.SIGTERM, self._stop)

run_once() async

Represents one loop of the service.

Users should override this method.

To actually run the service once, call LoopService().start(loops=1) instead of LoopService().run_once(), because this method will not invoke setup and teardown methods properly.

Source code in src/prefect/server/services/loop_service.py
170
171
172
173
174
175
176
177
178
179
180
async def run_once(self) -> None:
    """
    Represents one loop of the service.

    Users should override this method.

    To actually run the service once, call `LoopService().start(loops=1)`
    instead of `LoopService().run_once()`, because this method will not invoke setup
    and teardown methods properly.
    """
    raise NotImplementedError("LoopService subclasses must implement this method.")

start(loops=None) async

Run the service loops time. Pass loops=None to run forever.

Parameters:

Name Type Description Default
loops int

the number of loops to run before exiting.

None
Source code in src/prefect/server/services/loop_service.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
async def start(self, loops=None) -> None:
    """
    Run the service `loops` time. Pass loops=None to run forever.

    Args:
        loops (int, optional): the number of loops to run before exiting.
    """

    await self._on_start()

    i = 0
    while not self._should_stop:
        start_time = pendulum.now("UTC")

        try:
            self.logger.debug(f"About to run {self.name}...")
            await self.run_once()

        except NotImplementedError as exc:
            raise exc from None

        # if an error is raised, log and continue
        except Exception as exc:
            # avoid circular import
            from prefect.server.api.server import is_client_retryable_exception

            retryable_error = is_client_retryable_exception(exc)
            if not retryable_error or (
                retryable_error and PREFECT_API_LOG_RETRYABLE_ERRORS.value()
            ):
                self.logger.error(
                    f"Unexpected error in: {repr(exc)}", exc_info=True
                )

        end_time = pendulum.now("UTC")

        # if service took longer than its loop interval, log a warning
        # that the interval might be too short
        if (end_time - start_time).total_seconds() > self.loop_seconds:
            self.logger.warning(
                f"{self.name} took {(end_time-start_time).total_seconds()} seconds"
                " to run, which is longer than its loop interval of"
                f" {self.loop_seconds} seconds."
            )

        # check if early stopping was requested
        i += 1
        if loops is not None and i == loops:
            self.logger.debug(f"{self.name} exiting after {loops} loop(s).")
            await self.stop(block=False)

        # next run is every "loop seconds" after each previous run *started*.
        # note that if the loop took unexpectedly long, the "next_run" time
        # might be in the past, which will result in an instant start
        next_run = max(
            start_time.add(seconds=self.loop_seconds), pendulum.now("UTC")
        )
        self.logger.debug(f"Finished running {self.name}. Next run at {next_run}")

        # check the `_should_stop` flag every 1 seconds until the next run time is reached
        while pendulum.now("UTC") < next_run and not self._should_stop:
            await asyncio.sleep(
                min(1, (next_run - pendulum.now("UTC")).total_seconds())
            )

    await self._on_stop()

stop(block=True) async

Gracefully stops a running LoopService and optionally blocks until the service stops.

Parameters:

Name Type Description Default
block bool

if True, blocks until the service is finished running. Otherwise it requests a stop and returns but the service may still be running a final loop.

True
Source code in src/prefect/server/services/loop_service.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def stop(self, block=True) -> None:
    """
    Gracefully stops a running LoopService and optionally blocks until the
    service stops.

    Args:
        block (bool): if True, blocks until the service is
            finished running. Otherwise it requests a stop and returns but
            the service may still be running a final loop.

    """
    self._stop()

    if block:
        # if block=True, sleep until the service stops running,
        # but no more than `loop_seconds` to avoid a deadlock
        with anyio.move_on_after(self.loop_seconds):
            while self._is_running:
                await asyncio.sleep(0.1)

        # if the service is still running after `loop_seconds`, something's wrong
        if self._is_running:
            self.logger.warning(
                f"`stop(block=True)` was called on {self.name} but more than one"
                f" loop interval ({self.loop_seconds} seconds) has passed. This"
                " usually means something is wrong. If `stop()` was called from"
                " inside the loop service, use `stop(block=False)` instead."
            )

run_multiple_services(loop_services) async

Only one signal handler can be active at a time, so this function takes a list of loop services and runs all of them with a global signal handler.

Source code in src/prefect/server/services/loop_service.py
183
184
185
186
187
188
189
190
191
192
193
194
195
async def run_multiple_services(loop_services: List[LoopService]):
    """
    Only one signal handler can be active at a time, so this function takes a list
    of loop services and runs all of them with a global signal handler.
    """

    def stop_all_services(self, *_):
        for service in loop_services:
            service._stop()

    signal.signal(signal.SIGINT, stop_all_services)
    signal.signal(signal.SIGTERM, stop_all_services)
    await asyncio.gather(*[service.start() for service in loop_services])