Skip to content

prefect.server.services.flow_run_notifications

A service that checks for flow run notifications and sends them.

FlowRunNotifications

Bases: LoopService

A loop service that checks for flow run notifications that need to be sent.

Notifications are queued, and this service pulls them off the queue and actually sends the notification.

Source code in src/prefect/server/services/flow_run_notifications.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
class FlowRunNotifications(LoopService):
    """
    A loop service that checks for flow run notifications that need to be sent.

    Notifications are queued, and this service pulls them off the queue and
    actually sends the notification.
    """

    # check queue every 4 seconds
    # note: a tight loop is executed until the queue is exhausted
    loop_seconds: int = 4

    @inject_db
    async def run_once(self, db: PrefectDBInterface) -> None:
        """
        Drain the notification queue, sending one notification per transaction.

        Each iteration opens a new session with a transaction, pulls at most
        one notification from the queue, and sends it. The loop exits as soon
        as the queue returns no notifications.

        Args:
            db: the database interface used to open sessions and read the queue.

        Raises:
            AssertionError: if the queue query returns more than one
                notification despite `limit=1`, or if the connection is still
                invalidated after the session was rolled back.
        """
        while True:
            async with db.session_context(begin_transaction=True) as session:
                # Drain the queue one entry at a time, because if a transient
                # database error happens while sending a notification, the whole
                # transaction will be rolled back, which effectively re-queues any
                # notifications that we pulled here.  If we drain in batches larger
                # than 1, we risk double-sending earlier notifications when a
                # transient error occurs.
                notifications = await db.get_flow_run_notifications_from_queue(
                    session=session,
                    limit=1,
                )
                self.logger.debug(f"Got {len(notifications)} notifications from queue.")

                # if no notifications were found, exit the tight loop and sleep
                if not notifications:
                    break

                # all retrieved notifications are deleted, so verify that we only
                # got one since we only send the first notification returned.
                # This is an explicit raise rather than an `assert` statement so
                # that the check still runs when Python is started with -O.
                if len(notifications) != 1:
                    raise AssertionError(
                        "Expected one notification; query limit not respected."
                    )

                try:
                    await self.send_flow_run_notification(
                        session=session, db=db, notification=notifications[0]
                    )
                finally:
                    connection = await session.connection()
                    if connection.invalidated:
                        # If the connection was invalidated due to an error that we
                        # handled in send_flow_run_notification, we'll need to
                        # rollback the session in order to synchronize it with the
                        # reality of the underlying connection before we can proceed
                        # with more iterations of the loop.  This may happen due to
                        # transient database connection errors, but will _not_
                        # happen due to calling a third-party service to send a
                        # notification.
                        await session.rollback()
                        # Explicit raise (not `assert`) so the check survives -O.
                        if connection.invalidated:
                            raise AssertionError(
                                "Connection is still invalidated after session"
                                " rollback."
                            )

    @inject_db
    async def send_flow_run_notification(
        self,
        session: sa.orm.session,
        db: PrefectDBInterface,
        notification,
    ) -> None:
        """
        Send a single queued flow run notification through its block.

        Looks up the block document referenced by the notification, builds the
        block from it (including secrets), and calls the block's `notify` with
        the templated message. If the block document is missing, an error is
        logged and nothing is sent. Any exception raised along the way is
        logged with its traceback and not re-raised.

        Args:
            session: the database session used to load the block document.
                NOTE(review): the annotation refers to the `sa.orm.session`
                module rather than a session class - confirm the intended type.
            db: the database interface providing the `BlockDocument` ORM model.
            notification: the queued notification row; this method reads its
                `block_document_id`, `flow_run_id`, and
                `flow_run_notification_policy_id` attributes.
        """
        try:
            orm_block_document = await session.get(
                db.BlockDocument, notification.block_document_id
            )
            if orm_block_document is None:
                self.logger.error(
                    f"Missing block document {notification.block_document_id} "
                    f"from policy {notification.flow_run_notification_policy_id}"
                )
                return

            from prefect.blocks.core import Block

            block = Block._from_block_document(
                await schemas.core.BlockDocument.from_orm_model(
                    session=session,
                    orm_block_document=orm_block_document,
                    include_secrets=True,
                )
            )

            message = self.construct_notification_message(notification=notification)
            await block.notify(
                subject="Prefect flow run notification",
                body=message,
            )

            self.logger.debug(
                "Successfully sent notification for flow run"
                f" {notification.flow_run_id} from policy"
                f" {notification.flow_run_notification_policy_id}"
            )

        except Exception:
            # Deliberate best-effort: a failure to send one notification is
            # logged (with traceback) and must not stop the service loop.
            self.logger.error(
                (
                    "Error sending notification for policy"
                    f" {notification.flow_run_notification_policy_id} on flow run"
                    f" {notification.flow_run_id}"
                ),
                exc_info=True,
            )

    def construct_notification_message(self, notification) -> str:
        """
        Construct the message for a flow run notification, including
        templating any variables.

        Args:
            notification: the queued notification row; its
                `flow_run_notification_policy_message_template` is used as the
                template, falling back to the default message template when it
                is empty or unset.

        Returns:
            The template formatted with the values named in
            `schemas.core.FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS`.
        """
        message_template = (
            notification.flow_run_notification_policy_message_template
            or models.flow_run_notification_policies.DEFAULT_MESSAGE_TEMPLATE
        )

        # create a dict from the sqlalchemy object for templating
        notification_dict = dict(notification._mapping)
        # add the flow run url to the info
        notification_dict["flow_run_url"] = self.get_ui_url_for_flow_run_id(
            flow_run_id=notification_dict["flow_run_id"]
        )

        # only the supported template kwargs are exposed to the template
        message = message_template.format(
            **{
                k: notification_dict[k]
                for k in schemas.core.FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS
            }
        )
        return message

    def get_ui_url_for_flow_run_id(self, flow_run_id: UUID) -> str:
        """
        Returns a link to the flow run view of the given flow run id.

        Args:
            flow_run_id: the flow run id.
        """
        return urls.url_for(
            "flow-run",
            obj_id=flow_run_id,
            default_base_url="http://ephemeral-prefect/api",
        )

