Skip to content

prefect_kubernetes.events

KubernetesEventsReplicator

Replicates Kubernetes pod events to Prefect events.

Source code in prefect_kubernetes/events.py
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class KubernetesEventsReplicator:
    """Replicates Kubernetes pod events to Prefect events.

    Intended to be used as an async context manager: entering the context
    starts a background task that watches the pods belonging to one
    Kubernetes job and emits a Prefect event the first time each pod phase is
    observed; exiting the context cancels that task and waits for it to
    finish.
    """

    def __init__(
        self,
        client: "ApiClient",
        job_name: str,
        namespace: str,
        worker_resource: Dict[str, str],
        related_resources: List[RelatedResource],
        timeout_seconds: int,
    ):
        """Store the watch parameters and build the related-resource list.

        Args:
            client: API client handed to `CoreV1Api` when the watch starts.
            job_name: Value matched against the `job-name` pod label.
            namespace: Namespace whose pods are listed by the watch.
            worker_resource: Resource labels describing the worker. A copy
                with the role `"worker"` is appended to the related
                resources; the dictionary passed in is left untouched.
            related_resources: Related resources attached to every event
                emitted by this replicator.
            timeout_seconds: Forwarded to the watch stream as
                `timeout_seconds`.
        """
        self._client = client
        self._job_name = job_name
        self._namespace = namespace
        self._timeout_seconds = timeout_seconds
        self._task = None

        # All events emitted by this replicator have the pod itself as the
        # resource. The `worker_resource` is what the worker uses when it's
        # the primary resource, so here it's turned into a related resource
        # instead. The role is added to a copy so the caller's dictionary
        # (which it may keep using as a primary resource) is not modified.
        worker_related_resource = RelatedResource(
            {**worker_resource, "prefect.resource.role": "worker"}
        )
        self._related_resources = related_resources + [worker_related_resource]
        self._state = "READY"

    async def __aenter__(self):
        """Start the Kubernetes event watcher when entering the context."""
        self._task = asyncio.create_task(self._replicate_pod_events())
        self._state = "STARTED"
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Stop the Kubernetes event watcher and wait for it to finish.

        The watcher task is cancelled and then awaited so that it has fully
        unwound by the time the context exits. Anything the watcher raised
        (including its own `CancelledError`) is collected rather than
        re-raised, so tearing down the replicator cannot replace an exception
        that is already propagating out of the `async with` body.
        Cancellation of the *calling* task while waiting still propagates.
        """
        self._state = "STOPPED"
        task = self._task
        if task is None:
            return
        task.cancel()
        await asyncio.gather(task, return_exceptions=True)

    def _pod_as_resource(self, pod: "V1Pod") -> Dict[str, str]:
        """Convert a pod to a resource dictionary.

        Args:
            pod: Pod whose metadata (uid, name, namespace) is read.

        Returns:
            The Prefect resource labels identifying the pod.
        """
        return {
            "prefect.resource.id": f"prefect.kubernetes.pod.{pod.metadata.uid}",
            "prefect.resource.name": pod.metadata.name,
            "kubernetes.namespace": pod.metadata.namespace,
        }

    async def _replicate_pod_events(self):
        """Replicate Kubernetes pod events as Prefect Events.

        Watches the pods labelled `job-name=<job_name>` in the configured
        namespace and emits one Prefect event per distinct pod phase, each
        event following the previously emitted one. The watch ends once a
        phase in `FINAL_PHASES` has been emitted or the stream is exhausted.
        """
        seen_phases = set()
        last_event = None

        core_client = kubernetes_asyncio.client.CoreV1Api(api_client=self._client)
        watch = kubernetes_asyncio.watch.Watch()
        async with watch:
            async for event in watch.stream(
                func=core_client.list_namespaced_pod,
                namespace=self._namespace,
                label_selector=f"job-name={self._job_name}",
                timeout_seconds=self._timeout_seconds,
                # NOTE(review): presumably overrides the client-side request
                # timeout so only `timeout_seconds` bounds the watch — confirm.
                _request_timeout=aiohttp.ClientTimeout(),
            ):
                phase = event["object"].status.phase

                # Only the first occurrence of each phase is replicated.
                if phase not in seen_phases:
                    last_event = await self._emit_pod_event(
                        event, last_event=last_event
                    )
                    seen_phases.add(phase)
                    if phase in FINAL_PHASES:
                        break

    async def _emit_pod_event(
        self,
        pod_event: Dict,
        last_event: Optional[Event] = None,
    ) -> Event:
        """Emit a Prefect event for a Kubernetes pod event.

        The event name is `prefect.kubernetes.pod.<phase>` with the phase
        lower-cased. A `MODIFIED` event for a `Failed` pod is reported as
        `evicted` when one of its containers terminated with a reason listed
        in `EVICTED_REASONS`; that reason is then recorded on the resource
        under `kubernetes.reason`.

        Args:
            pod_event: Watch event with a `"type"` entry and the pod under
                `"object"`.
            last_event: Previously emitted event that the new one follows.

        Returns:
            The value returned by `emit_event`.
        """
        pod_event_type = pod_event["type"]
        pod: "V1Pod" = pod_event["object"]
        pod_phase = pod.status.phase

        resource = self._pod_as_resource(pod)

        if pod_event_type == "MODIFIED" and pod_phase == "Failed":
            # `container_statuses`, `state` and `terminated` are all optional
            # on the Kubernetes models; a failed pod may have no statuses, and
            # a container that never terminated has no `terminated` state.
            for container_status in pod.status.container_statuses or ():
                state = container_status.state
                terminated = state.terminated if state is not None else None
                if terminated is not None and terminated.reason in EVICTED_REASONS:
                    pod_phase = "evicted"
                    resource["kubernetes.reason"] = terminated.reason
                    break

        return emit_event(
            event=f"prefect.kubernetes.pod.{pod_phase.lower()}",
            resource=resource,
            related=self._related_resources,
            follows=last_event,
        )

__aenter__() async

Start the Kubernetes event watcher when entering the context.

Source code in prefect_kubernetes/events.py
52
53
54
55
56
async def __aenter__(self):
    """Schedule the pod-event watcher as a background task and return self.

    The replicator is marked as "STARTED" once the task has been scheduled.
    """
    watcher = self._replicate_pod_events()
    self._task = asyncio.create_task(watcher)
    self._state = "STARTED"
    return self

__aexit__(exc_type, exc_value, traceback) async

Stop the Kubernetes event watcher and ensure all tasks are completed before exiting the context.

Source code in prefect_kubernetes/events.py
58
59
60
61
62
async def __aexit__(self, exc_type, exc_value, traceback):
    """Stop the Kubernetes event watcher and wait for it to finish.

    The watcher task is cancelled and then awaited so that it has fully
    unwound by the time the context exits. Anything the watcher raised
    (including its own `CancelledError`) is collected rather than re-raised,
    so tearing down the replicator cannot replace an exception that is
    already propagating out of the `async with` body. Cancellation of the
    *calling* task while waiting still propagates.
    """
    self._state = "STOPPED"
    task = self._task
    if task is None:
        return
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)