Skip to content

prefect.testing.utilities

Internal utilities for tests.

assert_does_not_warn(ignore_warnings=[])

Converts warnings to errors within this context to assert warnings are not raised, except for those specified in ignore_warnings.

Parameters: - ignore_warnings: List of warning types to ignore. Example: [DeprecationWarning, UserWarning]

Source code in src/prefect/testing/utilities.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@contextmanager
def assert_does_not_warn(ignore_warnings=[]):
    """
    Assert that the enclosed code emits no warnings.

    While the context is active every warning is escalated to an exception;
    any such exception escaping the body is re-raised as an `AssertionError`
    chained to the original warning. Categories listed in `ignore_warnings`
    are silenced instead of escalated.

    Parameters:
    - ignore_warnings: List of warning types to ignore. Example: [DeprecationWarning, UserWarning]
    """
    with warnings.catch_warnings():
        # Escalate everything first; filters added afterwards are inserted at
        # the front of the filter list, so the "ignore" entries win.
        warnings.simplefilter("error")
        for category in ignore_warnings:
            warnings.filterwarnings("ignore", category=category)

        try:
            yield
        except Warning as warning:
            failure_message = f"Warning was raised. {warning!r}"
            raise AssertionError(failure_message) from warning

exceptions_equal(a, b)

Exceptions cannot be compared reliably with ==. They can be compared with is, but that fails once an exception has been serialized and deserialized, so this utility does its best to assert equality by comparing the type and the args used to initialize each exception.

Source code in src/prefect/testing/utilities.py
31
32
33
34
35
36
37
38
39
def exceptions_equal(a, b):
    """
    Best-effort equality check for two exceptions.

    `==` on exceptions is not a usable comparison, and an `is` check breaks
    as soon as an exception has been through a serialize/deserialize round
    trip. This helper therefore treats two objects as equal when they compare
    equal directly, or when they share the same type and the same `args`
    (the values used to initialize the exception).
    """
    # Fast path: covers identical objects and types that define their own __eq__.
    if a == b:
        return True

    # Different concrete types are never considered equal.
    if type(a) != type(b):
        return False

    # Objects without an `args` attribute compare as if it were None.
    return getattr(a, "args", None) == getattr(b, "args", None)

prefect_test_harness()

Temporarily run flows against a local SQLite database for testing.

Examples:

>>> from prefect import flow
>>> from prefect.testing.utilities import prefect_test_harness
>>>
>>>
>>> @flow
>>> def my_flow():
>>>     return 'Done!'
>>>
>>> with prefect_test_harness():
>>>     assert my_flow() == 'Done!' # run against temporary db
Source code in src/prefect/testing/utilities.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@contextmanager
def prefect_test_harness():
    """
    Temporarily run flows against a local SQLite database for testing.

    A fresh temporary directory holds the database file, a subprocess server
    is started against it, and `PREFECT_API_URL` is pointed at that server for
    the duration of the context. On exit the logs are drained and the server
    is stopped, including when the body of the `with` block raises. The
    temporary directory is removed at interpreter exit.

    Examples:
        >>> from prefect import flow
        >>> from prefect.testing.utilities import prefect_test_harness
        >>>
        >>>
        >>> @flow
        >>> def my_flow():
        >>>     return 'Done!'
        >>>
        >>> with prefect_test_harness():
        >>>     assert my_flow() == 'Done!' # run against temporary db
    """
    from prefect.server.database.dependencies import temporary_database_interface

    # create temp directory for the testing database
    temp_dir = mkdtemp()

    def cleanup_temp_dir(temp_dir):
        shutil.rmtree(temp_dir)

    # The directory is removed at interpreter exit rather than on context exit.
    atexit.register(cleanup_temp_dir, temp_dir)

    with ExitStack() as stack:
        # temporarily override any database interface components
        stack.enter_context(temporary_database_interface())

        db_url = "sqlite+aiosqlite:///" + str(Path(temp_dir) / "prefect-test.db")
        stack.enter_context(
            prefect.settings.temporary_settings(
                # Use a temporary directory for the database
                updates={
                    prefect.settings.PREFECT_API_DATABASE_CONNECTION_URL: db_url,
                },
            )
        )
        # start a subprocess server to test against
        test_server = SubprocessASGIServer()
        test_server.start(timeout=30)
        try:
            stack.enter_context(
                prefect.settings.temporary_settings(
                    # Point the API URL at the subprocess server
                    updates={
                        prefect.settings.PREFECT_API_URL: test_server.api_url,
                    },
                )
            )
            yield
        finally:
            # An exception raised by the caller's `with` body is re-raised at
            # the `yield`; without this `finally` the server would be left
            # running. Teardown happens here, inside the ExitStack, so the
            # temporary settings are still active while it runs.
            try:
                # drain the logs before stopping the server to avoid
                # connection errors on shutdown
                APILogWorker.instance().drain()
            finally:
                # stop the server even if draining the logs fails
                test_server.stop()