Skip to content

prefect_gcp.utilities

Execution

Bases: BaseModel

Utility class to call GCP executions API and interact with the returned objects.

Source code in prefect_gcp/utilities.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class Execution(BaseModel):
    """
    Utility class to call GCP `executions` API and
    interact with the returned objects.
    """

    name: str
    namespace: str
    metadata: dict
    spec: dict
    status: dict
    log_uri: str

    def is_running(self) -> bool:
        """Returns True if Execution is not completed."""
        return self.status.get("completionTime") is None

    def condition_after_completion(self):
        """Returns Execution condition if Execution has completed."""
        for condition in self.status["conditions"]:
            if condition["type"] == "Completed":
                return condition

    def succeeded(self):
        """Whether or not the Execution completed is a successful state."""
        completed_condition = self.condition_after_completion()
        if completed_condition and completed_condition["status"] == "True":
            return True

        return False

    @classmethod
    def get(cls, client: Resource, namespace: str, execution_name: str):
        """
        Make a get request to the GCP executions API
        and return an Execution instance.
        """
        request = client.executions().get(
            name=f"namespaces/{namespace}/executions/{execution_name}"
        )
        response = request.execute()

        return cls(
            name=response["metadata"]["name"],
            namespace=response["metadata"]["namespace"],
            metadata=response["metadata"],
            spec=response["spec"],
            status=response["status"],
            log_uri=response["status"]["logUri"],
        )

condition_after_completion()

Returns Execution condition if Execution has completed.

Source code in prefect_gcp/utilities.py
180
181
182
183
184
def condition_after_completion(self):
    """Returns Execution condition if Execution has completed."""
    for condition in self.status["conditions"]:
        if condition["type"] == "Completed":
            return condition

get(client, namespace, execution_name) classmethod

Make a get request to the GCP executions API and return an Execution instance.

Source code in prefect_gcp/utilities.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
@classmethod
def get(cls, client: Resource, namespace: str, execution_name: str):
    """
    Make a get request to the GCP executions API
    and return an Execution instance.
    """
    request = client.executions().get(
        name=f"namespaces/{namespace}/executions/{execution_name}"
    )
    response = request.execute()

    return cls(
        name=response["metadata"]["name"],
        namespace=response["metadata"]["namespace"],
        metadata=response["metadata"],
        spec=response["spec"],
        status=response["status"],
        log_uri=response["status"]["logUri"],
    )

is_running()

Returns True if Execution is not completed.

Source code in prefect_gcp/utilities.py
176
177
178
def is_running(self) -> bool:
    """Returns True if Execution is not completed."""
    return self.status.get("completionTime") is None

succeeded()

Whether or not the Execution completed is a successful state.

Source code in prefect_gcp/utilities.py
186
187
188
189
190
191
192
def succeeded(self):
    """Whether or not the Execution completed is a successful state."""
    completed_condition = self.condition_after_completion()
    if completed_condition and completed_condition["status"] == "True":
        return True

    return False

Job

Bases: BaseModel

Utility class to call GCP jobs API and interact with the returned objects.

Source code in prefect_gcp/utilities.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class Job(BaseModel):
    """
    Utility class to call GCP `jobs` API and
    interact with the returned objects.
    """

    metadata: dict
    spec: dict
    status: dict
    name: str
    ready_condition: dict
    execution_status: dict

    def _is_missing_container(self):
        """
        Check if Job status is not ready because
        the specified container cannot be found.
        """
        if (
            self.ready_condition.get("status") == "False"
            and self.ready_condition.get("reason") == "ContainerMissing"
        ):
            return True
        return False

    def is_ready(self) -> bool:
        """Whether a job is finished registering and ready to be executed"""
        if self._is_missing_container():
            raise Exception(f"{self.ready_condition['message']}")
        return self.ready_condition.get("status") == "True"

    def has_execution_in_progress(self) -> bool:
        """See if job has a run in progress."""
        return (
            self.execution_status == {}
            or self.execution_status.get("completionTimestamp") is None
        )

    @staticmethod
    def _get_ready_condition(job: dict) -> dict:
        """Utility to access JSON field containing ready condition."""
        if job["status"].get("conditions"):
            for condition in job["status"]["conditions"]:
                if condition["type"] == "Ready":
                    return condition

        return {}

    @staticmethod
    def _get_execution_status(job: dict):
        """Utility to access JSON field containing execution status."""
        if job["status"].get("latestCreatedExecution"):
            return job["status"]["latestCreatedExecution"]

        return {}

    @classmethod
    def get(cls, client: Resource, namespace: str, job_name: str):
        """Make a get request to the GCP jobs API and return a Job instance."""
        request = client.jobs().get(name=f"namespaces/{namespace}/jobs/{job_name}")
        response = request.execute()

        return cls(
            metadata=response["metadata"],
            spec=response["spec"],
            status=response["status"],
            name=response["metadata"]["name"],
            ready_condition=cls._get_ready_condition(response),
            execution_status=cls._get_execution_status(response),
        )

    @staticmethod
    def create(client: Resource, namespace: str, body: dict):
        """Make a create request to the GCP jobs API."""
        request = client.jobs().create(parent=f"namespaces/{namespace}", body=body)
        response = request.execute()
        return response

    @staticmethod
    def delete(client: Resource, namespace: str, job_name: str):
        """Make a delete request to the GCP jobs API."""
        request = client.jobs().delete(name=f"namespaces/{namespace}/jobs/{job_name}")
        response = request.execute()
        return response

    @staticmethod
    def run(client: Resource, namespace: str, job_name: str):
        """Make a run request to the GCP jobs API."""
        request = client.jobs().run(name=f"namespaces/{namespace}/jobs/{job_name}")
        response = request.execute()
        return response

