Skip to content

prefect.server.database.interface

DBSingleton

Bases: type

Ensures that only one database interface is created per unique key

Source code in src/prefect/server/database/interface.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class DBSingleton(type):
    """
    Metaclass that hands out a single database interface per unique key.

    The key combines the class name with the `_unique_key()` of the
    `database_config`, `query_components`, and `orm` keyword arguments.
    """

    # Cache of created instances. It is defined on the metaclass, so every
    # class using this metaclass shares the same mapping.
    _instances = dict()

    def __call__(cls, *args, **kwargs):
        """
        Return the cached instance for these arguments, creating it on first use.

        The three components are looked up in `kwargs`, so they must be passed by
        keyword; a missing one raises `KeyError`.
        """
        name_part = (cls.__name__,)
        component_part = tuple(
            kwargs[component]._unique_key()
            for component in ("database_config", "query_components", "orm")
        )
        cache_key = name_part + component_part

        registry = cls._instances
        if cache_key in registry:
            return registry[cache_key]

        created = super().__call__(*args, **kwargs)
        registry[cache_key] = created
        return registry[cache_key]

PrefectDBInterface

An interface for backend-specific SqlAlchemy actions and ORM models.

The REST API can be configured to run against different databases in order to maintain performance at different scales. This interface integrates database- and dialect-specific configuration into a unified interface that the orchestration engine runs against.

Source code in src/prefect/server/database/interface.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
class PrefectDBInterface(metaclass=DBSingleton):
    """
    A single facade over backend-specific SqlAlchemy actions and the ORM models.

    The REST API can be configured to run against different databases in order to
    maintain performance at different scales. Database- and dialect-specific
    configuration is gathered here behind one interface that the orchestration
    engine runs against.

    Instances are constructed through the `DBSingleton` metaclass.
    """

    def __init__(
        self,
        database_config: BaseDatabaseConfiguration,
        query_components: BaseQueryComponents,
        orm: orm_models.BaseORMConfiguration,
    ):
        """
        Keep references to the three backend-specific components.

        Args:
            database_config: stored as `self.database_config`; supplies the
                engine, sessions, and transactions.
            query_components: stored as `self.queries`; the query helpers below
                delegate to it.
            orm: stored as `self.orm`; supplies the `*_unique_upsert_columns`.
        """
        self.orm = orm
        self.queries = query_components
        self.database_config = database_config

    async def create_db(self):
        """Create the database by awaiting `run_migrations_upgrade`."""
        upgrade = self.run_migrations_upgrade
        await upgrade()

    async def drop_db(self):
        """Drop the database by downgrading migrations to the "base" revision."""
        downgrade = self.run_migrations_downgrade
        await downgrade(revision="base")

    async def run_migrations_upgrade(self):
        """Run the upgrade migrations (`alembic_upgrade`) in a worker thread."""
        await run_sync_in_worker_thread(alembic_upgrade)

    async def run_migrations_downgrade(self, revision: str = "-1"):
        """
        Run downgrade migrations (`alembic_downgrade`) in a worker thread.

        Args:
            revision: the revision handed to `alembic_downgrade`. Defaults to
                "-1"; `drop_db` passes "base".
        """
        await run_sync_in_worker_thread(alembic_downgrade, revision=revision)

    async def is_db_connectable(self):
        """
        Report whether a connection to the database can be opened.

        This method is used to determine if the server is ready to accept
        requests. Any `Exception` raised while opening or closing the connection
        yields False; errors from obtaining the engine itself are not caught.
        """
        db_engine = await self.engine()
        try:
            async with db_engine.connect():
                pass
        except Exception:
            return False
        else:
            return True

    async def engine(self) -> AsyncEngine:
        """
        Provides the SqlAlchemy engine supplied by the database configuration.
        """
        return await self.database_config.engine()

    async def session(self):
        """
        Provides a SQLAlchemy session created by the database configuration
        for this interface's engine.
        """
        bound_engine = await self.engine()
        new_session = await self.database_config.session(bound_engine)
        return new_session

    @asynccontextmanager
    async def session_context(
        self, begin_transaction: bool = False, with_for_update: bool = False
    ):
        """
        Provides a SQLAlchemy session inside a context manager that enters the
        session on entry and exits it on exit.

        Args:
            begin_transaction: if True, the context manager will begin a SQL transaction.
                Exiting the context manager will COMMIT or ROLLBACK any changes.
            with_for_update: forwarded to `database_config.begin_transaction`;
                only consulted when `begin_transaction` is True.
        """
        db_session = await self.session()
        async with db_session:
            if not begin_transaction:
                yield db_session
            else:
                transaction = self.database_config.begin_transaction(
                    db_session, with_for_update=with_for_update
                )
                async with transaction:
                    yield db_session

    @property
    def dialect(self) -> sa.engine.Dialect:
        """The dialect resolved from the configured connection URL."""
        connection_url = self.database_config.connection_url
        return get_dialect(connection_url)

    @property
    def Base(self):
        """The base class shared by the orm models"""
        return orm_models.Base

    @property
    def Flow(self):
        """The `Flow` orm model"""
        return orm_models.Flow

    @property
    def FlowRun(self):
        """The `FlowRun` orm model"""
        return orm_models.FlowRun

    @property
    def FlowRunState(self):
        """The `FlowRunState` orm model"""
        return orm_models.FlowRunState

    @property
    def TaskRun(self):
        """The `TaskRun` orm model"""
        return orm_models.TaskRun

    @property
    def TaskRunState(self):
        """The `TaskRunState` orm model"""
        return orm_models.TaskRunState

    @property
    def Artifact(self):
        """The `Artifact` orm model"""
        return orm_models.Artifact

    @property
    def ArtifactCollection(self):
        """The `ArtifactCollection` orm model"""
        return orm_models.ArtifactCollection

    @property
    def TaskRunStateCache(self):
        """The `TaskRunStateCache` orm model"""
        return orm_models.TaskRunStateCache

    @property
    def Deployment(self):
        """The `Deployment` orm model"""
        return orm_models.Deployment

    @property
    def DeploymentSchedule(self):
        """The `DeploymentSchedule` orm model"""
        return orm_models.DeploymentSchedule

    @property
    def SavedSearch(self):
        """The `SavedSearch` orm model"""
        return orm_models.SavedSearch

    @property
    def WorkPool(self):
        """The `WorkPool` orm model"""
        return orm_models.WorkPool

    @property
    def Worker(self):
        """The `Worker` (worker process) orm model"""
        return orm_models.Worker

    @property
    def Log(self):
        """The `Log` orm model"""
        return orm_models.Log

    @property
    def ConcurrencyLimit(self):
        """The `ConcurrencyLimit` orm model"""
        return orm_models.ConcurrencyLimit

    @property
    def ConcurrencyLimitV2(self):
        """The `ConcurrencyLimitV2` (v2 concurrency) orm model"""
        return orm_models.ConcurrencyLimitV2

    @property
    def CsrfToken(self):
        """The `CsrfToken` orm model"""
        return orm_models.CsrfToken

    @property
    def WorkQueue(self):
        """The `WorkQueue` orm model"""
        return orm_models.WorkQueue

    @property
    def Agent(self):
        """The `Agent` orm model"""
        return orm_models.Agent

    @property
    def BlockType(self):
        """The `BlockType` orm model"""
        return orm_models.BlockType

    @property
    def BlockSchema(self):
        """The `BlockSchema` orm model"""
        return orm_models.BlockSchema

    @property
    def BlockSchemaReference(self):
        """The `BlockSchemaReference` orm model"""
        return orm_models.BlockSchemaReference

    @property
    def BlockDocument(self):
        """The `BlockDocument` orm model"""
        return orm_models.BlockDocument

    @property
    def BlockDocumentReference(self):
        """The `BlockDocumentReference` orm model"""
        return orm_models.BlockDocumentReference

    @property
    def FlowRunNotificationPolicy(self):
        """The `FlowRunNotificationPolicy` orm model"""
        return orm_models.FlowRunNotificationPolicy

    @property
    def FlowRunNotificationQueue(self):
        """The `FlowRunNotificationQueue` orm model"""
        return orm_models.FlowRunNotificationQueue

    @property
    def Configuration(self):
        """The `Configuration` orm model"""
        return orm_models.Configuration

    @property
    def Variable(self):
        """The `Variable` orm model"""
        return orm_models.Variable

    @property
    def FlowRunInput(self):
        """The `FlowRunInput` orm model"""
        return orm_models.FlowRunInput

    @property
    def Automation(self):
        """The `Automation` orm model"""
        return orm_models.Automation

    @property
    def AutomationBucket(self):
        """The `AutomationBucket` orm model"""
        return orm_models.AutomationBucket

    @property
    def AutomationRelatedResource(self):
        """The `AutomationRelatedResource` orm model"""
        return orm_models.AutomationRelatedResource

    @property
    def CompositeTriggerChildFiring(self):
        """The orm model capturing a composite trigger's child firing"""
        return orm_models.CompositeTriggerChildFiring

    @property
    def AutomationEventFollower(self):
        """The orm model capturing one event following another event"""
        return orm_models.AutomationEventFollower

    @property
    def Event(self):
        """The `Event` orm model"""
        return orm_models.Event

    @property
    def EventResource(self):
        """The `EventResource` orm model"""
        return orm_models.EventResource

    @property
    def deployment_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a Deployment"""
        return self.orm.deployment_unique_upsert_columns

    @property
    def concurrency_limit_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a ConcurrencyLimit"""
        return self.orm.concurrency_limit_unique_upsert_columns

    @property
    def flow_run_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a FlowRun"""
        return self.orm.flow_run_unique_upsert_columns

    @property
    def artifact_collection_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting an ArtifactCollection"""
        return self.orm.artifact_collection_unique_upsert_columns

    @property
    def block_type_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a BlockType"""
        return self.orm.block_type_unique_upsert_columns

    @property
    def block_schema_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a BlockSchema"""
        return self.orm.block_schema_unique_upsert_columns

    @property
    def flow_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a Flow"""
        return self.orm.flow_unique_upsert_columns

    @property
    def saved_search_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a SavedSearch"""
        return self.orm.saved_search_unique_upsert_columns

    @property
    def task_run_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a TaskRun"""
        return self.orm.task_run_unique_upsert_columns

    @property
    def block_document_unique_upsert_columns(self):
        """The ORM configuration's unique columns for upserting a BlockDocument"""
        return self.orm.block_document_unique_upsert_columns

    def insert(self, model):
        """Delegate an INSERT of `model` to the query components"""
        queries = self.queries
        return queries.insert(model)

    def greatest(self, *values):
        """Delegate to the query components' `greatest` with `values`"""
        queries = self.queries
        return queries.greatest(*values)

    def make_timestamp_intervals(
        self,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        interval: datetime.timedelta,
    ):
        """
        Delegate to the query components' `make_timestamp_intervals`.

        Args:
            start_time: passed through as the first positional argument.
            end_time: passed through as the second positional argument.
            interval: passed through as the third positional argument.
        """
        build_intervals = self.queries.make_timestamp_intervals
        return build_intervals(start_time, end_time, interval)

    def set_state_id_on_inserted_flow_runs_statement(
        self, inserted_flow_run_ids, insert_flow_run_states
    ):
        """Given a list of flow run ids and associated states, set the state_id
        to the appropriate state for all flow runs.

        The work is delegated to the query components, which also receive the
        `FlowRun` and `FlowRunState` orm models."""
        build_statement = self.queries.set_state_id_on_inserted_flow_runs_statement
        return build_statement(
            orm_models.FlowRun,
            orm_models.FlowRunState,
            inserted_flow_run_ids,
            insert_flow_run_states,
        )

    @property
    def uses_json_strings(self):
        """The query components' `uses_json_strings` value"""
        queries = self.queries
        return queries.uses_json_strings

    def cast_to_json(self, json_obj):
        """Delegate to the query components' `cast_to_json` with `json_obj`"""
        queries = self.queries
        return queries.cast_to_json(json_obj)

    def build_json_object(self, *args):
        """Delegate to the query components' `build_json_object` with `args`"""
        queries = self.queries
        return queries.build_json_object(*args)

    def json_arr_agg(self, json_array):
        """Delegate to the query components' `json_arr_agg` with `json_array`"""
        queries = self.queries
        return queries.json_arr_agg(json_array)

    async def get_flow_run_notifications_from_queue(
        self, session: sa.orm.Session, limit: int
    ):
        """
        Await the query components' `get_flow_run_notifications_from_queue`.

        Args:
            session: forwarded by keyword.
            limit: forwarded by keyword.
        """
        fetch = self.queries.get_flow_run_notifications_from_queue
        return await fetch(session=session, limit=limit)

    async def read_configuration_value(self, session: sa.orm.Session, key: str):
        """Read the configuration value for `key` via the query components"""
        read_value = self.queries.read_configuration_value
        return await read_value(session=session, key=key)

    def clear_configuration_value_cache_for_key(self, key: str):
        """Remove the configuration `key` from the query components' cache."""
        clear_cached = self.queries.clear_configuration_value_cache_for_key
        return clear_cached(key=key)

Agent property

An agent model

Artifact property

An artifact orm model

ArtifactCollection property

An artifact collection orm model

Automation property

An automation model

AutomationBucket property

An automation bucket model

AutomationEventFollower property

A model capturing one event following another event

AutomationRelatedResource property

An automation related resource model

Base property

Base class for orm models

BlockDocument property

A block document model

BlockDocumentReference property

A block document reference model

BlockSchema property

A block schema model

BlockSchemaReference property

A block schema reference model

BlockType property

A block type model

CompositeTriggerChildFiring property

A model capturing a composite trigger's child firing

ConcurrencyLimit property

A concurrency model

ConcurrencyLimitV2 property

A v2 concurrency model

Configuration property

A configuration model

CsrfToken property

A csrf token model

Deployment property

A deployment orm model

DeploymentSchedule property

A deployment schedule orm model

Event property

An event model

EventResource property

An event resource model

Flow property

A flow orm model

FlowRun property

A flow run orm model

FlowRunInput property

A flow run input model

FlowRunNotificationPolicy property

A flow run notification policy model

FlowRunNotificationQueue property

A flow run notification queue model

FlowRunState property

A flow run state orm model

Log property

A log orm model

SavedSearch property

A saved search orm model

TaskRun property

A task run orm model

TaskRunState property

A task run state orm model

TaskRunStateCache property

A task run state cache orm model

Variable property

A variable model

WorkPool property

A work pool orm model

WorkQueue property

A work queue model

Worker property

A worker process orm model

artifact_collection_unique_upsert_columns property

Unique columns for upserting an ArtifactCollection

block_document_unique_upsert_columns property

Unique columns for upserting a BlockDocument

block_schema_unique_upsert_columns property

Unique columns for upserting a BlockSchema

block_type_unique_upsert_columns property

Unique columns for upserting a BlockType

concurrency_limit_unique_upsert_columns property

Unique columns for upserting a ConcurrencyLimit

deployment_unique_upsert_columns property

Unique columns for upserting a Deployment

flow_run_unique_upsert_columns property

Unique columns for upserting a FlowRun

flow_unique_upsert_columns property

Unique columns for upserting a Flow

saved_search_unique_upsert_columns property

Unique columns for upserting a SavedSearch

task_run_unique_upsert_columns property

Unique columns for upserting a TaskRun

clear_configuration_value_cache_for_key(key)

Removes a configuration key from the cache.

Source code in src/prefect/server/database/interface.py
409
410
411
def clear_configuration_value_cache_for_key(self, key: str):
    """Remove the configuration `key` from the query components' cache."""
    clear_cached = self.queries.clear_configuration_value_cache_for_key
    return clear_cached(key=key)

create_db() async

Create the database

Source code in src/prefect/server/database/interface.py
54
55
56
async def create_db(self):
    """Create the database by awaiting `run_migrations_upgrade`."""
    upgrade = self.run_migrations_upgrade
    await upgrade()

drop_db() async

Drop the database

Source code in src/prefect/server/database/interface.py
58
59
60
async def drop_db(self):
    """Drop the database by downgrading migrations to the "base" revision."""
    downgrade = self.run_migrations_downgrade
    await downgrade(revision="base")

engine() async

Provides a SqlAlchemy engine against a specific database.

Source code in src/prefect/server/database/interface.py
82
83
84
85
86
87
88
async def engine(self) -> AsyncEngine:
    """
    Provides the SqlAlchemy engine supplied by the database configuration.
    """
    return await self.database_config.engine()

insert(model)

INSERTs a model into the database

Source code in src/prefect/server/database/interface.py
358
359
360
def insert(self, model):
    """Delegate an INSERT of `model` to the query components"""
    queries = self.queries
    return queries.insert(model)

is_db_connectable() async

Returns boolean indicating if the database is connectable. This method is used to determine if the server is ready to accept requests.

Source code in src/prefect/server/database/interface.py
70
71
72
73
74
75
76
77
78
79
80
async def is_db_connectable(self):
    """
    Report whether a connection to the database can be opened.

    This method is used to determine if the server is ready to accept requests.
    Any `Exception` raised while opening or closing the connection yields
    False; errors from obtaining the engine itself are not caught.
    """
    db_engine = await self.engine()
    try:
        async with db_engine.connect():
            pass
    except Exception:
        return False
    else:
        return True

read_configuration_value(session, key) async

Read a configuration value

Source code in src/prefect/server/database/interface.py
405
406
407
async def read_configuration_value(self, session: sa.orm.Session, key: str):
    """Read the configuration value for `key` via the query components"""
    read_value = self.queries.read_configuration_value
    return await read_value(session=session, key=key)

run_migrations_downgrade(revision='-1') async

Run downgrade migrations down to the given `revision` (default `'-1'`)

Source code in src/prefect/server/database/interface.py
66
67
68
async def run_migrations_downgrade(self, revision: str = "-1"):
    """
    Run downgrade migrations (`alembic_downgrade`) in a worker thread.

    Args:
        revision: the revision handed to `alembic_downgrade`. Defaults to "-1"
            (presumably alembic's relative "one revision back" -- confirm
            against `alembic_downgrade`).
    """
    # alembic_downgrade is a synchronous callable, so it is handed to
    # run_sync_in_worker_thread rather than called directly from this coroutine.
    await run_sync_in_worker_thread(alembic_downgrade, revision=revision)

run_migrations_upgrade() async

Run all upgrade migrations

Source code in src/prefect/server/database/interface.py
62
63
64
async def run_migrations_upgrade(self):
    """Run all upgrade migrations (`alembic_upgrade`) in a worker thread"""
    # alembic_upgrade is a synchronous callable, so it is handed to
    # run_sync_in_worker_thread rather than called directly from this coroutine.
    await run_sync_in_worker_thread(alembic_upgrade)

session() async

Provides a SQLAlchemy session.

Source code in src/prefect/server/database/interface.py
90
91
92
93
94
95
async def session(self):
    """
    Provides a SQLAlchemy session created by the database configuration
    for this interface's engine.
    """
    bound_engine = await self.engine()
    new_session = await self.database_config.session(bound_engine)
    return new_session

session_context(begin_transaction=False, with_for_update=False) async

Provides a SQLAlchemy session and a context manager for opening/closing the underlying connection.

Parameters:

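| Name | Type | Description | Default |
| --- | --- | --- | --- |
| begin_transaction | bool | if True, the context manager will begin a SQL transaction. Exiting the context manager will COMMIT or ROLLBACK any changes. | False |
| with_for_update | bool | forwarded to `database_config.begin_transaction`; only used when `begin_transaction` is True. | False |
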
Source code in src/prefect/server/database/interface.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
@asynccontextmanager
async def session_context(
    self, begin_transaction: bool = False, with_for_update: bool = False
):
    """
    Provides a SQLAlchemy session inside a context manager that enters the
    session on entry and exits it on exit.

    Args:
        begin_transaction: if True, the context manager will begin a SQL transaction.
            Exiting the context manager will COMMIT or ROLLBACK any changes.
        with_for_update: forwarded to `database_config.begin_transaction`;
            only consulted when `begin_transaction` is True.
    """
    db_session = await self.session()
    async with db_session:
        if not begin_transaction:
            yield db_session
        else:
            transaction = self.database_config.begin_transaction(
                db_session, with_for_update=with_for_update
            )
            async with transaction:
                yield db_session

set_state_id_on_inserted_flow_runs_statement(inserted_flow_run_ids, insert_flow_run_states)

Given a list of flow run ids and associated states, set the state_id to the appropriate state for all flow runs

Source code in src/prefect/server/database/interface.py
373
374
375
376
377
378
379
380
381
382
383
def set_state_id_on_inserted_flow_runs_statement(
    self, inserted_flow_run_ids, insert_flow_run_states
):
    """Given a list of flow run ids and associated states, set the state_id
    to the appropriate state for all flow runs.

    The work is delegated to the query components, which also receive the
    `FlowRun` and `FlowRunState` orm models."""
    build_statement = self.queries.set_state_id_on_inserted_flow_runs_statement
    return build_statement(
        orm_models.FlowRun,
        orm_models.FlowRunState,
        inserted_flow_run_ids,
        insert_flow_run_states,
    )