construct_notification_message(notification)

Construct the message for a flow run notification, including templating any variables.

Source code in src/prefect/server/services/flow_run_notifications.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def construct_notification_message(self, notification) -> str:
    """
    Build the text of a flow run notification.

    The policy's own message template is used when it has one; otherwise the
    default message template is used. The template is then filled in with
    the supported template variables.
    """
    template = (
        notification.flow_run_notification_policy_message_template
        or models.flow_run_notification_policies.DEFAULT_MESSAGE_TEMPLATE
    )

    # flatten the sqlalchemy row into a plain dict and attach the flow run url
    row_values = dict(notification._mapping)
    flow_run_url = self.get_ui_url_for_flow_run_id(
        flow_run_id=row_values["flow_run_id"]
    )
    row_values["flow_run_url"] = flow_run_url

    # expose only the supported template variables to the template
    template_kwargs = {}
    for key in schemas.core.FLOW_RUN_NOTIFICATION_TEMPLATE_KWARGS:
        template_kwargs[key] = row_values[key]

    return template.format(**template_kwargs)

get_ui_url_for_flow_run_id(flow_run_id)

Returns a link to the flow run view of the given flow run id.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| flow_run_id | UUID | The flow run id. | required |
Source code in src/prefect/server/services/flow_run_notifications.py
148
149
150
151
152
153
154
155
156
157
158
159
def get_ui_url_for_flow_run_id(self, flow_run_id: UUID) -> str:
    """
    Build the link to the flow run view for a flow run.

    Args:
        flow_run_id: the id of the flow run to link to.
    """
    flow_run_url = urls.url_for(
        "flow-run",
        obj_id=flow_run_id,
        default_base_url="http://ephemeral-prefect/api",
    )
    return flow_run_url