Skip to content

prefect.server.database.configurations

AioSqliteConfiguration

Bases: BaseDatabaseConfiguration

Source code in src/prefect/server/database/configurations.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
class AioSqliteConfiguration(BaseDatabaseConfiguration):
    """
    Database configuration for SQLite.

    Builds and caches async SQLAlchemy engines, issues PRAGMA statements on every
    new connection, and controls how transactions are begun.
    """

    # oldest sqlite library version accepted by `engine`
    MIN_SQLITE_VERSION = (3, 24, 0)

    async def engine(self) -> AsyncEngine:
        """Retrieves an async SQLAlchemy engine.

        Engines are cached in `ENGINES`, keyed by the running event loop together
        with `self.connection_url`, `self.echo` and `self.timeout`. A new engine is
        only created (and scheduled for disposal) when no cached engine matches.

        Returns:
            AsyncEngine: a SQLAlchemy engine

        Raises:
            RuntimeError: if the sqlite library in use is older than
                `MIN_SQLITE_VERSION`
        """

        if sqlite3.sqlite_version_info < self.MIN_SQLITE_VERSION:
            required = ".".join(str(v) for v in self.MIN_SQLITE_VERSION)
            raise RuntimeError(
                f"Prefect requires sqlite >= {required} but we found version "
                f"{sqlite3.sqlite_version}"
            )

        loop = get_running_loop()

        cache_key = (
            loop,
            self.connection_url,
            self.echo,
            self.timeout,
        )
        if cache_key not in ENGINES:
            kwargs = {}

            # apply database timeout
            if self.timeout is not None:
                kwargs["connect_args"] = dict(timeout=self.timeout)

            # use `named` paramstyle for sqlite instead of `qmark` in very rare
            # circumstances, we've seen aiosqlite pass parameters in the wrong
            # order; by using named parameters we avoid this issue
            # see https://github.com/PrefectHQ/prefect/pull/6702
            kwargs["paramstyle"] = "named"

            # ensure a long-lasting pool is used with in-memory databases
            # because they disappear when the last connection closes
            if ":memory:" in self.connection_url:
                kwargs.update(poolclass=sa.pool.SingletonThreadPool)

            engine = create_async_engine(self.connection_url, echo=self.echo, **kwargs)
            sa.event.listen(engine.sync_engine, "connect", self.setup_sqlite)
            sa.event.listen(engine.sync_engine, "begin", self.begin_sqlite_stmt)

            if TRACKER.active:
                TRACKER.track_pool(engine.pool)

            ENGINES[cache_key] = engine
            await self.schedule_engine_disposal(cache_key)
        return ENGINES[cache_key]

    async def schedule_engine_disposal(self, cache_key):
        """
        Dispose of an engine once the event loop is closing.

        See caveats at `add_event_loop_shutdown_callback`.

        We attempted to lazily clean up old engines when new engines are created, but
        if the loop the engine is attached to is already closed then the connections
        cannot be cleaned up properly and warnings are displayed.

        Engine disposal should only be important when running the application
        ephemerally. Notably, this is an issue in our tests where many short-lived event
        loops and engines are created which can consume all of the available database
        connection slots. Users operating at a scale where connection limits are
        encountered should be encouraged to use a standalone server.

        Args:
            cache_key: the `ENGINES` key of the engine to dispose
        """

        async def dispose_engine(cache_key):
            # the engine may already have been removed; only dispose what is there
            engine = ENGINES.pop(cache_key, None)
            if engine:
                await engine.dispose()

        await add_event_loop_shutdown_callback(partial(dispose_engine, cache_key))

    def setup_sqlite(self, conn, record):
        """Issue PRAGMA statements to SQLite on connect. PRAGMAs only last for the
        duration of the connection. See https://www.sqlite.org/pragma.html for more info.

        Args:
            conn: the newly created DBAPI connection
            record: the pool record passed along by the "connect" event
        """
        # workaround sqlite transaction behavior
        self.begin_sqlite_conn(conn, record)

        cursor = conn.cursor()
        try:
            # write to a write-ahead-log instead and regularly commit the changes
            # this allows multiple concurrent readers even during a write transaction
            # even with the WAL we can get busy errors if we have transactions that:
            #  - t1 reads from a database
            #  - t2 inserts to the database
            #  - t1 tries to insert to the database
            # this can be resolved by using the IMMEDIATE transaction mode in t1
            cursor.execute("PRAGMA journal_mode = WAL;")

            # enable foreign keys
            cursor.execute("PRAGMA foreign_keys = ON;")

            # disable legacy alter table behavior as it will cause problems during
            # migrations when tables are renamed as references would otherwise be
            # retained in some locations
            # https://www.sqlite.org/pragma.html#pragma_legacy_alter_table
            cursor.execute("PRAGMA legacy_alter_table=OFF")

            # when using the WAL, we do not need to sync changes on every write.
            # sqlite recommends using 'normal' mode which is much faster
            cursor.execute("PRAGMA synchronous = NORMAL;")

            # a higher cache size (default of 2000) for more aggressive performance
            cursor.execute("PRAGMA cache_size = 20000;")

            # wait for this amount of time while a table is locked
            # before returning and raising an error
            # setting the value very high allows for more 'concurrency'
            # without running into errors, but may result in slow api calls
            if PREFECT_UNIT_TEST_MODE.value() is True:
                cursor.execute("PRAGMA busy_timeout = 5000;")  # 5s
            else:
                cursor.execute("PRAGMA busy_timeout = 60000;")  # 60s
        finally:
            # release the cursor even when one of the PRAGMAs fails
            cursor.close()

    def begin_sqlite_conn(self, conn, record):
        """Turn off the driver's implicit transaction handling for `conn`."""
        # disable pysqlite's emitting of the BEGIN statement entirely.
        # also stops it from emitting COMMIT before any DDL.
        # requires `begin_sqlite_stmt`
        # see https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#serializable-isolation-savepoints-transactional-ddl
        conn.isolation_level = None

    def begin_sqlite_stmt(self, conn):
        """Emit an explicit `BEGIN <mode>` when `SQLITE_BEGIN_MODE` is set."""
        # emit our own BEGIN
        # requires `begin_sqlite_conn`
        # see https://docs.sqlalchemy.org/en/20/dialects/sqlite.html#serializable-isolation-savepoints-transactional-ddl
        mode = SQLITE_BEGIN_MODE.get()
        if mode is not None:
            conn.exec_driver_sql(f"BEGIN {mode}")

        # Note this is intentionally a no-op if there is no BEGIN MODE set
        # This allows us to use SQLite's default behavior for reads which do not need
        # to be wrapped in a long-running transaction

    @asynccontextmanager
    async def begin_transaction(
        self, session: AsyncSession, with_for_update: bool = False
    ):
        """
        Enter a transaction for a session.

        While the transaction is open `SQLITE_BEGIN_MODE` is set to "IMMEDIATE"
        when `with_for_update` is true and to "DEFERRED" otherwise; the previous
        value is restored on exit.

        Args:
            session: the session to begin a transaction on
            with_for_update: whether the transaction should begin in IMMEDIATE mode
        """
        token = SQLITE_BEGIN_MODE.set("IMMEDIATE" if with_for_update else "DEFERRED")

        try:
            async with session.begin() as transaction:
                yield transaction
        finally:
            SQLITE_BEGIN_MODE.reset(token)

    async def session(self, engine: AsyncEngine) -> AsyncSession:
        """
        Retrieves a SQLAlchemy session for an engine.

        Args:
            engine: a sqlalchemy engine
        """
        return AsyncSession(engine, expire_on_commit=False)

    async def create_db(self, connection, base_metadata):
        """Create the database by running `base_metadata.create_all` on `connection`"""

        await connection.run_sync(base_metadata.create_all)

    async def drop_db(self, connection, base_metadata):
        """Drop the database by running `base_metadata.drop_all` on `connection`"""

        await connection.run_sync(base_metadata.drop_all)

    def is_inmemory(self):
        """Returns true if database is run in memory"""

        return ":memory:" in self.connection_url or "mode=memory" in self.connection_url

