Skip to content

prefect.testing.utilities

Internal utilities for tests.

assert_does_not_warn(ignore_warnings=[])

Converts warnings to errors within this context to assert warnings are not raised, except for those specified in ignore_warnings.

Parameters: - ignore_warnings: List of warning types to ignore. Example: [DeprecationWarning, UserWarning]

Source code in src/prefect/testing/utilities.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
@contextmanager
def assert_does_not_warn(ignore_warnings=None):
    """
    Converts warnings to errors within this context to assert warnings are not raised,
    except for those specified in ignore_warnings.

    Args:
        ignore_warnings: Optional iterable of warning types to ignore, e.g.
            `[DeprecationWarning, UserWarning]`. Defaults to `None`, in which
            case no warning types are ignored.

    Raises:
        AssertionError: If a warning that is not ignored is raised inside the
            context. The original warning is chained as the cause.
    """
    with warnings.catch_warnings():
        # Escalate every warning to an exception for the duration of the context.
        warnings.simplefilter("error")
        # Filters added later are consulted first, so these "ignore" entries
        # take precedence over the blanket "error" filter above.
        for warning_type in ignore_warnings or ():
            warnings.filterwarnings("ignore", category=warning_type)

        try:
            yield
        except Warning as warning:
            raise AssertionError(f"Warning was raised. {warning!r}") from warning

exceptions_equal(a, b)

Exceptions cannot be compared by `==`. They can be compared using `is`, but this will fail if the exception is serialized/deserialized, so this utility does its best to assert equality using the type and args used to initialize the exception.

Source code in src/prefect/testing/utilities.py
36
37
38
39
40
41
42
43
44
def exceptions_equal(a, b):
    """
    Exceptions cannot be compared by `==`. They can be compared using `is` but this
    will fail if the exception is serialized/deserialized so this utility does its
    best to assert equality using the type and args used to initialize the exception

    Args:
        a: The first exception to compare.
        b: The second exception to compare.

    Returns:
        `True` if `a == b`, or if both objects have exactly the same type and
        equal `args`; otherwise `False`.
    """
    # Fast path: the same object, or a type that defines its own equality.
    if a == b:
        return True
    # Require the exact same class (identity, not `==`), then compare the
    # arguments the exceptions were constructed with.
    return type(a) is type(b) and getattr(a, "args", None) == getattr(b, "args", None)

prefect_test_harness(server_startup_timeout=30)

Temporarily run flows against a local SQLite database for testing.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| server_startup_timeout | Optional[int] | The maximum time to wait for the server to start. Defaults to 30 seconds. If set to None, the value of PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS will be used. | 30 |

Examples:

>>> from prefect import flow
>>> from prefect.testing.utilities import prefect_test_harness
>>>
>>>
>>> @flow
>>> def my_flow():
>>>     return 'Done!'
>>>
>>> with prefect_test_harness():
>>>     assert my_flow() == 'Done!' # run against temporary db
Source code in src/prefect/testing/utilities.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@contextmanager
def prefect_test_harness(server_startup_timeout: Optional[int] = 30):
    """
    Temporarily run flows against a local SQLite database for testing.

    A temporary directory is created for the database, the database connection
    URL setting is pointed at it, a subprocess API server is started, and
    `PREFECT_API_URL` is pointed at that server for the duration of the context.
    On exit the log worker is drained and the server is stopped, even if the
    body of the `with` block raised.

    Args:
        server_startup_timeout: The maximum time to wait for the server to start.
            Defaults to 30 seconds. If set to `None`, the value of
            `PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS` will be used.

    Examples:
        >>> from prefect import flow
        >>> from prefect.testing.utilities import prefect_test_harness
        >>>
        >>>
        >>> @flow
        >>> def my_flow():
        >>>     return 'Done!'
        >>>
        >>> with prefect_test_harness():
        >>>     assert my_flow() == 'Done!' # run against temporary db
    """
    from prefect.server.database.dependencies import temporary_database_interface

    # create temp directory for the testing database
    temp_dir = mkdtemp()

    def cleanup_temp_dir(temp_dir):
        shutil.rmtree(temp_dir)

    # The directory is removed at interpreter exit rather than when the
    # context closes.
    atexit.register(cleanup_temp_dir, temp_dir)

    with ExitStack() as stack:
        # temporarily override any database interface components
        stack.enter_context(temporary_database_interface())

        db_url = "sqlite+aiosqlite:///" + str(Path(temp_dir) / "prefect-test.db")
        stack.enter_context(
            prefect.settings.temporary_settings(
                # Use a temporary directory for the database
                updates={
                    prefect.settings.PREFECT_API_DATABASE_CONNECTION_URL: db_url,
                },
            )
        )
        # start a subprocess server to test against
        test_server = SubprocessASGIServer()
        test_server.start(
            timeout=server_startup_timeout
            if server_startup_timeout is not None
            else prefect.settings.PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS.value()
        )
        # From here on the server is running, so it must be stopped on every
        # exit path, including an exception raised by the caller's `with` body.
        try:
            stack.enter_context(
                prefect.settings.temporary_settings(
                    # Point API clients at the subprocess test server
                    updates={
                        prefect.settings.PREFECT_API_URL: test_server.api_url,
                    },
                )
            )
            yield
        finally:
            try:
                # drain the logs before stopping the server to avoid connection errors on shutdown
                APILogWorker.instance().drain()
            finally:
                # Stop the server even if draining the logs fails.
                test_server.stop()