prefect_dbt.cloud.utils
Utilities for common interactions with the dbt Cloud API
DbtCloudAdministrativeApiCallFailed
Bases: Exception
Raised when a call to dbt Cloud administrative API fails.
Source code in prefect_dbt/cloud/utils.py
call_dbt_cloud_administrative_api_endpoint(dbt_cloud_credentials, path, http_method, params=None, json=None)
async
Task that calls a specified endpoint in the dbt Cloud administrative API. Use this task if a prebuilt one is not yet available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dbt_cloud_credentials | DbtCloudCredentials | Credentials for authenticating with dbt Cloud. | required |
path | str | The partial path for the request (e.g. /projects/). Will be appended onto the base URL as determined by the client configuration. | required |
http_method | str | HTTP method to call on the endpoint. | required |
params | Optional[Dict[str, Any]] | Query parameters to include in the request. | None |
json | Optional[Dict[str, Any]] | JSON serializable body to send in the request. | None |
Returns:
Type | Description |
---|---|
Any | The body of the response. If the body is JSON serializable, then the result of parsing it as JSON is returned. |
Examples:
List projects for an account:

    from prefect import flow
    from prefect_dbt.cloud import DbtCloudCredentials
    from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint

    @flow
    def get_projects_flow():
        credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)
        result = call_dbt_cloud_administrative_api_endpoint(
            dbt_cloud_credentials=credentials,
            path="/projects/",
            http_method="GET",
        )
        return result["data"]

    get_projects_flow()

Create a new job:

    from prefect import flow
    from prefect_dbt.cloud import DbtCloudCredentials
    from prefect_dbt.cloud.utils import call_dbt_cloud_administrative_api_endpoint

    @flow
    def create_job_flow():
        credentials = DbtCloudCredentials(api_key="my_api_key", account_id=123456789)
        result = call_dbt_cloud_administrative_api_endpoint(
            dbt_cloud_credentials=credentials,
            path="/jobs/",
            http_method="POST",
            json={
                "id": None,
                "account_id": 123456789,
                "project_id": 100,
                "environment_id": 10,
                "name": "Nightly run",
                "dbt_version": None,
                "triggers": {"github_webhook": True, "schedule": True},
                "execute_steps": ["dbt run", "dbt test", "dbt source snapshot-freshness"],
                "settings": {"threads": 4, "target_name": "prod"},
                "state": 1,
                "schedule": {
                    "date": {"type": "every_day"},
                    "time": {"type": "every_hour", "interval": 1},
                },
            },
        )
        return result["data"]

    create_job_flow()

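The `path` and return-value descriptions above can be illustrated without a network call. The following is a minimal standard-library sketch, not the library's implementation: `BASE_URL`, `build_url`, and `decode_body` are hypothetical names, the base URL is only a placeholder for whatever the client configuration supplies, and the fallback to raw text for a non-JSON body is an assumption.

```python
import json
from typing import Any

# Hypothetical base URL; in practice it is determined by the client configuration.
BASE_URL = "https://cloud.getdbt.com/api/v2/accounts/123456789"


def build_url(base_url: str, path: str) -> str:
    """Append a partial path such as "/projects/" onto a base URL."""
    return base_url.rstrip("/") + "/" + path.lstrip("/")


def decode_body(raw_body: str) -> Any:
    """Return parsed JSON when the body is valid JSON, else the raw text (assumed fallback)."""
    try:
        return json.loads(raw_body)
    except json.JSONDecodeError:
        return raw_body


projects_url = build_url(BASE_URL, "/projects/")
parsed = decode_body('{"data": [{"id": 100}]}')
raw = decode_body("Bad Gateway")
```

Under these assumptions, a JSON body comes back as a dictionary that can be indexed with `["data"]`, as the flows above do, while a non-JSON body is passed through unchanged.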
extract_developer_message(ex)
Extracts the developer message from an error response from the dbt Cloud administrative API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ex | HTTPStatusError | An HTTPStatusError raised by httpx | required |
Returns:
Type | Description |
---|---|
Optional[str] | developer_message from dbt Cloud administrative API response or None if a developer_message cannot be extracted |
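To make the `Optional[str]` return concrete, here is a small sketch that works on an already-decoded error body rather than on an `HTTPStatusError`. It assumes the error body carries a `status` object with a `developer_message` field; the helper name `developer_message_from_payload` and the sample payloads are hypothetical and are not part of `prefect_dbt`.

```python
from typing import Any, Dict, Optional


def developer_message_from_payload(payload: Dict[str, Any]) -> Optional[str]:
    """Return status.developer_message if present, otherwise None."""
    # Assumed envelope: {"status": {"developer_message": "..."}, "data": ...}
    status = payload.get("status") or {}
    return status.get("developer_message")


# Hypothetical error body with a developer-facing explanation.
with_message = {
    "status": {"code": 404, "developer_message": "Job 42 does not exist."},
    "data": None,
}
# Hypothetical body without a status envelope at all.
without_message = {"detail": "Not found"}

found = developer_message_from_payload(with_message)
missing = developer_message_from_payload(without_message)
```

Returning `None` instead of raising keeps the helper safe to call on any error response, which matches the documented return type.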
extract_user_message(ex)
Extracts the user message from an error response from the dbt Cloud administrative API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ex | HTTPStatusError | An HTTPStatusError raised by httpx | required |
Returns:
Type | Description |
---|---|
Optional[str] | user_message from dbt Cloud administrative API response or None if a user_message cannot be extracted |
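One way a caller might use an extracted `user_message` is to fold it into a higher-level exception, with a fallback when no message is available. The sketch below is illustrative only: `ApiCallFailed` is a local stand-in rather than `DbtCloudAdministrativeApiCallFailed` itself, `user_message_from_payload` and `raise_for_payload` are hypothetical helpers, and the `status.user_message` field layout is an assumption about the error body.

```python
from typing import Any, Dict, Optional


class ApiCallFailed(Exception):
    """Hypothetical stand-in for a library-specific failure exception."""


def user_message_from_payload(payload: Dict[str, Any]) -> Optional[str]:
    """Return status.user_message if present, otherwise None."""
    status = payload.get("status") or {}
    return status.get("user_message")


def raise_for_payload(status_code: int, payload: Dict[str, Any]) -> None:
    """Raise ApiCallFailed for 4xx/5xx codes, preferring the user-facing message."""
    if status_code < 400:
        return
    message = user_message_from_payload(payload) or f"HTTP {status_code}"
    raise ApiCallFailed(message)


def describe_failure(status_code: int, payload: Dict[str, Any]) -> Optional[str]:
    """Return the failure message, or None when the call succeeded."""
    try:
        raise_for_payload(status_code, payload)
    except ApiCallFailed as exc:
        return str(exc)
    return None


with_text = describe_failure(404, {"status": {"user_message": "Not found."}})
fallback = describe_failure(502, {})
success = describe_failure(200, {"data": []})
```

The fallback string keeps the raised error informative even when the response carries no `user_message`.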