Skip to content

prefect_kubernetes.flows

A module to define flows interacting with Kubernetes resources.

run_namespaced_job(kubernetes_job, print_func=None)

Flow for running a namespaced Kubernetes job.

Parameters:

Name Type Description Default
kubernetes_job KubernetesJob

The KubernetesJob block that specifies the job to run.

required
print_func Optional[Callable]

A function to print the logs from the job pods.

None

Returns:

Type Description
Dict[str, Any]

A dict of logs from each pod in the job, e.g. {'pod_name': 'pod_log_str'}.

Raises:

Type Description
RuntimeError

If the created Kubernetes job attains a failed status.

```python
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials

run_namespaced_job(
    kubernetes_job=KubernetesJob.from_yaml_file(
        credentials=KubernetesCredentials.load("k8s-creds"),
        manifest_path="path/to/job.yaml",
    )
)
```
Source code in prefect_kubernetes/flows.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@flow
def run_namespaced_job(
    kubernetes_job: KubernetesJob, print_func: Optional[Callable] = None
) -> Dict[str, Any]:
    """Run a namespaced Kubernetes job to completion and collect its result.

    The flow performs three steps, each wrapped as a task: trigger the job
    described by the block, wait for the resulting job run to complete, and
    fetch the job run's result.

    Args:
        kubernetes_job: The `KubernetesJob` block that specifies the job to run.
        print_func: A function to print the logs from the job pods; it is
            forwarded unchanged to the job run's `wait_for_completion`.

    Returns:
        A dict of logs from each pod in the job, e.g. {'pod_name': 'pod_log_str'}.

    Raises:
        RuntimeError: If the created Kubernetes job attains a failed status.

    Example:

        ```python
        from prefect_kubernetes import KubernetesJob, run_namespaced_job
        from prefect_kubernetes.credentials import KubernetesCredentials

        run_namespaced_job(
            kubernetes_job=KubernetesJob.from_yaml_file(
                credentials=KubernetesCredentials.load("k8s-creds"),
                manifest_path="path/to/job.yaml",
            )
        )
        ```
    """
    # Step 1: start the job; the returned job run object drives the next steps.
    trigger_task = task(kubernetes_job.trigger)
    job_run = trigger_task()

    # Step 2: wait until the job run completes, passing the log printer through.
    wait_task = task(job_run.wait_for_completion)
    wait_task(print_func)

    # Step 3: hand back whatever the job run reports as its result.
    fetch_task = task(job_run.fetch_result)
    return fetch_task()

run_namespaced_job_async(kubernetes_job, print_func=None) async

Async flow for running a namespaced Kubernetes job.

Parameters:

Name Type Description Default
kubernetes_job KubernetesJob

The KubernetesJob block that specifies the job to run.

required
print_func Optional[Callable]

A function to print the logs from the job pods.

None

Returns:

Type Description
Dict[str, Any]

A dict of logs from each pod in the job, e.g. {'pod_name': 'pod_log_str'}.

Raises:

Type Description
RuntimeError

If the created Kubernetes job attains a failed status.

```python
import asyncio

from prefect_kubernetes import KubernetesJob
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.flows import run_namespaced_job_async

asyncio.run(
    run_namespaced_job_async(
        kubernetes_job=KubernetesJob.from_yaml_file(
            credentials=KubernetesCredentials.load("k8s-creds"),
            manifest_path="path/to/job.yaml",
        )
    )
)
```
Source code in prefect_kubernetes/flows.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
async def _await_if_coroutine(maybe_coro: Any) -> Any:
    """Return `maybe_coro`, awaiting it first when it is a coroutine object."""
    if asyncio.iscoroutine(maybe_coro):
        return await maybe_coro
    return maybe_coro


@flow
async def run_namespaced_job_async(
    kubernetes_job: KubernetesJob, print_func: Optional[Callable] = None
) -> Dict[str, Any]:
    """Async flow for running a namespaced Kubernetes job.

    The flow performs three steps, each wrapped as a task: trigger the job
    described by the block, wait for the resulting job run to complete, and
    fetch the job run's result. The value returned by each task call is
    awaited only when it is a coroutine object; otherwise it is used as-is.

    Args:
        kubernetes_job: The `KubernetesJob` block that specifies the job to run.
        print_func: A function to print the logs from the job pods; it is
            forwarded unchanged to the job run's `wait_for_completion`.

    Returns:
        A dict of logs from each pod in the job, e.g. {'pod_name': 'pod_log_str'}.

    Raises:
        RuntimeError: If the created Kubernetes job attains a failed status.

    Example:

        ```python
        import asyncio

        from prefect_kubernetes import KubernetesJob
        from prefect_kubernetes.credentials import KubernetesCredentials
        from prefect_kubernetes.flows import run_namespaced_job_async

        asyncio.run(
            run_namespaced_job_async(
                kubernetes_job=KubernetesJob.from_yaml_file(
                    credentials=KubernetesCredentials.load("k8s-creds"),
                    manifest_path="path/to/job.yaml",
                )
            )
        )
        ```
    """
    # Step 1: start the job; the returned job run object drives the next steps.
    kubernetes_job_run = await _await_if_coroutine(task(kubernetes_job.trigger)())

    # Step 2: wait until the job run completes, passing the log printer through.
    await _await_if_coroutine(
        task(kubernetes_job_run.wait_for_completion)(print_func)
    )

    # Step 3: hand back whatever the job run reports as its result.
    return await _await_if_coroutine(task(kubernetes_job_run.fetch_result)())