Skip to content

prefect.server.models.flow_run_notification_policies

Functions for interacting with flow run notification policy ORM objects. Intended for internal use by the Prefect REST API.

create_flow_run_notification_policy(session, flow_run_notification_policy, db) async

Creates a FlowRunNotificationPolicy.

Parameters:

Name Type Description Default
session Session

a database session

required
flow_run_notification_policy FlowRunNotificationPolicy

a FlowRunNotificationPolicy model

required

Returns:

Type Description

db.FlowRunNotificationPolicy: the newly-created FlowRunNotificationPolicy

Source code in src/prefect/server/models/flow_run_notification_policies.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@inject_db
async def create_flow_run_notification_policy(
    session: sa.orm.Session,
    flow_run_notification_policy: schemas.core.FlowRunNotificationPolicy,
    db: PrefectDBInterface,
):
    """
    Creates a FlowRunNotificationPolicy.

    Args:
        session (sa.orm.Session): a database session
        flow_run_notification_policy (schemas.core.FlowRunNotificationPolicy): a FlowRunNotificationPolicy model

    Returns:
        db.FlowRunNotificationPolicy: the newly-created FlowRunNotificationPolicy

    """
    model = db.FlowRunNotificationPolicy(**flow_run_notification_policy.model_dump())
    session.add(model)
    await session.flush()

    return model

delete_flow_run_notification_policy(session, flow_run_notification_policy_id, db) async

Delete a FlowRunNotificationPolicy by id.

Parameters:

Name Type Description Default
session Session

A database session

required
flow_run_notification_policy_id str

a FlowRunNotificationPolicy id

required

Returns:

Name Type Description
bool bool

whether or not the FlowRunNotificationPolicy was deleted

Source code in src/prefect/server/models/flow_run_notification_policies.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@inject_db
async def delete_flow_run_notification_policy(
    session: sa.orm.Session,
    flow_run_notification_policy_id: UUID,
    db: PrefectDBInterface,
) -> bool:
    """
    Delete a FlowRunNotificationPolicy by id.

    Args:
        session (sa.orm.Session): A database session
        flow_run_notification_policy_id (str): a FlowRunNotificationPolicy id

    Returns:
        bool: whether or not the FlowRunNotificationPolicy was deleted
    """

    result = await session.execute(
        delete(db.FlowRunNotificationPolicy).where(
            db.FlowRunNotificationPolicy.id == flow_run_notification_policy_id
        )
    )
    return result.rowcount > 0

read_flow_run_notification_policies(db, session, flow_run_notification_policy_filter=None, offset=None, limit=None) async

Read notification policies.

Parameters:

Name Type Description Default
session Session

A database session

required
offset int

Query offset

None
limit(int)

Query limit

required

Returns:

Type Description

List[db.FlowRunNotificationPolicy]: Notification policies

Source code in src/prefect/server/models/flow_run_notification_policies.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
@inject_db
async def read_flow_run_notification_policies(
    db: PrefectDBInterface,
    session: sa.orm.Session,
    flow_run_notification_policy_filter: Optional[
        schemas.filters.FlowRunNotificationPolicyFilter
    ] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
):
    """
    Read notification policies.

    Args:
        session (sa.orm.Session): A database session
        offset (int): Query offset
        limit(int): Query limit

    Returns:
        List[db.FlowRunNotificationPolicy]: Notification policies
    """

    query = select(db.FlowRunNotificationPolicy).order_by(
        db.FlowRunNotificationPolicy.id
    )

    if flow_run_notification_policy_filter:
        query = query.where(flow_run_notification_policy_filter.as_sql_filter())

    if offset is not None:
        query = query.offset(offset)
    if limit is not None:
        query = query.limit(limit)

    result = await session.execute(query)
    return result.scalars().unique().all()

read_flow_run_notification_policy(session, flow_run_notification_policy_id, db) async

Reads a FlowRunNotificationPolicy by id.

Parameters:

Name Type Description Default
session Session

A database session

required
flow_run_notification_policy_id str

a FlowRunNotificationPolicy id

required

Returns:

Type Description

db.FlowRunNotificationPolicy: the FlowRunNotificationPolicy

Source code in src/prefect/server/models/flow_run_notification_policies.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@inject_db
async def read_flow_run_notification_policy(
    session: sa.orm.Session,
    flow_run_notification_policy_id: UUID,
    db: PrefectDBInterface,
):
    """
    Reads a FlowRunNotificationPolicy by id.

    Args:
        session (sa.orm.Session): A database session
        flow_run_notification_policy_id (str): a FlowRunNotificationPolicy id

    Returns:
        db.FlowRunNotificationPolicy: the FlowRunNotificationPolicy
    """

    return await session.get(
        db.FlowRunNotificationPolicy, flow_run_notification_policy_id
    )

update_flow_run_notification_policy(session, flow_run_notification_policy_id, flow_run_notification_policy, db) async

Update a FlowRunNotificationPolicy by id.

Parameters:

Name Type Description Default
session Session

A database session

required
flow_run_notification_policy FlowRunNotificationPolicyUpdate

the flow run notification policy data

required
flow_run_notification_policy_id str

a FlowRunNotificationPolicy id

required

Returns:

Name Type Description
bool bool

whether or not the FlowRunNotificationPolicy was updated

Source code in src/prefect/server/models/flow_run_notification_policies.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@inject_db
async def update_flow_run_notification_policy(
    session: sa.orm.Session,
    flow_run_notification_policy_id: UUID,
    flow_run_notification_policy: schemas.actions.FlowRunNotificationPolicyUpdate,
    db: PrefectDBInterface,
) -> bool:
    """
    Update a FlowRunNotificationPolicy by id.

    Args:
        session (sa.orm.Session): A database session
        flow_run_notification_policy: the flow run notification policy data
        flow_run_notification_policy_id (str): a FlowRunNotificationPolicy id

    Returns:
        bool: whether or not the FlowRunNotificationPolicy was updated
    """
    # exclude_unset=True allows us to only update values provided by
    # the user, ignoring any defaults on the model
    update_data = flow_run_notification_policy.model_dump_for_orm(exclude_unset=True)

    update_stmt = (
        sa.update(db.FlowRunNotificationPolicy)
        .where(db.FlowRunNotificationPolicy.id == flow_run_notification_policy_id)
        .values(**update_data)
    )
    result = await session.execute(update_stmt)
    return result.rowcount > 0