Skip to content

prefect_kubernetes.events

KubernetesEventsReplicator

Replicates Kubernetes pod events to Prefect events.

Source code in prefect_kubernetes/events.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
class KubernetesEventsReplicator:
    """Replicates Kubernetes pod events to Prefect events."""

    def __init__(
        self,
        client: "ApiClient",
        job_name: str,
        namespace: str,
        worker_resource: Dict[str, str],
        related_resources: List[RelatedResource],
        timeout_seconds: int,
    ):
        self._client = client
        self._job_name = job_name
        self._namespace = namespace
        self._timeout_seconds = timeout_seconds

        # All events emitted by this replicator have the pod itself as the
        # resource. The `worker_resource` is what the worker uses when it's
        # the primary resource, so here it's turned into a related resource
        # instead.
        worker_resource["prefect.resource.role"] = "worker"
        worker_related_resource = RelatedResource(worker_resource)
        self._related_resources = related_resources + [worker_related_resource]

        self._watch = kubernetes.watch.Watch()
        self._thread = threading.Thread(target=self._replicate_pod_events)

        self._state = "READY"

        atexit.register(self.stop)

    def __enter__(self):
        """Start the replicator thread."""
        self._thread.start()
        self._state = "STARTED"

    def __exit__(self, *args, **kwargs):
        """Stop the replicator thread."""
        self.stop()

    def stop(self):
        """Stop watching for pod events and stop thread."""
        if self._thread.is_alive():
            self._watch.stop()
            self._thread.join()
            self._state = "STOPPED"

    def _pod_as_resource(self, pod: "V1Pod") -> Dict[str, str]:
        """Convert a pod to a resource dictionary"""
        return {
            "prefect.resource.id": f"prefect.kubernetes.pod.{pod.metadata.uid}",
            "prefect.resource.name": pod.metadata.name,
            "kubernetes.namespace": pod.metadata.namespace,
        }

    def _replicate_pod_events(self):
        """Replicate Kubernetes pod events as Prefect Events."""
        seen_phases = set()
        last_event = None

        try:
            core_client = kubernetes.client.CoreV1Api(api_client=self._client)
            for event in self._watch.stream(
                func=core_client.list_namespaced_pod,
                namespace=self._namespace,
                label_selector=f"job-name={self._job_name}",
                timeout_seconds=self._timeout_seconds,
            ):
                phase = event["object"].status.phase

                if phase not in seen_phases:
                    last_event = self._emit_pod_event(event, last_event=last_event)
                    seen_phases.add(phase)
                    if phase in FINAL_PHASES:
                        self._watch.stop()
        finally:
            self._client.rest_client.pool_manager.clear()

    def _emit_pod_event(
        self,
        pod_event: Dict,
        last_event: Optional[Event] = None,
    ) -> Event:
        """Emit a Prefect event for a Kubernetes pod event."""
        pod_event_type = pod_event["type"]
        pod: "V1Pod" = pod_event["object"]
        pod_phase = pod.status.phase

        resource = self._pod_as_resource(pod)

        if pod_event_type == "MODIFIED" and pod_phase == "Failed":
            for container_status in pod.status.container_statuses:
                if container_status.state.terminated.reason in EVICTED_REASONS:
                    pod_phase = "evicted"
                    resource[
                        "kubernetes.reason"
                    ] = container_status.state.terminated.reason
                    break

        return emit_event(
            event=f"prefect.kubernetes.pod.{pod_phase.lower()}",
            resource=resource,
            related=self._related_resources,
            follows=last_event,
        )

__enter__()

Start the replicator thread.

Source code in prefect_kubernetes/events.py
62
63
64
65
def __enter__(self):
    """Start the replicator thread."""
    self._thread.start()
    self._state = "STARTED"

__exit__(*args, **kwargs)

Stop the replicator thread.

Source code in prefect_kubernetes/events.py
67
68
69
def __exit__(self, *args, **kwargs):
    """Stop the replicator thread."""
    self.stop()

stop()

Stop watching for pod events and stop thread.

Source code in prefect_kubernetes/events.py
71
72
73
74
75
76
def stop(self):
    """Stop watching for pod events and stop thread."""
    if self._thread.is_alive():
        self._watch.stop()
        self._thread.join()
        self._state = "STOPPED"