Skip to content

prefect.server.models.block_types

Functions for interacting with block type ORM objects. Intended for internal use by the Prefect REST API.

create_block_type(db, session, block_type, override=False) async

Create a new block type.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
block_type Union[BlockType, ClientBlockType]

a block type object

required

Returns:

Name Type Description
block_type BlockType

an ORM block type model

Source code in src/prefect/server/models/block_types.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@db_injector
async def create_block_type(
    db: PrefectDBInterface,
    session: AsyncSession,
    block_type: Union[schemas.core.BlockType, "ClientBlockType"],
    override: bool = False,
) -> "BlockType":
    """
    Insert a block type row and return the stored ORM object.

    Args:
        db: The database interface used to build the insert statement and to
            look up the upsert conflict columns (presumably supplied by the
            `db_injector` decorator -- confirm against its definition)
        session: A database session
        block_type: The block type to store; either the server schema or a
            client-side model, which is converted to the server schema first
        override: When True, an insert that conflicts on
            `db.block_type_unique_upsert_columns` updates the existing row
            with the new values instead

    Returns:
        block_type: the ORM block type model read back by name after the
            insert
    """
    # Unit tests and block registration hand client-side models straight to
    # this function, so anything that is not already the server schema is
    # round-tripped through its JSON form to obtain one.
    if not isinstance(block_type, schemas.core.BlockType):
        as_json = block_type.model_dump(mode="json")
        block_type = schemas.core.BlockType.model_validate(as_json)

    row = block_type.model_dump_for_orm(
        exclude_unset=False, exclude={"created", "updated", "id"}
    )

    # Free-text fields are stored HTML-escaped; quote=False leaves quote
    # characters untouched so only &, < and > are rewritten.
    for field in ("description", "code_example"):
        raw = row.get(field)
        if raw is not None:
            row[field] = html.escape(raw, quote=False)

    statement = db.insert(BlockType).values(**row)
    if override:
        statement = statement.on_conflict_do_update(
            index_elements=db.block_type_unique_upsert_columns,
            set_=row,
        )
    await session.execute(statement)

    # Re-read the row so the caller gets database-populated columns;
    # populate_existing refreshes any instance already held by the session.
    # NOTE(review): the lookup filters on name, while the upsert conflict
    # target is db.block_type_unique_upsert_columns -- confirm that name
    # identifies the same row.
    lookup = (
        sa.select(BlockType)
        .where(BlockType.name == row["name"])
        .execution_options(populate_existing=True)
    )
    return (await session.execute(lookup)).scalar()

delete_block_type(session, block_type_id) async

Delete a block type by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
block_type_id str

A block type id

required

Returns:

Name Type Description
bool

True if the block type was deleted

Source code in src/prefect/server/models/block_types.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
async def delete_block_type(session: AsyncSession, block_type_id: str):
    """
    Delete a block type by id.

    Args:
        session: A database session
        block_type_id: The id of the block type to delete

    Returns:
        bool: True if a block type was deleted, False if no row matched
    """
    delete_stmt = sa.delete(BlockType).where(BlockType.id == block_type_id)
    outcome = await session.execute(delete_stmt)
    # A positive rowcount means a row with this id was removed.
    return outcome.rowcount > 0

read_block_type(session, block_type_id) async

Reads a block type by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
block_type_id UUID

a block_type id

required

Returns:

Name Type Description
BlockType

an ORM block type model

Source code in src/prefect/server/models/block_types.py
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def read_block_type(
    session: AsyncSession,
    block_type_id: UUID,
):
    """
    Look up a single block type by its primary key.

    Args:
        session: A database session
        block_type_id: The id of the block type to fetch

    Returns:
        BlockType: the matching ORM block type model, or None when the
            session finds no row with that id
    """
    block_type = await session.get(BlockType, block_type_id)
    return block_type

read_block_type_by_slug(session, block_type_slug) async

Reads a block type by slug.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
block_type_slug str

a block type slug

required

Returns:

Name Type Description
BlockType

an ORM block type model

Source code in src/prefect/server/models/block_types.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
async def read_block_type_by_slug(session: AsyncSession, block_type_slug: str):
    """
    Look up a single block type by its slug.

    Args:
        session: A database session
        block_type_slug: The slug of the block type to fetch

    Returns:
        BlockType: the first matching ORM block type model, or None when no
            row has that slug

    """
    by_slug = sa.select(BlockType).where(BlockType.slug == block_type_slug)
    rows = await session.execute(by_slug)
    return rows.scalar()

read_block_types(session, block_type_filter=None, block_schema_filter=None, limit=None, offset=None) async

Reads block types with an optional limit and offset

Args:

Returns:

Type Description

List[BlockType]: List of block types

Source code in src/prefect/server/models/block_types.py
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
async def read_block_types(
    session: AsyncSession,
    block_type_filter: Optional[schemas.filters.BlockTypeFilter] = None,
    block_schema_filter: Optional[schemas.filters.BlockSchemaFilter] = None,
    limit: Optional[int] = None,
    offset: Optional[int] = None,
):
    """
    Read block types ordered by name, optionally filtered and paginated.

    Args:
        session: A database session
        block_type_filter: If given, only block types matching this filter
            are returned
        block_schema_filter: If given, only block types that have at least
            one block schema matching this filter are returned
        limit: If given, the maximum number of block types to return
        offset: If given, the number of block types to skip before the first
            returned row

    Returns:
        List[BlockType]: List of ORM block type models
    """
    stmt = sa.select(BlockType).order_by(BlockType.name)

    if block_type_filter is not None:
        stmt = stmt.where(block_type_filter.as_sql_filter())

    if block_schema_filter is not None:
        # Correlated subquery: keep a block type only when some block schema
        # belonging to it satisfies the schema filter.
        matching_schemas = (
            sa.select(BlockSchema)
            .where(BlockSchema.block_type_id == BlockType.id)
            .where(block_schema_filter.as_sql_filter())
        )
        stmt = stmt.where(matching_schemas.exists())

    if offset is not None:
        stmt = stmt.offset(offset)

    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()

update_block_type(session, block_type_id, block_type) async

Update a block type by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
block_type_id str

A block type id

required
block_type Union[BlockTypeUpdate, ClientBlockTypeUpdate]

Data to update block type with

required

Returns:

Name Type Description
bool bool

True if the block type was updated

Source code in src/prefect/server/models/block_types.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
async def update_block_type(
    session: AsyncSession,
    block_type_id: str,
    block_type: Union[schemas.actions.BlockTypeUpdate, "ClientBlockTypeUpdate"],
) -> bool:
    """
    Update a block type by id.

    Args:
        session: A database session
        block_type_id: The id of the block type to update
        block_type: Data to update the block type with; either the server
            update schema or a client-side model, which is converted to the
            server update schema first

    Returns:
        bool: True if the block type was updated, False if no row matched
    """
    # Unit tests and block registration hand client-side models straight to
    # this function. Those are converted to the server update schema, leaving
    # out the fields excluded below.
    if not isinstance(block_type, schemas.actions.BlockTypeUpdate):
        client_fields = block_type.model_dump(
            mode="json",
            exclude={"id", "created", "updated", "name", "slug", "is_protected"},
        )
        block_type = schemas.actions.BlockTypeUpdate.model_validate(client_fields)

    # Only fields explicitly set on the update model are written.
    new_values = block_type.model_dump_for_orm(exclude_unset=True, exclude={"id"})
    statement = (
        sa.update(BlockType).where(BlockType.id == block_type_id).values(**new_values)
    )
    outcome = await session.execute(statement)
    return outcome.rowcount > 0