Skip to content

prefect_dbt.cloud.runs

Module containing tasks and flows for interacting with dbt Cloud job runs

DbtCloudJobRunStatus

Bases: Enum

dbt Cloud Job statuses.

Source code in prefect_dbt/cloud/runs.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class DbtCloudJobRunStatus(Enum):
    """Numeric status codes that dbt Cloud reports for a job run."""

    # Statuses that `is_terminal_status_code` does not treat as terminal.
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    # Statuses that `is_terminal_status_code` treats as terminal.
    SUCCESS = 10
    FAILED = 20
    CANCELLED = 30

    @classmethod
    def is_terminal_status_code(cls, status_code: Any) -> bool:
        """
        Report whether a raw status code marks a finished job run.

        Args:
            status_code: The raw value to check. It is compared against the
                *values* of the enum members, so passing an enum member
                itself yields False.

        Returns:
            True if `status_code` equals the value of `SUCCESS`, `FAILED`, or
                `CANCELLED`; False for anything else.
        """
        terminal_codes = tuple(
            member.value for member in (cls.SUCCESS, cls.FAILED, cls.CANCELLED)
        )
        return status_code in terminal_codes

is_terminal_status_code(status_code) classmethod

Returns True if a status code is terminal for a job run. Returns False otherwise.

Source code in prefect_dbt/cloud/runs.py
32
33
34
35
36
37
38
@classmethod
def is_terminal_status_code(cls, status_code: Any) -> bool:
    """
    Report whether a raw status code marks a finished job run.

    Returns True if `status_code` equals the value of `SUCCESS`, `FAILED`, or
    `CANCELLED` on `cls`, and False otherwise.
    """
    terminal_codes = tuple(
        member.value for member in (cls.SUCCESS, cls.FAILED, cls.CANCELLED)
    )
    return status_code in terminal_codes

get_dbt_cloud_run_artifact(dbt_cloud_credentials, run_id, path, step=None) async

A task to get an artifact generated for a completed run. The contents of the requested artifact are returned to the caller; the task itself does not write them to a file.

Parameters:

Name Type Description Default
dbt_cloud_credentials DbtCloudCredentials

Credentials for authenticating with dbt Cloud.

required
run_id int

The ID of the run to list run artifacts for.

required
path str

The relative path to the run artifact (e.g. manifest.json, catalog.json, run_results.json)

required
step Optional[int]

The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run.

None

Returns:

Type Description
Union[Dict, str]

The contents of the requested artifact. Returns a Dict if the requested artifact is a JSON file and a str otherwise.

Examples:

Get an artifact of a dbt Cloud job run:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact

@flow
def get_artifact_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    return get_dbt_cloud_run_artifact(
        dbt_cloud_credentials=credentials,
        run_id=42,
        path="manifest.json"
    )

get_artifact_flow()

Get an artifact of a dbt Cloud job run and write it to a file:

import json

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact

@flow
def get_artifact_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    get_run_artifact_result = get_dbt_cloud_run_artifact(
        dbt_cloud_credentials=credentials,
        run_id=42,
        path="manifest.json"
    )

    with open("manifest.json", "w") as file:
        json.dump(get_run_artifact_result, file)