create_db(connection, base_metadata) async

Create the database

Source code in src/prefect/server/database/configurations.py
466
467
468
469
async def create_db(self, connection, base_metadata):
    """Create the database by running `base_metadata.create_all` via `connection`."""

    build_schema = base_metadata.create_all
    await connection.run_sync(build_schema)

drop_db(connection, base_metadata) async

Drop the database

Source code in src/prefect/server/database/configurations.py
471
472
473
474
async def drop_db(self, connection, base_metadata):
    """Drop the database by running `base_metadata.drop_all` via `connection`."""

    tear_down_schema = base_metadata.drop_all
    await connection.run_sync(tear_down_schema)

engine() async

Retrieves an async SQLAlchemy engine.

Parameters:

Name Type Description Default
connection_url str

The database connection string. Defaults to self.connection_url

required
echo bool

Whether to echo SQL sent to the database. Defaults to self.echo

required
timeout float

The database statement timeout, in seconds. Defaults to self.timeout

required

Returns:

Name Type Description
AsyncEngine AsyncEngine

a SQLAlchemy engine

Source code in src/prefect/server/database/configurations.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
async def engine(self) -> AsyncEngine:
    """Retrieves an async SQLAlchemy engine.

    The engine is looked up in `ENGINES` under a key made of the running event
    loop, `self.connection_url`, `self.echo` and `self.timeout`; it is created,
    cached and scheduled for disposal only when that key is not present yet.

    Returns:
        AsyncEngine: a SQLAlchemy engine

    Raises:
        RuntimeError: if the sqlite library in use is older than
            `MIN_SQLITE_VERSION`
    """

    minimum_version = self.MIN_SQLITE_VERSION
    if sqlite3.sqlite_version_info < minimum_version:
        required = ".".join(map(str, minimum_version))
        raise RuntimeError(
            f"Prefect requires sqlite >= {required} but we found version "
            f"{sqlite3.sqlite_version}"
        )

    cache_key = (get_running_loop(), self.connection_url, self.echo, self.timeout)

    # fast path: an engine for this loop and configuration already exists
    if cache_key in ENGINES:
        return ENGINES[cache_key]

    # use `named` paramstyle for sqlite instead of `qmark` in very rare
    # circumstances, we've seen aiosqlite pass parameters in the wrong
    # order; by using named parameters we avoid this issue
    # see https://github.com/PrefectHQ/prefect/pull/6702
    engine_kwargs = {"paramstyle": "named"}

    # apply database timeout
    if self.timeout is not None:
        engine_kwargs["connect_args"] = {"timeout": self.timeout}

    # ensure a long-lasting pool is used with in-memory databases
    # because they disappear when the last connection closes
    if ":memory:" in self.connection_url:
        engine_kwargs["poolclass"] = sa.pool.SingletonThreadPool

    new_engine = create_async_engine(
        self.connection_url, echo=self.echo, **engine_kwargs
    )
    sync_engine = new_engine.sync_engine
    sa.event.listen(sync_engine, "connect", self.setup_sqlite)
    sa.event.listen(sync_engine, "begin", self.begin_sqlite_stmt)

    if TRACKER.active:
        TRACKER.track_pool(new_engine.pool)

    ENGINES[cache_key] = new_engine
    await self.schedule_engine_disposal(cache_key)
    return ENGINES[cache_key]

