Skip to content

prefect.server.models.flow_run_notification_policies

Functions for interacting with flow run notification policy ORM objects. Intended for internal use by the Prefect REST API.

create_flow_run_notification_policy(session, flow_run_notification_policy) async

Creates a FlowRunNotificationPolicy.

Parameters:

Name Type Description Default
session AsyncSession

a database session

required
flow_run_notification_policy FlowRunNotificationPolicy

a FlowRunNotificationPolicy model

required

Returns:

Type Description
FlowRunNotificationPolicy

orm_models.FlowRunNotificationPolicy: the newly-created FlowRunNotificationPolicy

Source code in src/prefect/server/models/flow_run_notification_policies.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
async def create_flow_run_notification_policy(
    session: AsyncSession,
    flow_run_notification_policy: schemas.core.FlowRunNotificationPolicy,
) -> orm_models.FlowRunNotificationPolicy:
    """
    Build an ORM FlowRunNotificationPolicy from a schema object and add it
    to the session.

    Args:
        session: the database session the new object is added to
        flow_run_notification_policy: the schema object whose dumped fields
            are passed as keyword arguments to the ORM model

    Returns:
        The ORM object, after it has been added to the session and the
        session has been flushed.
    """
    policy_fields = flow_run_notification_policy.model_dump()
    orm_policy = orm_models.FlowRunNotificationPolicy(**policy_fields)

    # Only a flush is issued here; this function never commits.
    session.add(orm_policy)
    await session.flush()

    return orm_policy

delete_flow_run_notification_policy(session, flow_run_notification_policy_id) async

Delete a FlowRunNotificationPolicy by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
flow_run_notification_policy_id UUID

a FlowRunNotificationPolicy id

required

Returns:

Name Type Description
bool bool

whether or not the FlowRunNotificationPolicy was deleted

Source code in src/prefect/server/models/flow_run_notification_policies.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
async def delete_flow_run_notification_policy(
    session: AsyncSession,
    flow_run_notification_policy_id: UUID,
) -> bool:
    """
    Issue a DELETE for the FlowRunNotificationPolicy with the given id.

    Args:
        session: the database session used to execute the statement
        flow_run_notification_policy_id: id of the policy to delete

    Returns:
        True when the executed statement reports a rowcount greater than
        zero, False otherwise.
    """
    policy_table = orm_models.FlowRunNotificationPolicy
    delete_stmt = delete(policy_table).where(
        policy_table.id == flow_run_notification_policy_id
    )

    outcome = await session.execute(delete_stmt)
    deleted_rows = outcome.rowcount
    return deleted_rows > 0

read_flow_run_notification_policies(session, flow_run_notification_policy_filter=None, offset=None, limit=None) async

Read notification policies.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
offset int

Query offset

None
limit int

Query limit

None

Returns:

Type Description
Sequence[FlowRunNotificationPolicy]

Sequence[orm_models.FlowRunNotificationPolicy]: Notification policies

Source code in src/prefect/server/models/flow_run_notification_policies.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
async def read_flow_run_notification_policies(
    session: AsyncSession,
    flow_run_notification_policy_filter: Optional[
        schemas.filters.FlowRunNotificationPolicyFilter
    ] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
) -> Sequence[orm_models.FlowRunNotificationPolicy]:
    """
    Select notification policies ordered by id, optionally filtered and
    paginated.

    Args:
        session: the database session used to execute the query
        flow_run_notification_policy_filter: when truthy, its
            `as_sql_filter()` result is applied as a WHERE clause
        offset: query offset; not applied when None
        limit: query limit; not applied when None

    Returns:
        The de-duplicated scalar results of the query.
    """
    policy_model = orm_models.FlowRunNotificationPolicy
    stmt = select(policy_model).order_by(policy_model.id)

    if flow_run_notification_policy_filter:
        filter_clause = flow_run_notification_policy_filter.as_sql_filter()
        stmt = stmt.where(filter_clause)

    # Compare against None explicitly so an offset/limit of 0 is still applied.
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()

read_flow_run_notification_policy(session, flow_run_notification_policy_id) async

Reads a FlowRunNotificationPolicy by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
flow_run_notification_policy_id UUID

a FlowRunNotificationPolicy id

required

Returns:

Type Description
Union[FlowRunNotificationPolicy, None]

orm_models.FlowRunNotificationPolicy: the FlowRunNotificationPolicy, or None

Source code in src/prefect/server/models/flow_run_notification_policies.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
async def read_flow_run_notification_policy(
    session: AsyncSession,
    flow_run_notification_policy_id: UUID,
) -> Union[orm_models.FlowRunNotificationPolicy, None]:
    """
    Look up a single FlowRunNotificationPolicy by id.

    Args:
        session: the database session used for the lookup
        flow_run_notification_policy_id: id of the policy to fetch

    Returns:
        Whatever `session.get` yields for the ORM model and id: the
        FlowRunNotificationPolicy, or None.
    """
    policy = await session.get(
        orm_models.FlowRunNotificationPolicy,
        flow_run_notification_policy_id,
    )
    return policy

update_flow_run_notification_policy(session, flow_run_notification_policy_id, flow_run_notification_policy) async

Update a FlowRunNotificationPolicy by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
flow_run_notification_policy FlowRunNotificationPolicyUpdate

the flow run notification policy data

required
flow_run_notification_policy_id UUID

a FlowRunNotificationPolicy id

required

Returns:

Name Type Description
bool bool

whether or not the FlowRunNotificationPolicy was updated

Source code in src/prefect/server/models/flow_run_notification_policies.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
async def update_flow_run_notification_policy(
    session: AsyncSession,
    flow_run_notification_policy_id: UUID,
    flow_run_notification_policy: schemas.actions.FlowRunNotificationPolicyUpdate,
) -> bool:
    """
    Issue an UPDATE for the FlowRunNotificationPolicy with the given id.

    Args:
        session: the database session used to execute the statement
        flow_run_notification_policy_id: id of the policy to update
        flow_run_notification_policy: the update payload; only explicitly
            set fields are written

    Returns:
        True when the executed statement reports a rowcount greater than
        zero, False otherwise.
    """
    # Dump with exclude_unset=True so that model defaults the caller never
    # supplied are not written over the stored values.
    changes = flow_run_notification_policy.model_dump_for_orm(exclude_unset=True)

    policy_table = orm_models.FlowRunNotificationPolicy
    matches_id = policy_table.id == flow_run_notification_policy_id
    stmt = sa.update(policy_table).where(matches_id).values(**changes)

    outcome = await session.execute(stmt)
    return outcome.rowcount > 0