get_artifact_flow()
Source code in prefect_dbt/cloud/runs.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@task(
    name="Get dbt Cloud job artifact",
    description="Fetches an artifact from a completed run.",
    retries=3,
    retry_delay_seconds=10,
)
async def get_dbt_cloud_run_artifact(
    dbt_cloud_credentials: DbtCloudCredentials,
    run_id: int,
    path: str,
    step: Optional[int] = None,
) -> Union[Dict, str]:
    """
    A task to get an artifact generated for a completed run. The contents of the
    requested artifact are returned to the caller; the task itself does not
    write them to a file.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        run_id: The ID of the run to get the artifact for.
        path: The relative path to the run artifact (e.g. manifest.json, catalog.json,
            run_results.json)
        step: The index of the step in the run to query for artifacts. The
            first step in the run has the index 1. If the step parameter is
            omitted, then this method will return the artifacts compiled
            for the last step in the run.

    Returns:
        The contents of the requested artifact. Returns a `Dict` if `path`
            ends with `.json` (the response body is parsed as JSON) and a
            `str` (the raw response text) otherwise.

    Raises:
        DbtCloudGetRunArtifactFailed: When the request to dbt Cloud raises an
            `HTTPStatusError`.

    Examples:
        Get an artifact of a dbt Cloud job run:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact

        @flow
        def get_artifact_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            return get_dbt_cloud_run_artifact(
                dbt_cloud_credentials=credentials,
                run_id=42,
                path="manifest.json"
            )

        get_artifact_flow()
        ```

        Get an artifact of a dbt Cloud job run and write it to a file:
        ```python
        import json

        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.runs import get_dbt_cloud_run_artifact

        @flow
        def get_artifact_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            get_run_artifact_result = get_dbt_cloud_run_artifact(
                dbt_cloud_credentials=credentials,
                run_id=42,
                path="manifest.json"
            )

            with open("manifest.json", "w") as file:
                json.dump(get_run_artifact_result, file)

        get_artifact_flow()
        ```
    """  # noqa

    try:
        async with dbt_cloud_credentials.get_administrative_client() as api_client:
            artifact_response = await api_client.get_run_artifact(
                run_id=run_id, path=path, step=step
            )
    except HTTPStatusError as http_error:
        # Re-raise as the package's own exception, keeping the HTTP error as cause.
        raise DbtCloudGetRunArtifactFailed(
            extract_user_message(http_error)
        ) from http_error

    # Only artifacts whose path ends in ".json" are parsed; others stay raw text.
    return (
        artifact_response.json()
        if path.endswith(".json")
        else artifact_response.text
    )

get_dbt_cloud_run_info(dbt_cloud_credentials, run_id, include_related=None) async

A task to retrieve information about a dbt Cloud job run.

Parameters:

Name Type Description Default
dbt_cloud_credentials DbtCloudCredentials

Credentials for authenticating with dbt Cloud.

required
run_id int

The ID of the run to retrieve information for.

required
include_related Optional[List[Literal['trigger', 'job', 'debug_logs', 'run_steps']]]

List of related fields to pull with the run. Valid values are "trigger", "job", "debug_logs", and "run_steps". If "debug_logs" is not provided in a request, then the included debug logs will be truncated to the last 1,000 lines of the debug log output file.

None

Returns:

Type Description
Dict

The run data returned by the dbt Cloud administrative API.

Example

Get status of a dbt Cloud job run:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import get_dbt_cloud_run_info

@flow
def get_run_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    return get_dbt_cloud_run_info(
        dbt_cloud_credentials=credentials,
        run_id=42
    )

get_run_flow()
Source code in prefect_dbt/cloud/runs.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@task(
    name="Get dbt Cloud job run details",
    description="Retrieves details of a dbt Cloud job run "
    "for the run with the given run_id.",
    retries=3,
    retry_delay_seconds=10,
)
async def get_dbt_cloud_run_info(
    dbt_cloud_credentials: DbtCloudCredentials,
    run_id: int,
    include_related: Optional[
        List[Literal["trigger", "job", "debug_logs", "run_steps"]]
    ] = None,
) -> Dict:
    """
    A task to retrieve information about a dbt Cloud job run.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        run_id: The ID of the run to retrieve information for.
        include_related: List of related fields to pull with the run.
            Valid values are "trigger", "job", "debug_logs", and "run_steps".
            If "debug_logs" is not provided in a request, then the included debug
            logs will be truncated to the last 1,000 lines of the debug log output file.

    Returns:
        The run data returned by the dbt Cloud administrative API, i.e. the
            value stored under the `"data"` key of the JSON response.

    Raises:
        DbtCloudGetRunFailed: When the request to dbt Cloud raises an
            `HTTPStatusError`.

    Example:
        Get status of a dbt Cloud job run:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.runs import get_dbt_cloud_run_info

        @flow
        def get_run_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            return get_dbt_cloud_run_info(
                dbt_cloud_credentials=credentials,
                run_id=42
            )

        get_run_flow()
        ```
    """  # noqa
    try:
        async with dbt_cloud_credentials.get_administrative_client() as api_client:
            run_response = await api_client.get_run(
                run_id=run_id, include_related=include_related
            )
    except HTTPStatusError as http_error:
        # Re-raise as the package's own exception, keeping the HTTP error as cause.
        raise DbtCloudGetRunFailed(extract_user_message(http_error)) from http_error

    response_payload = run_response.json()
    return response_payload["data"]