is_inmemory()

Returns true if database is run in memory

Source code in src/prefect/server/database/configurations.py
476
477
478
479
def is_inmemory(self):
    """Returns true if the connection URL refers to an in-memory database"""

    url = self.connection_url
    return any(marker in url for marker in (":memory:", "mode=memory"))

schedule_engine_disposal(cache_key) async

Dispose of an engine once the event loop is closing.

See caveats at add_event_loop_shutdown_callback.

We attempted to lazily clean up old engines when new engines are created, but if the loop the engine is attached to is already closed then the connections cannot be cleaned up properly and warnings are displayed.

Engine disposal should only be important when running the application ephemerally. Notably, this is an issue in our tests where many short-lived event loops and engines are created which can consume all of the available database connection slots. Users operating at a scale where connection limits are encountered should be encouraged to use a standalone server.

Source code in src/prefect/server/database/configurations.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
async def schedule_engine_disposal(self, cache_key):
    """
    Register a shutdown callback that disposes of the engine cached under
    `cache_key` once the event loop is closing.

    See caveats at `add_event_loop_shutdown_callback`.

    Lazily cleaning up old engines whenever a new one is created was tried, but an
    engine whose loop is already closed cannot release its connections properly and
    warnings are displayed.

    Disposal mostly matters when the application runs ephemerally. In particular,
    our tests create many short-lived event loops and engines, which can consume
    all of the available database connection slots. Users operating at a scale
    where connection limits are encountered should be encouraged to use a
    standalone server.
    """

    async def _dispose_cached_engine(key):
        # nothing to do when the engine has already been removed from the cache
        cached = ENGINES.pop(key, None)
        if not cached:
            return
        await cached.dispose()

    shutdown_hook = partial(_dispose_cached_engine, cache_key)
    await add_event_loop_shutdown_callback(shutdown_hook)