create(client, namespace, body) staticmethod

Make a create request to the GCP jobs API.

Source code in prefect_gcp/utilities.py
141
142
143
144
145
146
@staticmethod
def create(client: Resource, namespace: str, body: dict):
    """Make a create request to the GCP jobs API."""
    request = client.jobs().create(parent=f"namespaces/{namespace}", body=body)
    response = request.execute()
    return response

delete(client, namespace, job_name) staticmethod

Make a delete request to the GCP jobs API.

Source code in prefect_gcp/utilities.py
148
149
150
151
152
153
@staticmethod
def delete(client: Resource, namespace: str, job_name: str):
    """Make a delete request to the GCP jobs API."""
    request = client.jobs().delete(name=f"namespaces/{namespace}/jobs/{job_name}")
    response = request.execute()
    return response

get(client, namespace, job_name) classmethod

Make a get request to the GCP jobs API and return a Job instance.

Source code in prefect_gcp/utilities.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
@classmethod
def get(cls, client: Resource, namespace: str, job_name: str):
    """Make a get request to the GCP jobs API and return a Job instance."""
    request = client.jobs().get(name=f"namespaces/{namespace}/jobs/{job_name}")
    response = request.execute()

    return cls(
        metadata=response["metadata"],
        spec=response["spec"],
        status=response["status"],
        name=response["metadata"]["name"],
        ready_condition=cls._get_ready_condition(response),
        execution_status=cls._get_execution_status(response),
    )

has_execution_in_progress()

See if job has a run in progress.

Source code in prefect_gcp/utilities.py
101
102
103
104
105
106
def has_execution_in_progress(self) -> bool:
    """See if job has a run in progress."""
    return (
        self.execution_status == {}
        or self.execution_status.get("completionTimestamp") is None
    )

is_ready()

Whether a job is finished registering and ready to be executed

Source code in prefect_gcp/utilities.py
95
96
97
98
99
def is_ready(self) -> bool:
    """Whether a job is finished registering and ready to be executed"""
    if self._is_missing_container():
        raise Exception(f"{self.ready_condition['message']}")
    return self.ready_condition.get("status") == "True"

run(client, namespace, job_name) staticmethod

Make a run request to the GCP jobs API.

Source code in prefect_gcp/utilities.py
155
156
157
158
159
160
@staticmethod
def run(client: Resource, namespace: str, job_name: str):
    """Make a run request to the GCP jobs API."""
    request = client.jobs().run(name=f"namespaces/{namespace}/jobs/{job_name}")
    response = request.execute()
    return response

slugify_name(name, max_length=30)

Slugify text for use as a name.

Keeps only alphanumeric characters and dashes, and caps the length of the slug at 30 chars.

The 30 character length allows room to add a uuid for generating a unique name for the job while keeping the total length of a name below 63 characters, which is the limit for Cloud Run job names.

Parameters:

Name Type Description Default
name str

The name of the job

required

Returns:

Type Description
Optional[str]

The slugified job name or None if the slugified name is empty

Source code in prefect_gcp/utilities.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def slugify_name(name: str, max_length: int = 30) -> Optional[str]:
    """
    Slugify text for use as a name.

    Keeps only alphanumeric characters and dashes, and caps the length
    of the slug at 30 chars.

    The 30 character length allows room to add a uuid for generating a unique
    name for the job while keeping the total length of a name below 63 characters,
    which is the limit for Cloud Run job names.

    Args:
        name: The name of the job

    Returns:
        The slugified job name or None if the slugified name is empty
    """
    slug = slugify(
        name,
        max_length=max_length,
        regex_pattern=r"[^a-zA-Z0-9-]+",
    )

    return slug if slug else None