list_dbt_cloud_run_artifacts(dbt_cloud_credentials, run_id, step=None) async

A task to list the artifact files generated for a completed run.

Parameters:

Name Type Description Default
dbt_cloud_credentials DbtCloudCredentials

Credentials for authenticating with dbt Cloud.

required
run_id int

The ID of the run to list run artifacts for.

required
step Optional[int]

The index of the step in the run to query for artifacts. The first step in the run has the index 1. If the step parameter is omitted, then this method will return the artifacts compiled for the last step in the run.

None

Returns:

Type Description
List[str]

A list of paths to artifact files that can be used to retrieve the generated artifacts.

Example

List artifacts of a dbt Cloud job run:

from prefect import flow

from prefect_dbt.cloud import DbtCloudCredentials
from prefect_dbt.cloud.runs import list_dbt_cloud_run_artifacts

@flow
def list_artifacts_flow():
    credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

    return list_dbt_cloud_run_artifacts(
        dbt_cloud_credentials=credentials,
        run_id=42
    )

list_artifacts_flow()
Source code in prefect_dbt/cloud/runs.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
@task(
    name="List dbt Cloud job artifacts",
    description="Fetches a list of artifact files generated for a completed run.",
    retries=3,
    retry_delay_seconds=10,
)
async def list_dbt_cloud_run_artifacts(
    dbt_cloud_credentials: DbtCloudCredentials, run_id: int, step: Optional[int] = None
) -> List[str]:
    """
    A task to list the artifact files generated for a completed run.

    Args:
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        run_id: The ID of the run to list run artifacts for.
        step: The index of the step in the run to query for artifacts. The
            first step in the run has the index 1. If the step parameter is
            omitted, then this method will return the artifacts compiled
            for the last step in the run.

    Returns:
        A list of paths to artifact files that can be used to retrieve the
            generated artifacts, i.e. the value stored under the `"data"` key
            of the JSON response.

    Raises:
        DbtCloudListRunArtifactsFailed: When the request to dbt Cloud raises an
            `HTTPStatusError`.

    Example:
        List artifacts of a dbt Cloud job run:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.runs import list_dbt_cloud_run_artifacts

        @flow
        def list_artifacts_flow():
            credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)

            return list_dbt_cloud_run_artifacts(
                dbt_cloud_credentials=credentials,
                run_id=42
            )

        list_artifacts_flow()
        ```
    """  # noqa
    try:
        async with dbt_cloud_credentials.get_administrative_client() as api_client:
            listing_response = await api_client.list_run_artifacts(
                run_id=run_id, step=step
            )
    except HTTPStatusError as http_error:
        # Re-raise as the package's own exception, keeping the HTTP error as cause.
        raise DbtCloudListRunArtifactsFailed(
            extract_user_message(http_error)
        ) from http_error

    response_payload = listing_response.json()
    return response_payload["data"]

wait_for_dbt_cloud_job_run(run_id, dbt_cloud_credentials, max_wait_seconds=900, poll_frequency_seconds=10) async

Waits for the given dbt Cloud job run to finish running.

Parameters:

Name Type Description Default
run_id int