session(engine) async

Retrieves a SQLAlchemy session for an engine.

Parameters:

Name Type Description Default
engine AsyncEngine

a sqlalchemy engine

required
Source code in src/prefect/server/database/configurations.py
457
458
459
460
461
462
463
464
async def session(self, engine: AsyncEngine) -> AsyncSession:
    """
    Retrieves a SQLAlchemy session for an engine.

    The session is created with `expire_on_commit=False`.

    Args:
        engine: a sqlalchemy engine
    """
    session_options = {"expire_on_commit": False}
    return AsyncSession(engine, **session_options)

setup_sqlite(conn, record)

Issue PRAGMA statements to SQLite on connect. PRAGMAs only last for the duration of the connection. See https://www.sqlite.org/pragma.html for more info.

Source code in src/prefect/server/database/configurations.py
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
def setup_sqlite(self, conn, record):
    """Issue PRAGMA statements to SQLite on connect. PRAGMAs only last for the
    duration of the connection. See https://www.sqlite.org/pragma.html for more info.

    Args:
        conn: the newly created DBAPI connection
        record: the pool record passed along by the "connect" event
    """
    # workaround sqlite transaction behavior
    self.begin_sqlite_conn(conn, record)

    cursor = conn.cursor()
    try:
        # write to a write-ahead-log instead and regularly commit the changes
        # this allows multiple concurrent readers even during a write transaction
        # even with the WAL we can get busy errors if we have transactions that:
        #  - t1 reads from a database
        #  - t2 inserts to the database
        #  - t1 tries to insert to the database
        # this can be resolved by using the IMMEDIATE transaction mode in t1
        cursor.execute("PRAGMA journal_mode = WAL;")

        # enable foreign keys
        cursor.execute("PRAGMA foreign_keys = ON;")

        # disable legacy alter table behavior as it will cause problems during
        # migrations when tables are renamed as references would otherwise be
        # retained in some locations
        # https://www.sqlite.org/pragma.html#pragma_legacy_alter_table
        cursor.execute("PRAGMA legacy_alter_table=OFF")

        # when using the WAL, we do not need to sync changes on every write.
        # sqlite recommends using 'normal' mode which is much faster
        cursor.execute("PRAGMA synchronous = NORMAL;")

        # a higher cache size (default of 2000) for more aggressive performance
        cursor.execute("PRAGMA cache_size = 20000;")

        # wait for this amount of time while a table is locked
        # before returning and raising an error
        # setting the value very high allows for more 'concurrency'
        # without running into errors, but may result in slow api calls
        if PREFECT_UNIT_TEST_MODE.value() is True:
            cursor.execute("PRAGMA busy_timeout = 5000;")  # 5s
        else:
            cursor.execute("PRAGMA busy_timeout = 60000;")  # 60s
    finally:
        # release the cursor even when one of the PRAGMAs fails
        cursor.close()

AsyncPostgresConfiguration

Bases: BaseDatabaseConfiguration

