Skip to content

prefect.testing.fixtures

hosted_api_server(unused_tcp_port_factory) async

Runs an instance of the Prefect API server in a subprocess instead of using the ephemeral application.

Uses the same database as the rest of the tests.

Yields:

Type Description

The API URL

Source code in src/prefect/testing/fixtures.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@pytest.fixture(scope="session")
async def hosted_api_server(unused_tcp_port_factory):
    """
    Start the Prefect API server with uvicorn in a child process and expose its URL,
    rather than relying on the ephemeral application.

    The child process receives the current environment merged with the explicitly
    set Prefect settings, so it uses the same database as the rest of the tests.

    Yields:
        The API URL

    Raises:
        RuntimeError: If no response was received from the health endpoint before
            the 20 second startup window elapsed.
    """
    port = unused_tcp_port_factory()

    server_command = [
        "uvicorn",
        "--factory",
        "prefect.server.api.server:create_app",
        "--host",
        "127.0.0.1",
        "--port",
        str(port),
        "--log-level",
        "info",
    ]
    # Explicitly set settings are layered on top of the inherited environment so
    # the server connects to the same database as normal test clients
    server_env = {
        **os.environ,
        **get_current_settings().to_environment_variables(exclude_unset=True),
    }

    async with open_process(
        command=server_command,
        stdout=sys.stdout,
        stderr=sys.stderr,
        env=server_env,
    ) as process:
        api_url = f"http://localhost:{port}/api"
        health_url = api_url + "/health"

        # Poll the health endpoint until it answers with a 200 or time runs out
        async with httpx.AsyncClient() as client:
            response = None
            with anyio.move_on_after(20):
                while True:
                    try:
                        response = await client.get(health_url)
                    except httpx.ConnectError:
                        # The server is not accepting connections yet
                        server_is_up = False
                    else:
                        server_is_up = response.status_code == 200
                    if server_is_up:
                        break
                    await anyio.sleep(0.1)

            if not response:
                raise RuntimeError(
                    "Timed out while attempting to connect to hosted test Prefect API."
                )
            # Surface a non-success status from the last response received
            response.raise_for_status()

        # Hand control to the consuming tests
        yield api_url

        # Teardown: ask the server to stop, escalating to a kill if needed
        try:
            process.terminate()

            # Check once per second, allowing up to 10 seconds for a clean exit
            checks_remaining = 10
            while checks_remaining > 0:
                if process.returncode is not None:
                    break
                await anyio.sleep(1)
                checks_remaining -= 1
            else:
                # The grace period was exhausted without the process exiting
                process.kill()

        except ProcessLookupError:
            # The process is already gone; nothing left to clean up
            pass

mock_anyio_sleep(monkeypatch)

Mocks sleep so that it does not actually sleep; instead, it sets the current time to now + sleep delay seconds while still yielding to other tasks in the event loop.

Provides "assert_sleeps_for" context manager which asserts a sleep time occurred within the context while using the actual runtime of the context as a tolerance.

Source code in src/prefect/testing/fixtures.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
@pytest.fixture
def mock_anyio_sleep(monkeypatch):
    """
    Mock sleep used to not actually sleep but to set the current time to now + sleep
    delay seconds while still yielding to other tasks in the event loop.

    While the fixture is active, `anyio.sleep` is replaced with an `AsyncMock` and
    `pendulum.now` is replaced with a wrapper that adds the accumulated sleep time
    to the real current time.

    Provides "assert_sleeps_for" context manager which asserts a sleep time occurred
    within the context while using the actual runtime of the context as a tolerance.

    Returns:
        The `AsyncMock` installed in place of `anyio.sleep`, with the
        `assert_sleeps_for` context manager attached as an attribute.
    """
    original_now = pendulum.now
    original_sleep = anyio.sleep
    # Total number of seconds that have been "slept" through the mock so far
    time_shift = 0.0

    async def callback(delay_in_seconds):
        # Record the requested delay instead of waiting for it
        nonlocal time_shift
        time_shift += float(delay_in_seconds)
        # Preserve yield effects of sleep
        await original_sleep(0)

    def latest_now(*args, **kwargs):
        # Fast-forwards the time by the total sleep time. Keyword arguments are
        # forwarded as well so calls such as `pendulum.now(tz=...)` keep working
        # while the patch is active.
        return original_now(*args, **kwargs).add(
            # Ensure we retain float precision
            seconds=int(time_shift),
            microseconds=(time_shift - int(time_shift)) * 1000000,
        )

    monkeypatch.setattr("pendulum.now", latest_now)

    sleep = AsyncMock(side_effect=callback)
    monkeypatch.setattr("anyio.sleep", sleep)

    @contextmanager
    def assert_sleeps_for(
        seconds: Union[int, float], extra_tolerance: Union[int, float] = 0
    ):
        """
        Assert that sleep was called for N seconds during the duration of the context.
        The runtime of the code during the context of the duration is used as an
        upper tolerance to account for sleeps that start based on a time. This is less
        brittle than attempting to freeze the current time.

        If an integer is provided, the upper tolerance will be rounded up to the nearest
        integer. If a float is provided, the upper tolerance will be a float.

        An optional extra tolerance may be provided to account for any other issues.
        This will be applied symmetrically.
        """
        # Use the unpatched clock so the measured runtime excludes mocked sleeps
        run_t0 = original_now().timestamp()
        sleep_t0 = time_shift
        yield
        run_t1 = original_now().timestamp()
        sleep_t1 = time_shift
        runtime = run_t1 - run_t0
        if isinstance(seconds, int):
            # Round tolerance up to the nearest integer if input is an int
            runtime = int(runtime) + 1
        # Amount of mocked sleep accumulated while inside the context
        sleeptime = sleep_t1 - sleep_t0
        assert (
            sleeptime - float(extra_tolerance)
            <= seconds
            <= sleeptime + runtime + extra_tolerance
        ), (
            f"Sleep was called for {sleeptime}; expected {seconds} with tolerance of"
            f" +{runtime + extra_tolerance}, -{extra_tolerance}"
        )

    # Expose the helper on the mock so tests can reach it through the fixture value
    sleep.assert_sleeps_for = assert_sleeps_for

    return sleep

use_hosted_api_server(hosted_api_server)

Sets PREFECT_API_URL to the test session's hosted API endpoint.

Source code in src/prefect/testing/fixtures.py
125
126
127
128
129
130
131
132
133
134
135
136
@pytest.fixture
def use_hosted_api_server(hosted_api_server):
    """
    Temporarily points `PREFECT_API_URL` at the test session's hosted API endpoint
    and disables `PREFECT_SERVER_CSRF_PROTECTION_ENABLED` while the fixture is active.

    Yields:
        The hosted API URL
    """
    setting_overrides = {
        PREFECT_API_URL: hosted_api_server,
        PREFECT_SERVER_CSRF_PROTECTION_ENABLED: False,
    }
    with temporary_settings(setting_overrides):
        yield hosted_api_server