25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 | class KubernetesEventsReplicator:
"""Replicates Kubernetes pod events to Prefect events."""
def __init__(
self,
client: "ApiClient",
job_name: str,
namespace: str,
worker_resource: Dict[str, str],
related_resources: List[RelatedResource],
timeout_seconds: int,
):
self._client = client
self._job_name = job_name
self._namespace = namespace
self._timeout_seconds = timeout_seconds
self._task = None
# All events emitted by this replicator have the pod itself as the
# resource. The `worker_resource` is what the worker uses when it's
# the primary resource, so here it's turned into a related resource
# instead.
worker_resource["prefect.resource.role"] = "worker"
worker_related_resource = RelatedResource(worker_resource)
self._related_resources = related_resources + [worker_related_resource]
self._state = "READY"
async def __aenter__(self):
"""Start the Kubernetes event watcher when entering the context."""
self._task = asyncio.create_task(self._replicate_pod_events())
self._state = "STARTED"
return self
async def __aexit__(self, exc_type, exc_value, traceback):
"""Stop the Kubernetes event watcher and ensure all tasks are completed before exiting the context."""
self._state = "STOPPED"
if self._task:
self._task.cancel()
def _pod_as_resource(self, pod: "V1Pod") -> Dict[str, str]:
"""Convert a pod to a resource dictionary"""
return {
"prefect.resource.id": f"prefect.kubernetes.pod.{pod.metadata.uid}",
"prefect.resource.name": pod.metadata.name,
"kubernetes.namespace": pod.metadata.namespace,
}
async def _replicate_pod_events(self):
"""Replicate Kubernetes pod events as Prefect Events."""
seen_phases = set()
last_event = None
core_client = kubernetes_asyncio.client.CoreV1Api(api_client=self._client)
watch = kubernetes_asyncio.watch.Watch()
async with watch:
async for event in watch.stream(
func=core_client.list_namespaced_pod,
namespace=self._namespace,
label_selector=f"job-name={self._job_name}",
timeout_seconds=self._timeout_seconds,
_request_timeout=aiohttp.ClientTimeout(),
):
phase = event["object"].status.phase
if phase not in seen_phases:
last_event = await self._emit_pod_event(
event, last_event=last_event
)
seen_phases.add(phase)
if phase in FINAL_PHASES:
break
async def _emit_pod_event(
self,
pod_event: Dict,
last_event: Optional[Event] = None,
) -> Event:
"""Emit a Prefect event for a Kubernetes pod event."""
pod_event_type = pod_event["type"]
pod: "V1Pod" = pod_event["object"]
pod_phase = pod.status.phase
resource = self._pod_as_resource(pod)
if pod_event_type == "MODIFIED" and pod_phase == "Failed":
for container_status in pod.status.container_statuses:
if container_status.state.terminated.reason in EVICTED_REASONS:
pod_phase = "evicted"
resource[
"kubernetes.reason"
] = container_status.state.terminated.reason
break
return emit_event(
event=f"prefect.kubernetes.pod.{pod_phase.lower()}",
resource=resource,
related=self._related_resources,
follows=last_event,
)
|