Source code in src/prefect/server/database/configurations.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
class AsyncPostgresConfiguration(BaseDatabaseConfiguration):
    """
    Database configuration for Postgres.

    Builds and caches async SQLAlchemy engines and provides sessions and
    transactions for them.
    """

    async def engine(self) -> AsyncEngine:
        """Retrieves an async SQLAlchemy engine.

        Engines are cached in `ENGINES`. The cache key contains the running event
        loop and every configuration value that shapes the engine, so two
        configurations that differ in any of them never share an engine.

        Returns:
            AsyncEngine: a SQLAlchemy engine
        """

        loop = get_running_loop()

        # include all engine-affecting settings; leaving out the connection
        # timeout or pool sizing would hand back an engine built with different
        # values than the ones configured on this instance
        cache_key = (
            loop,
            self.connection_url,
            self.echo,
            self.timeout,
            self.connection_timeout,
            self.sqlalchemy_pool_size,
            self.sqlalchemy_max_overflow,
        )
        if cache_key not in ENGINES:
            kwargs = {}
            connect_args = {}

            # apply database timeout
            if self.timeout is not None:
                connect_args["command_timeout"] = self.timeout

            if self.connection_timeout is not None:
                connect_args["timeout"] = self.connection_timeout

            # NOTE(review): `jit` is only switched off when at least one timeout
            # is configured, because this branch is skipped for empty
            # `connect_args` -- confirm that this coupling is intended
            if connect_args:
                connect_args["server_settings"] = {"jit": "off"}
                kwargs["connect_args"] = connect_args

            if self.sqlalchemy_pool_size is not None:
                kwargs["pool_size"] = self.sqlalchemy_pool_size

            if self.sqlalchemy_max_overflow is not None:
                kwargs["max_overflow"] = self.sqlalchemy_max_overflow

            engine = create_async_engine(
                self.connection_url,
                echo=self.echo,
                # "pre-ping" connections upon checkout to ensure they have not been
                # closed on the server side
                pool_pre_ping=True,
                # Use connections in LIFO order to help reduce connections
                # after spiky load and in general increase the likelihood
                # that a given connection pulled from the pool will be
                # usable.
                pool_use_lifo=True,
                **kwargs,
            )

            if TRACKER.active:
                TRACKER.track_pool(engine.pool)

            ENGINES[cache_key] = engine
            await self.schedule_engine_disposal(cache_key)
        return ENGINES[cache_key]

    async def schedule_engine_disposal(self, cache_key):
        """
        Dispose of an engine once the event loop is closing.

        See caveats at `add_event_loop_shutdown_callback`.

        We attempted to lazily clean up old engines when new engines are created, but
        if the loop the engine is attached to is already closed then the connections
        cannot be cleaned up properly and warnings are displayed.

        Engine disposal should only be important when running the application
        ephemerally. Notably, this is an issue in our tests where many short-lived event
        loops and engines are created which can consume all of the available database
        connection slots. Users operating at a scale where connection limits are
        encountered should be encouraged to use a standalone server.

        Args:
            cache_key: the `ENGINES` key of the engine to dispose
        """

        async def dispose_engine(cache_key):
            # the engine may already have been removed; only dispose what is there
            engine = ENGINES.pop(cache_key, None)
            if engine:
                await engine.dispose()

        await add_event_loop_shutdown_callback(partial(dispose_engine, cache_key))

    async def session(self, engine: AsyncEngine) -> AsyncSession:
        """
        Retrieves a SQLAlchemy session for an engine.

        Args:
            engine: a sqlalchemy engine
        """
        return AsyncSession(engine, expire_on_commit=False)

    @asynccontextmanager
    async def begin_transaction(
        self, session: AsyncSession, with_for_update: bool = False
    ):
        """
        Enter a transaction for a session.

        Args:
            session: the session to begin a transaction on
            with_for_update: accepted for interface compatibility and ignored here
        """
        # `with_for_update` is for SQLite only. For Postgres, lock the row on read
        # for update instead.
        async with session.begin() as transaction:
            yield transaction

    async def create_db(self, connection, base_metadata):
        """Create the database by running `base_metadata.create_all` on `connection`"""

        await connection.run_sync(base_metadata.create_all)

    async def drop_db(self, connection, base_metadata):
        """Drop the database by running `base_metadata.drop_all` on `connection`"""

        await connection.run_sync(base_metadata.drop_all)

    def is_inmemory(self) -> Literal[False]:
        """Always returns False; this configuration never reports an in-memory database"""

        return False

create_db(connection, base_metadata) async

Create the database

Source code in src/prefect/server/database/configurations.py
279
280
281
282
async def create_db(self, connection, base_metadata):
    """Create the database by running `base_metadata.create_all` via `connection`."""

    build_schema = base_metadata.create_all
    await connection.run_sync(build_schema)

drop_db(connection, base_metadata) async

Drop the database

Source code in src/prefect/server/database/configurations.py
284
285
286
287
async def drop_db(self, connection, base_metadata):
    """Drop the database by running `base_metadata.drop_all` via `connection`."""

    tear_down_schema = base_metadata.drop_all
    await connection.run_sync(tear_down_schema)

engine() async

Retrieves an async SQLAlchemy engine.

Parameters:

Name Type Description Default
connection_url str

The database connection string. Defaults to self.connection_url

required
echo bool

Whether to echo SQL sent to the database. Defaults to self.echo

required
timeout float

The database statement timeout, in seconds. Defaults to self.timeout

required

Returns:

Name Type Description
AsyncEngine AsyncEngine

a SQLAlchemy engine

