Skip to content

prefect.server.services.telemetry

The Telemetry service.

Telemetry

Bases: LoopService

This service sends anonymous data (e.g. count of flow runs) to Prefect to help us improve. It can be toggled off with the PREFECT_SERVER_ANALYTICS_ENABLED setting.

Source code in src/prefect/server/services/telemetry.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class Telemetry(LoopService):
    """
    Loop service that reports anonymous data (e.g. count of flow runs) to
    Prefect to help improve the product. It can be toggled off with the
    PREFECT_SERVER_ANALYTICS_ENABLED setting.
    """

    # Default interval between heartbeats, in seconds.
    loop_seconds: int = 600

    def __init__(self, loop_seconds: Optional[int] = None, **kwargs):
        """
        Set up the loop service and record the telemetry environment label.

        Args:
            loop_seconds: optional loop interval, forwarded to `LoopService`.
            **kwargs: forwarded unchanged to `LoopService`.
        """
        super().__init__(loop_seconds=loop_seconds, **kwargs)
        # The environment label comes from the process environment and falls
        # back to "production" when the variable is not set.
        self.telemetry_environment = os.getenv(
            "PREFECT_API_TELEMETRY_ENVIRONMENT", "production"
        )

    @inject_db
    async def _fetch_or_set_telemetry_session(self, db: PrefectDBInterface):
        """
        Load the telemetry session stored under the `TELEMETRY_SESSION`
        configuration key, creating and persisting a new one when none exists.

        Either way, `self.session_id` and `self.session_start_timestamp` are
        populated. Telemetry sessions last until the database is reset.

        Returns:
            A `(session_start_timestamp, session_id)` tuple.
        """
        async with db.session_context(begin_transaction=True) as session:
            stored = await configuration.read_configuration(
                session, "TELEMETRY_SESSION"
            )

            if stored is not None:
                self.logger.debug("Session information retrieved from database")
                session_info = stored.value
                self.session_id = session_info["session_id"]
                self.session_start_timestamp = session_info["session_start_timestamp"]
            else:
                self.logger.debug("No telemetry session found, setting")
                new_session_id = str(uuid4())
                started_at = pendulum.now("UTC").to_iso8601_string()

                await configuration.write_configuration(
                    session,
                    Configuration(
                        key="TELEMETRY_SESSION",
                        value={
                            "session_id": new_session_id,
                            "session_start_timestamp": started_at,
                        },
                    ),
                )

                # Only remember the session on the instance once the write
                # call has returned without raising.
                self.session_id = new_session_id
                self.session_start_timestamp = started_at

        self.logger.debug(
            f"Telemetry Session: {self.session_id}, {self.session_start_timestamp}"
        )
        return (self.session_start_timestamp, self.session_id)

    async def run_once(self):
        """
        Post a single heartbeat to the sens-o-matic.

        The telemetry session is loaded first if this instance has no
        `session_id` yet. If sending fails for any reason, the error is logged
        and the service is asked to stop.
        """
        from prefect.client.constants import SERVER_API_VERSION

        if not hasattr(self, "session_id"):
            await self._fetch_or_set_telemetry_session()

        payload = {
            "platform": platform.system(),
            "architecture": platform.machine(),
            "python_version": platform.python_version(),
            "python_implementation": platform.python_implementation(),
            "environment": self.telemetry_environment,
            "api_version": SERVER_API_VERSION,
            "prefect_version": prefect.__version__,
            "session_id": self.session_id,
            "session_start_timestamp": self.session_start_timestamp,
        }
        heartbeat = {
            "source": "prefect_server",
            "type": "heartbeat",
            "payload": payload,
        }

        try:
            async with httpx.AsyncClient() as http:
                response = await http.post(
                    "https://sens-o-matic.prefect.io/",
                    json=heartbeat,
                    headers={"x-prefect-event": "prefect_server"},
                )
            response.raise_for_status()
        except Exception as exc:
            message = (
                f"Failed to send telemetry: {exc}\nShutting down telemetry service..."
            )
            # The traceback is only attached in debug mode; otherwise this
            # would look like an impactful server error.
            self.logger.error(message, exc_info=PREFECT_DEBUG_MODE.value())
            await self.stop(block=False)

run_once() async

Sends a heartbeat to the sens-o-matic

Source code in src/prefect/server/services/telemetry.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
async def run_once(self):
    """
    Post a single heartbeat to the sens-o-matic.

    The telemetry session is loaded first if this instance has no
    `session_id` yet. If sending fails for any reason, the error is logged
    and the service is asked to stop.
    """
    from prefect.client.constants import SERVER_API_VERSION

    if not hasattr(self, "session_id"):
        await self._fetch_or_set_telemetry_session()

    payload = {
        "platform": platform.system(),
        "architecture": platform.machine(),
        "python_version": platform.python_version(),
        "python_implementation": platform.python_implementation(),
        "environment": self.telemetry_environment,
        "api_version": SERVER_API_VERSION,
        "prefect_version": prefect.__version__,
        "session_id": self.session_id,
        "session_start_timestamp": self.session_start_timestamp,
    }
    heartbeat = {
        "source": "prefect_server",
        "type": "heartbeat",
        "payload": payload,
    }

    try:
        async with httpx.AsyncClient() as http:
            response = await http.post(
                "https://sens-o-matic.prefect.io/",
                json=heartbeat,
                headers={"x-prefect-event": "prefect_server"},
            )
        response.raise_for_status()
    except Exception as exc:
        message = (
            f"Failed to send telemetry: {exc}\nShutting down telemetry service..."
        )
        # The traceback is only attached in debug mode; otherwise this
        # would look like an impactful server error.
        self.logger.error(message, exc_info=PREFECT_DEBUG_MODE.value())
        await self.stop(block=False)