23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class Telemetry(LoopService):
    """
    This service sends anonymous data (e.g. count of flow runs) to Prefect to help us
    improve. It can be toggled off with the PREFECT_SERVER_ANALYTICS_ENABLED setting.
    """

    # Class-level default handed to LoopService when no explicit value is given.
    # NOTE(review): presumably the pause, in seconds, between `run_once` calls --
    # confirm against LoopService.
    loop_seconds: int = 600

    def __init__(self, loop_seconds: Optional[int] = None, **kwargs):
        """
        Set up the service and record which environment label to report.

        Args:
            loop_seconds: forwarded unchanged to `LoopService.__init__`.
            **kwargs: any further keyword arguments, also forwarded to
                `LoopService.__init__`.
        """
        super().__init__(loop_seconds=loop_seconds, **kwargs)

        # The label included in every heartbeat; "production" unless the
        # PREFECT_API_TELEMETRY_ENVIRONMENT environment variable overrides it.
        environment_label = os.environ.get(
            "PREFECT_API_TELEMETRY_ENVIRONMENT", "production"
        )
        self.telemetry_environment = environment_label

    @inject_db
    async def _fetch_or_set_telemetry_session(self, db: PrefectDBInterface):
        """
        Load the telemetry session from the configuration table, creating and
        storing a new one when none exists yet.

        On completion `self.session_id` and `self.session_start_timestamp` are
        populated. Telemetry sessions last until the database is reset.

        Returns:
            A `(session_start_timestamp, session_id)` tuple.
        """
        async with db.session_context(begin_transaction=True) as session:
            stored = await configuration.read_configuration(
                session, "TELEMETRY_SESSION"
            )

            if stored is not None:
                # A session was persisted earlier: reuse its values.
                self.logger.debug("Session information retrieved from database")
                stored_values = stored.value
                self.session_id = stored_values["session_id"]
                self.session_start_timestamp = stored_values[
                    "session_start_timestamp"
                ]
            else:
                # Nothing stored yet: mint a fresh session and persist it.
                self.logger.debug("No telemetry session found, setting")
                new_session_id = str(uuid4())
                started_at = pendulum.now("UTC").to_iso8601_string()

                new_values = {
                    "session_id": new_session_id,
                    "session_start_timestamp": started_at,
                }
                new_record = Configuration(key="TELEMETRY_SESSION", value=new_values)
                await configuration.write_configuration(session, new_record)

                # Only remember the session on the instance once the write call
                # has returned without raising.
                self.session_id = new_session_id
                self.session_start_timestamp = started_at

        self.logger.debug(
            f"Telemetry Session: {self.session_id}, {self.session_start_timestamp}"
        )
        return (self.session_start_timestamp, self.session_id)

    async def run_once(self):
        """
        Post a single heartbeat describing this server to the sens-o-matic.

        The session information is looked up (or created) the first time this
        runs. If sending fails for any reason, the failure is logged and the
        service asks itself to stop rather than retrying.
        """
        from prefect.client.constants import SERVER_API_VERSION

        session_loaded = hasattr(self, "session_id")
        if not session_loaded:
            await self._fetch_or_set_telemetry_session()

        payload = {
            "platform": platform.system(),
            "architecture": platform.machine(),
            "python_version": platform.python_version(),
            "python_implementation": platform.python_implementation(),
            "environment": self.telemetry_environment,
            "api_version": SERVER_API_VERSION,
            "prefect_version": prefect.__version__,
            "session_id": self.session_id,
            "session_start_timestamp": self.session_start_timestamp,
        }
        heartbeat = {
            "source": "prefect_server",
            "type": "heartbeat",
            "payload": payload,
        }

        endpoint = "https://sens-o-matic.prefect.io/"
        event_headers = {"x-prefect-event": "prefect_server"}

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    endpoint,
                    json=heartbeat,
                    headers=event_headers,
                )
                response.raise_for_status()
        except Exception as exc:
            failure_message = (
                f"Failed to send telemetry: {exc}\nShutting down telemetry service..."
            )
            # The traceback is only needed if doing deeper debugging, otherwise
            # this looks like an impactful server error
            self.logger.error(
                failure_message,
                exc_info=PREFECT_DEBUG_MODE.value(),
            )
            await self.stop(block=False)
|