Source code in src/prefect/server/database/configurations.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
async def engine(self) -> AsyncEngine:
    """Retrieves an async SQLAlchemy engine.

    Engines are cached in `ENGINES`. The cache key contains the running event
    loop and every configuration value that shapes the engine, so two
    configurations that differ in any of them never share an engine.

    Returns:
        AsyncEngine: a SQLAlchemy engine
    """

    loop = get_running_loop()

    # include all engine-affecting settings; leaving out the connection
    # timeout or pool sizing would hand back an engine built with different
    # values than the ones configured on this instance
    cache_key = (
        loop,
        self.connection_url,
        self.echo,
        self.timeout,
        self.connection_timeout,
        self.sqlalchemy_pool_size,
        self.sqlalchemy_max_overflow,
    )
    if cache_key not in ENGINES:
        kwargs = {}
        connect_args = {}

        # apply database timeout
        if self.timeout is not None:
            connect_args["command_timeout"] = self.timeout

        if self.connection_timeout is not None:
            connect_args["timeout"] = self.connection_timeout

        # NOTE(review): `jit` is only switched off when at least one timeout
        # is configured, because this branch is skipped for empty
        # `connect_args` -- confirm that this coupling is intended
        if connect_args:
            connect_args["server_settings"] = {"jit": "off"}
            kwargs["connect_args"] = connect_args

        if self.sqlalchemy_pool_size is not None:
            kwargs["pool_size"] = self.sqlalchemy_pool_size

        if self.sqlalchemy_max_overflow is not None:
            kwargs["max_overflow"] = self.sqlalchemy_max_overflow

        engine = create_async_engine(
            self.connection_url,
            echo=self.echo,
            # "pre-ping" connections upon checkout to ensure they have not been
            # closed on the server side
            pool_pre_ping=True,
            # Use connections in LIFO order to help reduce connections
            # after spiky load and in general increase the likelihood
            # that a given connection pulled from the pool will be
            # usable.
            pool_use_lifo=True,
            **kwargs,
        )

        if TRACKER.active:
            TRACKER.track_pool(engine.pool)

        ENGINES[cache_key] = engine
        await self.schedule_engine_disposal(cache_key)
    return ENGINES[cache_key]

is_inmemory()

Reports whether the database is run in memory; for Postgres this is always False.

Source code in src/prefect/server/database/configurations.py
289
290
291
292
def is_inmemory(self) -> Literal[False]:
    """Always returns False; this configuration never reports an in-memory database"""

    # constant answer: nothing on the instance is consulted
    return False

schedule_engine_disposal(cache_key) async

Dispose of an engine once the event loop is closing.

See caveats at add_event_loop_shutdown_callback.

We attempted to lazily clean up old engines when new engines are created, but if the loop the engine is attached to is already closed then the connections cannot be cleaned up properly and warnings are displayed.

Engine disposal should only be important when running the application ephemerally. Notably, this is an issue in our tests where many short-lived event loops and engines are created which can consume all of the available database connection slots. Users operating at a scale where connection limits are encountered should be encouraged to use a standalone server.

Source code in src/prefect/server/database/configurations.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
async def schedule_engine_disposal(self, cache_key):
    """
    Register a shutdown callback that disposes of the engine cached under
    `cache_key` once the event loop is closing.

    See caveats at `add_event_loop_shutdown_callback`.

    Lazily cleaning up old engines whenever a new one is created was tried, but an
    engine whose loop is already closed cannot release its connections properly and
    warnings are displayed.

    Disposal mostly matters when the application runs ephemerally. In particular,
    our tests create many short-lived event loops and engines, which can consume
    all of the available database connection slots. Users operating at a scale
    where connection limits are encountered should be encouraged to use a
    standalone server.
    """

    async def _dispose_cached_engine(key):
        # nothing to do when the engine has already been removed from the cache
        cached = ENGINES.pop(key, None)
        if not cached:
            return
        await cached.dispose()

    shutdown_hook = partial(_dispose_cached_engine, cache_key)
    await add_event_loop_shutdown_callback(shutdown_hook)

session(engine) async

Retrieves a SQLAlchemy session for an engine.

Parameters:

Name Type Description Default
engine AsyncEngine

a sqlalchemy engine

