Skip to content

prefect.server.models.block_documents

Functions for interacting with block document ORM objects. Intended for internal use by the Prefect REST API.

count_block_documents(session, block_document_filter=None, block_type_filter=None, block_schema_filter=None) async

Count block documents that match the filters.

Source code in src/prefect/server/models/block_documents.py
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
async def count_block_documents(
    session: AsyncSession,
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
) -> int:
    """
    Count block documents that match the filters.
    """
    query = sa.select(sa.func.count()).select_from(orm_models.BlockDocument)

    query = _apply_block_document_filters(
        query=query,
        block_document_filter=block_document_filter,
        block_schema_filter=block_schema_filter,
        block_type_filter=block_type_filter,
    )

    result = await session.execute(query)
    return result.scalar()  # type: ignore

read_block_document_by_name(session, name, block_type_slug, include_secrets=False) async

Read a block document with the given name and block type slug.

Source code in src/prefect/server/models/block_documents.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
async def read_block_document_by_name(
    session: AsyncSession,
    name: str,
    block_type_slug: str,
    include_secrets: bool = False,
) -> Union[BlockDocument, None]:
    """
    Read a block document with the given name and block type slug.
    """
    block_documents = await read_block_documents(
        session=session,
        block_document_filter=schemas.filters.BlockDocumentFilter(
            name=dict(any_=[name]),
            # don't apply any anonymous filtering
            is_anonymous=None,
        ),
        block_type_filter=schemas.filters.BlockTypeFilter(
            slug=dict(any_=[block_type_slug])
        ),
        include_secrets=include_secrets,
        limit=1,
    )
    return block_documents[0] if block_documents else None

read_block_documents(session, block_document_filter=None, block_type_filter=None, block_schema_filter=None, include_secrets=False, sort=schemas.sorting.BlockDocumentSort.NAME_ASC, offset=None, limit=None) async

Read block documents with an optional limit and offset

Source code in src/prefect/server/models/block_documents.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
async def read_block_documents(
    session: AsyncSession,
    block_document_filter: Optional[schemas.filters.BlockDocumentFilter] = None,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    include_secrets: bool = False,
    sort: schemas.sorting.BlockDocumentSort = schemas.sorting.BlockDocumentSort.NAME_ASC,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> List[BlockDocument]:
    """
    Read block documents with an optional limit and offset
    """
    # --- Build an initial query that filters for the requested block documents
    filtered_block_documents_query = sa.select(orm_models.BlockDocument.id)
    filtered_block_documents_query = _apply_block_document_filters(
        query=filtered_block_documents_query,
        block_document_filter=block_document_filter,
        block_type_filter=block_type_filter,
        block_schema_filter=block_schema_filter,
    )
    filtered_block_documents_query = filtered_block_documents_query.order_by(
        sort.as_sql_sort()
    )

    if offset is not None:
        filtered_block_documents_query = filtered_block_documents_query.offset(offset)

    if limit is not None:
        filtered_block_documents_query = filtered_block_documents_query.limit(limit)

    filtered_block_documents_cte = filtered_block_documents_query.cte(
        "filtered_block_documents"
    )

    # --- Build a recursive query that starts with the filtered block documents
    # and iteratively loads all referenced block documents. The query includes
    # the ID of each block document as well as the ID of the document that
    # references it and name it's referenced by, if applicable.
    parent_documents = (
        sa.select(
            filtered_block_documents_cte.c.id,
            sa.cast(sa.null(), sa.String).label("reference_name"),
            sa.cast(sa.null(), UUIDTypeDecorator).label(
                "reference_parent_block_document_id"
            ),
        )
        .select_from(filtered_block_documents_cte)
        .cte("all_block_documents", recursive=True)
    )
    # recursive part of query
    referenced_documents = (
        sa.select(
            orm_models.BlockDocumentReference.reference_block_document_id,
            orm_models.BlockDocumentReference.name,
            orm_models.BlockDocumentReference.parent_block_document_id,
        )
        .select_from(parent_documents)
        .join(
            orm_models.BlockDocumentReference,
            orm_models.BlockDocumentReference.parent_block_document_id
            == parent_documents.c.id,
        )
    )
    # union the recursive CTE
    all_block_documents_query = parent_documents.union_all(referenced_documents)

    # --- Join the recursive query that contains all required document IDs
    # back to the BlockDocument table to load info for every document
    # and order by name
    final_query = (
        sa.select(
            orm_models.BlockDocument,
            all_block_documents_query.c.reference_name,
            all_block_documents_query.c.reference_parent_block_document_id,
        )
        .select_from(all_block_documents_query)
        .join(
            orm_models.BlockDocument,
            orm_models.BlockDocument.id == all_block_documents_query.c.id,
        )
        .order_by(sort.as_sql_sort())
    )

    result = await session.execute(
        final_query.execution_options(populate_existing=True)
    )

    block_documents_with_references = result.unique().all()

    # identify true "parent" documents as those with no reference parent ids
    parent_block_document_ids = [
        d[0].id
        for d in block_documents_with_references
        if d.reference_parent_block_document_id is None
    ]

    # walk the resulting dataset and hydrate all block documents
    fully_constructed_block_documents: List[BlockDocument] = []
    visited_block_document_ids = []
    for root_orm_block_document, _, _ in block_documents_with_references:
        if (
            root_orm_block_document.id in parent_block_document_ids
            and root_orm_block_document.id not in visited_block_document_ids
        ):
            root_block_document = await BlockDocument.from_orm_model(
                session, root_orm_block_document, include_secrets=include_secrets
            )
            constructed = await _construct_full_block_document(
                session,
                block_documents_with_references,  # type: ignore
                root_block_document,
                include_secrets=include_secrets,
            )
            assert constructed

            fully_constructed_block_documents.append(constructed)
            visited_block_document_ids.append(root_orm_block_document.id)

    block_schema_ids = [
        block_document.block_schema_id
        for block_document in fully_constructed_block_documents
    ]
    block_schemas = await models.block_schemas.read_block_schemas(
        session=session,
        block_schema_filter=BlockSchemaFilter(id=dict(any_=block_schema_ids)),
    )
    for block_document in fully_constructed_block_documents:
        corresponding_block_schema = next(
            block_schema
            for block_schema in block_schemas
            if block_schema.id == block_document.block_schema_id
        )
        block_document.block_schema = corresponding_block_schema

    return fully_constructed_block_documents