The ID of the run to wait for.

required
dbt_cloud_credentials DbtCloudCredentials

Credentials for authenticating with dbt Cloud.

required
max_wait_seconds int

Maximum number of seconds to wait for job to complete

900
poll_frequency_seconds int

Number of seconds to wait in between checks for run completion.

10

Raises:

Type Description
DbtCloudJobRunTimedOut

When the elapsed wait time exceeds max_wait_seconds.

Returns:

Name Type Description
run_status DbtCloudJobRunStatus

An enum representing the final dbt Cloud job run status

run_data Dict

A dictionary containing information about the run after completion.

Source code in prefect_dbt/cloud/runs.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@flow(
    name="Wait for dbt Cloud job run",
    description="Waits for a dbt Cloud job run to finish running.",
)
async def wait_for_dbt_cloud_job_run(
    run_id: int,
    dbt_cloud_credentials: DbtCloudCredentials,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
) -> Tuple[DbtCloudJobRunStatus, Dict]:
    """
    Waits for the given dbt Cloud job run to finish running.

    The run is polled with `get_dbt_cloud_run_info` until its status code is
    terminal according to `DbtCloudJobRunStatus.is_terminal_status_code`. Only
    the time spent sleeping between polls is counted towards
    `max_wait_seconds`; the duration of the polling requests is not.

    Args:
        run_id: The ID of the run to wait for.
        dbt_cloud_credentials: Credentials for authenticating with dbt Cloud.
        max_wait_seconds: Maximum number of seconds to wait for job to complete
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Raises:
        DbtCloudJobRunTimedOut: When the elapsed wait time exceeds `max_wait_seconds`.

    Returns:
        run_status: An enum representing the final dbt Cloud job run status
        run_data: A dictionary containing information about the run after completion.

    Example:
        Wait for a dbt Cloud job run to reach a terminal status:
        ```python
        from prefect import flow

        from prefect_dbt.cloud import DbtCloudCredentials
        from prefect_dbt.cloud.runs import wait_for_dbt_cloud_job_run

        @flow
        async def wait_for_run_flow():
            credentials = DbtCloudCredentials(
                api_key="my_api_key", account_id=123456789
            )

            run_status, run_data = await wait_for_dbt_cloud_job_run(
                run_id=42,
                dbt_cloud_credentials=credentials,
                max_wait_seconds=600,
                poll_frequency_seconds=15,
            )
            return run_status
        ```
    """
    logger = get_run_logger()
    seconds_waited_for_run_completion = 0
    wait_for = []
    while seconds_waited_for_run_completion <= max_wait_seconds:
        # Awaiting the task yields the run data itself, so no separate
        # "future" alias is needed.
        run_data = await get_dbt_cloud_run_info(
            dbt_cloud_credentials=dbt_cloud_credentials,
            run_id=run_id,
            wait_for=wait_for,
        )
        run_status_code = run_data.get("status")

        if DbtCloudJobRunStatus.is_terminal_status_code(run_status_code):
            return DbtCloudJobRunStatus(run_status_code), run_data

        # The previous poll's result is handed to the next task call via
        # `wait_for`.
        # NOTE(review): presumably this chains successive polls as upstream
        # dependencies in Prefect - confirm against Prefect's semantics.
        wait_for = [run_data]
        logger.debug(
            "dbt Cloud job run with ID %i has status %s. Waiting for %i seconds.",
            run_id,
            DbtCloudJobRunStatus(run_status_code).name,
            poll_frequency_seconds,
        )
        await asyncio.sleep(poll_frequency_seconds)
        seconds_waited_for_run_completion += poll_frequency_seconds

    # Both segments must be f-strings; previously the second one was a plain
    # string, so the message contained the literal text "{run_id}".
    raise DbtCloudJobRunTimedOut(
        f"Max wait time of {max_wait_seconds} seconds exceeded while waiting "
        f"for job run with ID {run_id}"
    )