required
Source code in src/prefect/server/database/configurations.py
261
262
263
264
265
266
267
268
async def session(self, engine: AsyncEngine) -> AsyncSession:
    """
    Retrieves a SQLAlchemy session for an engine.

    The session is created with `expire_on_commit=False`.

    Args:
        engine: a sqlalchemy engine
    """
    session_options = {"expire_on_commit": False}
    return AsyncSession(engine, **session_options)

BaseDatabaseConfiguration

Bases: ABC

Abstract base class used to inject database connection configuration into Prefect.

This configuration is responsible for defining how Prefect REST API creates and manages database connections and sessions.

Source code in src/prefect/server/database/configurations.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class BaseDatabaseConfiguration(ABC):
    """
    Abstract base class used to inject database connection configuration into Prefect.

    This configuration is responsible for defining how Prefect REST API creates and manages
    database connections and sessions.
    """

    def __init__(
        self,
        connection_url: str,
        echo: Optional[bool] = None,
        timeout: Optional[float] = None,
        connection_timeout: Optional[float] = None,
        sqlalchemy_pool_size: Optional[int] = None,
        sqlalchemy_max_overflow: Optional[int] = None,
    ):
        """
        Store the connection settings, falling back to the Prefect settings for
        every value that is not given.

        Args:
            connection_url: the database connection string
            echo: whether to echo SQL sent to the database; `None` falls back to
                `PREFECT_API_DATABASE_ECHO`
            timeout: the database statement timeout; a falsy value falls back to
                `PREFECT_API_DATABASE_TIMEOUT`
            connection_timeout: the connection timeout; a falsy value falls back
                to `PREFECT_API_DATABASE_CONNECTION_TIMEOUT`
            sqlalchemy_pool_size: the engine pool size; `None` falls back to
                `PREFECT_SQLALCHEMY_POOL_SIZE`
            sqlalchemy_max_overflow: the engine pool overflow; `None` falls back
                to `PREFECT_SQLALCHEMY_MAX_OVERFLOW`
        """
        self.connection_url = connection_url

        # only `None` means "not provided": an explicit `echo=False` must be able
        # to override a setting that is switched on
        self.echo = echo if echo is not None else PREFECT_API_DATABASE_ECHO.value()

        # NOTE(review): the timeouts keep their truthiness fallback, so an
        # explicit 0 still resolves to the setting -- confirm whether a zero
        # timeout should ever be honoured before changing this
        self.timeout = timeout or PREFECT_API_DATABASE_TIMEOUT.value()
        self.connection_timeout = (
            connection_timeout or PREFECT_API_DATABASE_CONNECTION_TIMEOUT.value()
        )

        # 0 is a legitimate pool size / overflow for SQLAlchemy, so it must not
        # be treated as "unset"
        self.sqlalchemy_pool_size = (
            sqlalchemy_pool_size
            if sqlalchemy_pool_size is not None
            else PREFECT_SQLALCHEMY_POOL_SIZE.value()
        )
        self.sqlalchemy_max_overflow = (
            sqlalchemy_max_overflow
            if sqlalchemy_max_overflow is not None
            else PREFECT_SQLALCHEMY_MAX_OVERFLOW.value()
        )

    def _unique_key(self) -> Tuple[Hashable, ...]:
        """
        Returns a key used to determine whether to instantiate a new DB interface.

        The key is the concrete configuration class paired with the connection URL.
        """
        return (self.__class__, self.connection_url)

    @abstractmethod
    async def engine(self) -> AsyncEngine:
        """Returns a SqlAlchemy engine"""

    @abstractmethod
    async def session(self, engine: AsyncEngine) -> AsyncSession:
        """
        Retrieves a SQLAlchemy session for an engine.
        """

    @abstractmethod
    async def create_db(self, connection, base_metadata):
        """Create the database"""

    @abstractmethod
    async def drop_db(self, connection, base_metadata):
        """Drop the database"""

    @abstractmethod
    def is_inmemory(self) -> bool:
        """Returns true if database is run in memory"""

    @abstractmethod
    async def begin_transaction(
        self, session: AsyncSession, with_for_update: bool = False
    ):
        """Enter a transaction for a session"""
        pass

begin_transaction(session, with_for_update=False) abstractmethod async

Enter a transaction for a session

Source code in src/prefect/server/database/configurations.py
163
164
165
166
167
168
@abstractmethod
async def begin_transaction(
    self, session: AsyncSession, with_for_update: bool = False
):
    """
    Enter a transaction for a session.

    Abstract: no behavior is provided here.

    Args:
        session: the session to begin a transaction on
        with_for_update: flag forwarded by callers; its meaning is defined by
            the implementing class
    """

create_db(connection, base_metadata) abstractmethod async

Create the database

Source code in src/prefect/server/database/configurations.py
151
152
153
@abstractmethod
async def create_db(self, connection, base_metadata):
    """Create the database

    Abstract: no behavior is provided here.

    Args:
        connection: the connection to operate on
        base_metadata: the metadata describing what to create
    """

drop_db(connection, base_metadata) abstractmethod async

Drop the database

Source code in src/prefect/server/database/configurations.py
155
156
157
@abstractmethod
async def drop_db(self, connection, base_metadata):
    """Drop the database

    Abstract: no behavior is provided here.

    Args:
        connection: the connection to operate on
        base_metadata: the metadata describing what to drop
    """

engine() abstractmethod async

Returns a SqlAlchemy engine

Source code in src/prefect/server/database/configurations.py
141
142
143
@abstractmethod
async def engine(self) -> AsyncEngine:
    """Returns a SqlAlchemy engine

    Abstract: no behavior is provided here.
    """

is_inmemory() abstractmethod

Returns true if database is run in memory

Source code in src/prefect/server/database/configurations.py
159
160
161
@abstractmethod
def is_inmemory(self) -> bool:
    """Returns true if database is run in memory

    Abstract: no behavior is provided here.
    """

session(engine) abstractmethod async

Retrieves a SQLAlchemy session for an engine.

Source code in src/prefect/server/database/configurations.py
145
146
147
148
149
@abstractmethod
async def session(self, engine: AsyncEngine) -> AsyncSession:
    """
    Retrieves a SQLAlchemy session for an engine.

    Abstract: no behavior is provided here.

    Args:
        engine: a sqlalchemy engine
    """

ConnectionTracker

A test utility which tracks the connections given out by a connection pool, to make it easy to see which connections are currently checked out and open.

Source code in src/prefect/server/database/configurations.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class ConnectionTracker:
    """A test utility which tracks the connections given out by a connection pool, to
    make it easy to see which connections are currently checked out and open."""

    # each mapping stores, per connection, the result of
    # `traceback.format_stack()` (a list of strings) captured when the event fired
    all_connections: Dict[AdaptedConnection, list]
    open_connections: Dict[AdaptedConnection, list]
    # closes reported for connections that were not tracked as open
    left_field_closes: Dict[AdaptedConnection, list]
    connects: int
    closes: int
    # read by the engine factories to decide whether pools get tracked
    active: bool

    def __init__(self) -> None:
        self.active = False
        self.all_connections = {}
        self.open_connections = {}
        self.left_field_closes = {}
        self.connects = 0
        self.closes = 0

    def track_pool(self, pool: sa.pool.Pool):
        """Subscribe to the connect and close events of `pool`."""
        sa.event.listen(pool, "connect", self.on_connect)
        sa.event.listen(pool, "close", self.on_close)
        sa.event.listen(pool, "close_detached", self.on_close_detached)

    def on_connect(
        self,
        adapted_connection: AdaptedConnection,
        connection_record: ConnectionPoolEntry,
    ):
        """Record a new connection together with the stack that opened it."""
        # format the stack once; the copy keeps the two mappings independent
        stack = traceback.format_stack()
        self.all_connections[adapted_connection] = stack
        self.open_connections[adapted_connection] = list(stack)
        self.connects += 1

    def on_close(
        self,
        adapted_connection: AdaptedConnection,
        connection_record: ConnectionPoolEntry,
    ):
        """Record that a pooled connection was closed."""
        self._record_close(adapted_connection)

    def on_close_detached(
        self,
        adapted_connection: AdaptedConnection,
    ):
        """Record that a detached connection was closed."""
        self._record_close(adapted_connection)

    def _record_close(self, adapted_connection: AdaptedConnection) -> None:
        """Shared bookkeeping for both close events."""
        try:
            del self.open_connections[adapted_connection]
        except KeyError:
            # the connection was not tracked as open; remember where the
            # unexpected close came from
            self.left_field_closes[adapted_connection] = traceback.format_stack()
        self.closes += 1

    def clear(self):
        """Forget every tracked connection and reset the counters."""
        self.all_connections.clear()
        self.open_connections.clear()
        self.left_field_closes.clear()
        self.connects = 0
        self.closes = 0