
prefect_databricks.flows

Module containing flows for interacting with Databricks

DatabricksJobInternalError

Bases: Exception

Raised when a Databricks jobs runs submit encounters an internal error.

Source code in prefect_databricks/flows.py
class DatabricksJobInternalError(Exception):
    """Raised when Databricks jobs runs submit encounters internal error"""

DatabricksJobRunTimedOut

Bases: Exception

Raised when a Databricks jobs run does not complete within the configured maximum wait seconds.

Source code in prefect_databricks/flows.py
class DatabricksJobRunTimedOut(Exception):
    """
    Raised when Databricks jobs runs does not complete in the configured max
    wait seconds
    """

DatabricksJobSkipped

Bases: Exception

Raised when a Databricks jobs runs submit is skipped.

Source code in prefect_databricks/flows.py
class DatabricksJobSkipped(Exception):
    """Raised when Databricks jobs runs submit skips"""

DatabricksJobTerminated

Bases: Exception

Raised when a Databricks jobs runs submit terminates with a non-successful result state.

Source code in prefect_databricks/flows.py
class DatabricksJobTerminated(Exception):
    """Raised when Databricks jobs runs submit terminates"""

jobs_runs_submit_and_wait_for_completion(databricks_credentials, tasks=None, run_name=None, max_wait_seconds=900, poll_frequency_seconds=10, git_source=None, timeout_seconds=None, idempotency_token=None, access_control_list=None, return_metadata=False, job_submission_handler=None, **jobs_runs_submit_kwargs) async

Flow that triggers a job run and waits for the triggered run to complete.

Parameters:

databricks_credentials (DatabricksCredentials), required:

Credentials to use for authentication with Databricks.

tasks (Optional[List[RunSubmitTaskSettings]]), default None:

Tasks to run, e.g.

[
    {
        "task_key": "Sessionize",
        "description": "Extracts session data from events",
        "depends_on": [],
        "existing_cluster_id": "0923-164208-meows279",
        "spark_jar_task": {
            "main_class_name": "com.databricks.Sessionize",
            "parameters": ["--data", "dbfs:/path/to/data.json"],
        },
        "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}],
        "timeout_seconds": 86400,
    },
    {
        "task_key": "Orders_Ingest",
        "description": "Ingests order data",
        "depends_on": [],
        "existing_cluster_id": "0923-164208-meows279",
        "spark_jar_task": {
            "main_class_name": "com.databricks.OrdersIngest",
            "parameters": ["--data", "dbfs:/path/to/order-data.json"],
        },
        "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}],
        "timeout_seconds": 86400,
    },
    {
        "task_key": "Match",
        "description": "Matches orders with user sessions",
        "depends_on": [
            {"task_key": "Orders_Ingest"},
            {"task_key": "Sessionize"},
        ],
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "spark_conf": {"spark.speculation": True},
            "aws_attributes": {
                "availability": "SPOT",
                "zone_id": "us-west-2a",
            },
            "autoscale": {"min_workers": 2, "max_workers": 16},
        },
        "notebook_task": {
            "notebook_path": "/Users/user.name@databricks.com/Match",
            "base_parameters": {"name": "John Doe", "age": "35"},
        },
        "timeout_seconds": 86400,
    },
]
run_name (Optional[str]), default None:

An optional name for the run. The default value is Untitled, e.g. A multitask job run.

git_source (Optional[GitSource]), default None:

This functionality is in Public Preview. An optional specification for a remote repository containing the notebooks used by this job's notebook tasks (see the sketch after this parameter list). Key-values:

- git_url: URL of the repository to be cloned by this job. The maximum length is 300 characters, e.g. https://github.com/databricks/databricks-cli.
- git_provider: Unique identifier of the service used to host the Git repository. The value is case insensitive, e.g. github.
- git_branch: Name of the branch to be checked out and used by this job. This field cannot be specified in conjunction with git_tag or git_commit. The maximum length is 255 characters, e.g. main.
- git_tag: Name of the tag to be checked out and used by this job. This field cannot be specified in conjunction with git_branch or git_commit. The maximum length is 255 characters, e.g. release-1.0.0.
- git_commit: Commit to be checked out and used by this job. This field cannot be specified in conjunction with git_branch or git_tag. The maximum length is 64 characters, e.g. e0056d01.
- git_snapshot: Read-only state of the remote repository at the time the job was run. This field is only included on job runs.

timeout_seconds (Optional[int]), default None:

An optional timeout applied to each run of this job. The default behavior is to have no timeout, e.g. 86400.

idempotency_token (Optional[str]), default None:

An optional token that can be used to guarantee the idempotency of job run requests. If a run with the provided token already exists, the request does not create a new run but returns the ID of the existing run instead. If a run with the provided token is deleted, an error is returned. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run is launched with that idempotency token. This token must have at most 64 characters. For more information, see How to ensure idempotency for jobs, e.g. 8f018174-4792-40d5-bcbc-3e6a527352c8.

access_control_list (Optional[List[AccessControlRequest]]), default None:

List of permissions to set on the job.

max_wait_seconds (int), default 900:

Maximum number of seconds to wait for the entire flow to complete.

poll_frequency_seconds (int), default 10:

Number of seconds to wait in between checks for run completion.

return_metadata (bool), default False:

When True, the flow returns a tuple of the notebook outputs and the job run metadata; by default, only the notebook outputs are returned.

job_submission_handler (Optional[Callable]), default None:

An optional callable to intercept job submission.

**jobs_runs_submit_kwargs (Dict[str, Any]):

Additional keyword arguments to pass to jobs_runs_submit.

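For illustration, a hypothetical git_source value for a job whose notebooks live on a Git branch; this assumes GitSource can be imported from prefect_databricks.models.jobs alongside the other job models, and that its field names follow the key-values listed above:

from prefect_databricks.models.jobs import GitSource  # assumed import path

# Hypothetical value: check out the main branch of the repository for notebook tasks
git_source = GitSource(
    git_url="https://github.com/databricks/databricks-cli",
    git_provider="github",
    git_branch="main",
)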

Returns:

Type: Union[NotebookOutput, Tuple[NotebookOutput, JobMetadata], None]

Either a dict or a tuple (depending on return_metadata), comprised of:

• task_notebook_outputs: dictionary mapping each task key to its corresponding notebook output; this is the only object returned by default
• jobs_runs_metadata: dictionary containing IDs of the jobs runs tasks; this is only returned if return_metadata=True.

Examples:

Submit jobs runs and wait.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import (
    AutoScale,
    AwsAttributes,
    JobTaskSettings,
    NotebookTask,
    NewCluster,
)

@flow
async def jobs_runs_submit_and_wait_for_completion_flow(notebook_path, **base_parameters):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    # specify new cluster settings
    aws_attributes = AwsAttributes(
        availability="SPOT",
        zone_id="us-west-2a",
        ebs_volume_type="GENERAL_PURPOSE_SSD",
        ebs_volume_count=3,
        ebs_volume_size=100,
    )
    auto_scale = AutoScale(min_workers=1, max_workers=2)
    new_cluster = NewCluster(
        aws_attributes=aws_attributes,
        autoscale=auto_scale,
        node_type_id="m4.large",
        spark_version="10.4.x-scala2.12",
        spark_conf={"spark.speculation": True},
    )

    # specify notebook to use and parameters to pass
    notebook_task = NotebookTask(
        notebook_path=notebook_path,
        base_parameters=base_parameters,
    )

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=notebook_task,
        task_key="prefect-task"
    )

    multi_task_runs = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings]
    )

    return multi_task_runs
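
For illustration, a minimal sketch of also retrieving the run metadata by setting return_metadata=True and intercepting the submission with a job_submission_handler; it reuses the placeholder block name and the job_task_settings built above:

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion


def log_submission(jobs_runs):
    # Receives the raw jobs runs submit response; "run_id" is assigned by Databricks
    print(f"Submitted Databricks jobs runs with ID {jobs_runs['run_id']}")


@flow
async def jobs_runs_submit_with_metadata_flow(job_task_settings):
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    task_notebook_outputs, jobs_runs_metadata = await jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name="prefect-job",
        tasks=[job_task_settings],
        return_metadata=True,
        job_submission_handler=log_submission,
    )

    # jobs_runs_metadata["tasks"] lists the task runs along with their run IDs
    return task_notebook_outputs, jobs_runs_metadata
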
Source code in prefect_databricks/flows.py
@flow(
    name="Submit jobs runs and wait for completion",
    description=(
        "Triggers a Databricks jobs runs and waits for the "
        "triggered runs to complete."
    ),
)
async def jobs_runs_submit_and_wait_for_completion(
    databricks_credentials: DatabricksCredentials,
    tasks: Optional[List[RunSubmitTaskSettings]] = None,
    run_name: Optional[str] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
    git_source: Optional[GitSource] = None,
    timeout_seconds: Optional[int] = None,
    idempotency_token: Optional[str] = None,
    access_control_list: Optional[List[AccessControlRequest]] = None,
    return_metadata: bool = False,
    job_submission_handler: Optional[Callable] = None,
    **jobs_runs_submit_kwargs: Dict[str, Any],
) -> Union[NotebookOutput, Tuple[NotebookOutput, JobMetadata], None]:
    """
    Flow that triggers a job run and waits for the triggered run to complete.

    Args:
        databricks_credentials:
            Credentials to use for authentication with Databricks.
        tasks: Tasks to run, e.g.
            ```
            [
                {
                    "task_key": "Sessionize",
                    "description": "Extracts session data from events",
                    "depends_on": [],
                    "existing_cluster_id": "0923-164208-meows279",
                    "spark_jar_task": {
                        "main_class_name": "com.databricks.Sessionize",
                        "parameters": ["--data", "dbfs:/path/to/data.json"],
                    },
                    "libraries": [{"jar": "dbfs:/mnt/databricks/Sessionize.jar"}],
                    "timeout_seconds": 86400,
                },
                {
                    "task_key": "Orders_Ingest",
                    "description": "Ingests order data",
                    "depends_on": [],
                    "existing_cluster_id": "0923-164208-meows279",
                    "spark_jar_task": {
                        "main_class_name": "com.databricks.OrdersIngest",
                        "parameters": ["--data", "dbfs:/path/to/order-data.json"],
                    },
                    "libraries": [{"jar": "dbfs:/mnt/databricks/OrderIngest.jar"}],
                    "timeout_seconds": 86400,
                },
                {
                    "task_key": "Match",
                    "description": "Matches orders with user sessions",
                    "depends_on": [
                        {"task_key": "Orders_Ingest"},
                        {"task_key": "Sessionize"},
                    ],
                    "new_cluster": {
                        "spark_version": "7.3.x-scala2.12",
                        "node_type_id": "i3.xlarge",
                        "spark_conf": {"spark.speculation": True},
                        "aws_attributes": {
                            "availability": "SPOT",
                            "zone_id": "us-west-2a",
                        },
                        "autoscale": {"min_workers": 2, "max_workers": 16},
                    },
                    "notebook_task": {
                        "notebook_path": "/Users/user.name@databricks.com/Match",
                        "base_parameters": {"name": "John Doe", "age": "35"},
                    },
                    "timeout_seconds": 86400,
                },
            ]
            ```
        run_name:
            An optional name for the run. The default value is `Untitled`, e.g. `A
            multitask job run`.
        git_source:
            This functionality is in Public Preview.  An optional specification for
            a remote repository containing the notebooks used by this
            job's notebook tasks. Key-values:
            - git_url:
                URL of the repository to be cloned by this job. The maximum
                length is 300 characters, e.g.
                `https://github.com/databricks/databricks-cli`.
            - git_provider:
                Unique identifier of the service used to host the Git
                repository. The value is case insensitive, e.g. `github`.
            - git_branch:
                Name of the branch to be checked out and used by this job.
                This field cannot be specified in conjunction with git_tag
                or git_commit. The maximum length is 255 characters, e.g.
                `main`.
            - git_tag:
                Name of the tag to be checked out and used by this job. This
                field cannot be specified in conjunction with git_branch or
                git_commit. The maximum length is 255 characters, e.g.
                `release-1.0.0`.
            - git_commit:
                Commit to be checked out and used by this job. This field
                cannot be specified in conjunction with git_branch or
                git_tag. The maximum length is 64 characters, e.g.
                `e0056d01`.
            - git_snapshot:
                Read-only state of the remote repository at the time the job was run.
                            This field is only included on job runs.
        timeout_seconds:
            An optional timeout applied to each run of this job. The default
            behavior is to have no timeout, e.g. `86400`.
        idempotency_token:
            An optional token that can be used to guarantee the idempotency of job
            run requests. If a run with the provided token already
            exists, the request does not create a new run but returns
            the ID of the existing run instead. If a run with the
            provided token is deleted, an error is returned.  If you
            specify the idempotency token, upon failure you can retry
            until the request succeeds. Databricks guarantees that
            exactly one run is launched with that idempotency token.
            This token must have at most 64 characters.  For more
            information, see [How to ensure idempotency for
            jobs](https://kb.databricks.com/jobs/jobs-idempotency.html),
            e.g. `8f018174-4792-40d5-bcbc-3e6a527352c8`.
        access_control_list:
            List of permissions to set on the job.
        max_wait_seconds: Maximum number of seconds to wait for the entire flow to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.
        return_metadata: When True, method will return a tuple of notebook output as well as
            job run metadata; by default though, the method only returns notebook output
        job_submission_handler: An optional callable to intercept job submission.
        **jobs_runs_submit_kwargs: Additional keyword arguments to pass to `jobs_runs_submit`.

    Returns:
        Either a dict or a tuple (depends on `return_metadata`) comprised of
        * task_notebook_outputs: dictionary of task keys to its corresponding notebook output;
          this is the only object returned by default from this method
        * jobs_runs_metadata: dictionary containing IDs of the jobs runs tasks; this is only
          returned if `return_metadata=True`.

    Examples:
        Submit jobs runs and wait.
        ```python
        from prefect import flow
        from prefect_databricks import DatabricksCredentials
        from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
        from prefect_databricks.models.jobs import (
            AutoScale,
            AwsAttributes,
            JobTaskSettings,
            NotebookTask,
            NewCluster,
        )

        @flow
        async def jobs_runs_submit_and_wait_for_completion_flow(notebook_path, **base_parameters):
            databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

            # specify new cluster settings
            aws_attributes = AwsAttributes(
                availability="SPOT",
                zone_id="us-west-2a",
                ebs_volume_type="GENERAL_PURPOSE_SSD",
                ebs_volume_count=3,
                ebs_volume_size=100,
            )
            auto_scale = AutoScale(min_workers=1, max_workers=2)
            new_cluster = NewCluster(
                aws_attributes=aws_attributes,
                autoscale=auto_scale,
                node_type_id="m4.large",
                spark_version="10.4.x-scala2.12",
                spark_conf={"spark.speculation": True},
            )

            # specify notebook to use and parameters to pass
            notebook_task = NotebookTask(
                notebook_path=notebook_path,
                base_parameters=base_parameters,
            )

            # compile job task settings
            job_task_settings = JobTaskSettings(
                new_cluster=new_cluster,
                notebook_task=notebook_task,
                task_key="prefect-task"
            )

            multi_task_runs = await jobs_runs_submit_and_wait_for_completion(
                databricks_credentials=databricks_credentials,
                run_name="prefect-job",
                tasks=[job_task_settings]
            )

            return multi_task_runs
        ```
    """  # noqa
    logger = get_run_logger()

    # submit the jobs runs
    multi_task_jobs_runs_future = jobs_runs_submit.submit(
        databricks_credentials=databricks_credentials,
        tasks=tasks,
        run_name=run_name,
        git_source=git_source,
        timeout_seconds=timeout_seconds,
        idempotency_token=idempotency_token,
        access_control_list=access_control_list,
        **jobs_runs_submit_kwargs,
    )
    multi_task_jobs_runs = multi_task_jobs_runs_future.result()
    if job_submission_handler:
        result = job_submission_handler(multi_task_jobs_runs)
        if inspect.isawaitable(result):
            await result
    multi_task_jobs_runs_id = multi_task_jobs_runs["run_id"]

    # wait for all the jobs runs to complete in a separate flow
    # for a cleaner radar interface
    jobs_runs_state, jobs_runs_metadata = await jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=multi_task_jobs_runs_id,
        databricks_credentials=databricks_credentials,
        run_name=run_name,
        max_wait_seconds=max_wait_seconds,
        poll_frequency_seconds=poll_frequency_seconds,
    )

    # fetch the state results
    jobs_runs_life_cycle_state = jobs_runs_state["life_cycle_state"]
    jobs_runs_state_message = jobs_runs_state["state_message"]

    # return results or raise error
    if jobs_runs_life_cycle_state == RunLifeCycleState.terminated.value:
        jobs_runs_result_state = jobs_runs_state.get("result_state", None)
        if jobs_runs_result_state == RunResultState.success.value:
            task_notebook_outputs = {}
            for task in jobs_runs_metadata["tasks"]:
                task_key = task["task_key"]
                task_run_id = task["run_id"]
                task_run_output_future = jobs_runs_get_output.submit(
                    run_id=task_run_id,
                    databricks_credentials=databricks_credentials,
                )
                task_run_output = task_run_output_future.result()
                task_run_notebook_output = task_run_output.get("notebook_output", {})
                task_notebook_outputs[task_key] = task_run_notebook_output
            logger.info(
                "Databricks Jobs Runs Submit (%s ID %s) completed successfully!",
                run_name,
                multi_task_jobs_runs_id,
            )
            if return_metadata:
                return task_notebook_outputs, jobs_runs_metadata
            return task_notebook_outputs
        else:
            raise DatabricksJobTerminated(
                f"Databricks Jobs Runs Submit "
                f"({run_name} ID {multi_task_jobs_runs_id}) "
                f"terminated with result state, {jobs_runs_result_state}: "
                f"{jobs_runs_state_message}"
            )
    elif jobs_runs_life_cycle_state == RunLifeCycleState.skipped.value:
        raise DatabricksJobSkipped(
            f"Databricks Jobs Runs Submit ({run_name} ID "
            f"{multi_task_jobs_runs_id}) was skipped: {jobs_runs_state_message}.",
        )
    elif jobs_runs_life_cycle_state == RunLifeCycleState.internalerror.value:
        raise DatabricksJobInternalError(
            f"Databricks Jobs Runs Submit ({run_name} ID "
            f"{multi_task_jobs_runs_id}) "
            f"encountered an internal error: {jobs_runs_state_message}.",
        )

jobs_runs_submit_by_id_and_wait_for_completion(databricks_credentials, job_id, idempotency_token=None, jar_params=None, max_wait_seconds=900, poll_frequency_seconds=10, notebook_params=None, python_params=None, spark_submit_params=None, python_named_params=None, pipeline_params=None, sql_params=None, dbt_commands=None, job_submission_handler=None, **jobs_runs_submit_kwargs) async

Flow that triggers a run of an existing job and waits for its completion.

Parameters:

databricks_credentials (DatabricksCredentials), required:

Credentials to use for authentication with Databricks.

job_id (int), required:

ID of the Databricks job.

idempotency_token (Optional[str]), default None:

An optional token that can be used to guarantee the idempotency of job run requests. If a run with the provided token already exists, the request does not create a new run but returns the ID of the existing run instead. If a run with the provided token is deleted, an error is returned. If you specify the idempotency token, upon failure you can retry until the request succeeds. Databricks guarantees that exactly one run is launched with that idempotency token. This token must have at most 64 characters. For more information, see How to ensure idempotency for jobs, e.g. 8f018174-4792-40d5-bcbc-3e6a527352c8.

jar_params (Optional[List[str]]), default None:

A list of parameters for jobs with Spark JAR tasks, for example "jar_params": ["john doe", "35"]. The parameters are used to invoke the main function of the main class specified in the Spark JAR task. If not specified upon run-now, it defaults to an empty list. jar_params cannot be specified in conjunction with notebook_params. The JSON representation of this field (for example {"jar_params": ["john doe","35"]}) cannot exceed 10,000 bytes.

max_wait_seconds (int), default 900:

Maximum number of seconds to wait for the entire flow to complete.

poll_frequency_seconds (int), default 10:

Number of seconds to wait in between checks for run completion.

notebook_params (Optional[Dict]), default None:

A map from keys to values for jobs with notebook task, for example "notebook_params": {"name": "john doe", "age": "35"}. The map is passed to the notebook and is accessible through the dbutils.widgets.get function. If not specified upon run-now, the triggered run uses the job’s base parameters. notebook_params cannot be specified in conjunction with jar_params. Use Task parameter variables to set parameters containing information about job runs. The JSON representation of this field (for example {"notebook_params":{"name":"john doe","age":"35"}}) cannot exceed 10,000 bytes.

python_params (Optional[List[str]]), default None:

A list of parameters for jobs with Python tasks, for example "python_params": ["john doe", "35"]. The parameters are passed to the Python file as command-line parameters. If specified upon run-now, it would overwrite the parameters specified in the job setting. The JSON representation of this field (for example {"python_params":["john doe","35"]}) cannot exceed 10,000 bytes. Use Task parameter variables to set parameters containing information about job runs. These parameters accept only Latin characters (ASCII character set). Using non-ASCII characters returns an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.

spark_submit_params (Optional[List[str]]), default None:

A list of parameters for jobs with spark submit task, for example "spark_submit_params": ["--class", "org.apache.spark.examples.SparkPi"]. The parameters are passed to spark-submit script as command-line parameters. If specified upon run-now, it would overwrite the parameters specified in job setting. The JSON representation of this field (for example {"python_params":["john doe","35"]}) cannot exceed 10,000 bytes. Use Task parameter variables to set parameters containing information about job runs. These parameters accept only Latin characters (ASCII character set). Using non-ASCII characters returns an error. Examples of invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.

python_named_params (Optional[Dict]), default None:

A map from keys to values for jobs with Python wheel task, for example "python_named_params": {"name": "task", "data": "dbfs:/path/to/data.json"}.

pipeline_params (Optional[str]), default None:

If full_refresh is set to true, triggers a full refresh of the Delta Live Tables pipeline, e.g.

    "pipeline_params": {"full_refresh": true}
sql_params (Optional[Dict]), default None:

A map from keys to values for SQL tasks, for example "sql_params": {"name": "john doe", "age": "35"}. The SQL alert task does not support custom parameters.

dbt_commands (Optional[List]), default None:

An array of commands to execute for jobs with the dbt task, for example "dbt_commands": ["dbt deps", "dbt seed", "dbt run"]

job_submission_handler (Optional[Callable]), default None:

An optional callable to intercept job submission.


Raises:

DatabricksJobTerminated: Raised when the Databricks job run is terminated with a non-successful result state.

DatabricksJobSkipped: Raised when the Databricks job run is skipped.

DatabricksJobInternalError: Raised when the Databricks job run encounters an internal error.

Returns:

Dict: A dictionary containing information about the completed job run.

Example
import asyncio

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
async def submit_existing_job(block_name: str, job_id):
    databricks_credentials = await DatabricksCredentials.load(block_name)

    run = await jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials, job_id=job_id
    )

    return run


asyncio.run(submit_existing_job(block_name="db-creds", job_id=db_job_id))
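
For illustration, a minimal sketch of passing notebook parameters through to the triggered run; the block name, job ID, and parameter values are placeholders:

import asyncio

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import (
    jobs_runs_submit_by_id_and_wait_for_completion,
)


@flow
async def submit_existing_job_with_params(block_name: str, job_id: int):
    databricks_credentials = await DatabricksCredentials.load(block_name)

    # notebook_params is read inside the notebook via dbutils.widgets.get
    return await jobs_runs_submit_by_id_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        job_id=job_id,
        notebook_params={"name": "john doe", "age": "35"},
        max_wait_seconds=1800,
    )


asyncio.run(submit_existing_job_with_params(block_name="db-creds", job_id=123))
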
Source code in prefect_databricks/flows.py
@flow(
    name="Submit existing job runs and wait for completion",
    description=(
        "Triggers a Databricks jobs runs and waits for the "
        "triggered runs to complete."
    ),
)
async def jobs_runs_submit_by_id_and_wait_for_completion(
    databricks_credentials: DatabricksCredentials,
    job_id: int,
    idempotency_token: Optional[str] = None,
    jar_params: Optional[List[str]] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
    notebook_params: Optional[Dict] = None,
    python_params: Optional[List[str]] = None,
    spark_submit_params: Optional[List[str]] = None,
    python_named_params: Optional[Dict] = None,
    pipeline_params: Optional[str] = None,
    sql_params: Optional[Dict] = None,
    dbt_commands: Optional[List] = None,
    job_submission_handler: Optional[Callable] = None,
    **jobs_runs_submit_kwargs: Dict[str, Any],
) -> Dict:
    """flow that triggers an existing job and waits for its completion

    Args:
        databricks_credentials: Credentials to use for authentication with Databricks.
        job_id: Id of the databricks job.
        idempotency_token:
            An optional token that can be used to guarantee the idempotency of job
            run requests. If a run with the provided token already
            exists, the request does not create a new run but returns
            the ID of the existing run instead. If a run with the
            provided token is deleted, an error is returned.  If you
            specify the idempotency token, upon failure you can retry
            until the request succeeds. Databricks guarantees that
            exactly one run is launched with that idempotency token.
            This token must have at most 64 characters.  For more
            information, see [How to ensure idempotency for
            jobs](https://kb.databricks.com/jobs/jobs-idempotency.html),
            e.g. `8f018174-4792-40d5-bcbc-3e6a527352c8`.
        jar_params:
            A list of parameters for jobs with Spark JAR tasks, for example "jar_params"
            : ["john doe", "35"]. The parameters are used to invoke the main function of
            the main class specified in the Spark JAR task. If not specified upon run-
            now, it defaults to an empty list. jar_params cannot be specified in
            conjunction with notebook_params. The JSON representation of this field (for
            example {"jar_params": ["john doe","35"]}) cannot exceed 10,000 bytes.
        max_wait_seconds:
            Maximum number of seconds to wait for the entire flow to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.
        notebook_params:
            A map from keys to values for jobs with notebook task, for example
            "notebook_params": {"name": "john doe", "age": "35"}. The map is
            passed to the notebook and is accessible through the dbutils.widgets.get
            function. If not specified upon run-now, the triggered run uses the job’s
            base parameters. notebook_params cannot be specified in conjunction with
            jar_params. Use Task parameter variables to set parameters containing
            information about job runs. The JSON representation of this field
            (for example {"notebook_params":{"name":"john doe","age":"35"}}) cannot
            exceed 10,000 bytes.
        python_params:
            A list of parameters for jobs with Python tasks, for example "python_params"
            :["john doe", "35"]. The parameters are passed to Python file as command-
            line parameters. If specified upon run-now, it would overwrite the
            parameters specified in job setting. The JSON representation of this field
            (for example {"python_params":["john doe","35"]}) cannot exceed 10,000 bytes
            Use Task parameter variables to set parameters containing information
            about job runs. These parameters accept only Latin characters (ASCII
            character set). Using non-ASCII characters returns an error. Examples of
            invalid, non-ASCII characters are Chinese, Japanese kanjis, and emojis.
        spark_submit_params:
            A list of parameters for jobs with spark submit task, for example
            "spark_submit_params": ["--class", "org.apache.spark.examples.SparkPi"].
            The parameters are passed to spark-submit script as command-line parameters.
            If specified upon run-now, it would overwrite the parameters specified in
            job setting. The JSON representation of this field (for example
            {"python_params":["john doe","35"]}) cannot exceed 10,000 bytes.
            Use Task parameter variables to set parameters containing information about
            job runs. These parameters accept only Latin characters (ASCII character
            set). Using non-ASCII characters returns an error. Examples of invalid,
            non-ASCII characters are Chinese, Japanese kanjis, and emojis.
        python_named_params:
            A map from keys to values for jobs with Python wheel task, for example
            "python_named_params": {"name": "task", "data": "dbfs:/path/to/data.json"}.
        pipeline_params:
            If `full_refresh` is set to true, trigger a full refresh on the
            delta live table e.g.
            ```
                "pipeline_params": {"full_refresh": true}
            ```
        sql_params:
            A map from keys to values for SQL tasks, for example "sql_params":
            {"name": "john doe", "age": "35"}. The SQL alert task does not support
            custom parameters.
        dbt_commands:
            An array of commands to execute for jobs with the dbt task,
            for example "dbt_commands": ["dbt deps", "dbt seed", "dbt run"]
        job_submission_handler: An optional callable to intercept job submission

    Raises:
        DatabricksJobTerminated:
            Raised when the Databricks job run is terminated with a non-successful
            result state.
        DatabricksJobSkipped: Raised when the Databricks job run is skipped.
        DatabricksJobInternalError:
            Raised when the Databricks job run encounters an internal error.

    Returns:
        Dict: A dictionary containing information about the completed job run.

    Example:
        ```python
        import asyncio

        from prefect import flow
        from prefect_databricks import DatabricksCredentials
        from prefect_databricks.flows import (
            jobs_runs_submit_by_id_and_wait_for_completion,
        )


        @flow
        async def submit_existing_job(block_name: str, job_id):
            databricks_credentials = await DatabricksCredentials.load(block_name)

            run = await jobs_runs_submit_by_id_and_wait_for_completion(
                databricks_credentials=databricks_credentials, job_id=job_id
            )

            return run


        asyncio.run(submit_existing_job(block_name="db-creds", job_id=db_job_id))
        ```
    """
    logger = get_run_logger()

    # submit the jobs runs

    jobs_runs_future = jobs_run_now.submit(
        databricks_credentials=databricks_credentials,
        job_id=job_id,
        idempotency_token=idempotency_token,
        jar_params=jar_params,
        notebook_params=notebook_params,
        python_params=python_params,
        spark_submit_params=spark_submit_params,
        python_named_params=python_named_params,
        pipeline_params=pipeline_params,
        sql_params=sql_params,
        dbt_commands=dbt_commands,
        **jobs_runs_submit_kwargs,
    )

    jobs_runs = jobs_runs_future.result()

    if job_submission_handler:
        result = job_submission_handler(jobs_runs)
        if inspect.isawaitable(result):
            await result
    job_run_id = jobs_runs["run_id"]

    # wait for all the jobs runs to complete in a separate flow
    # for a cleaner radar interface
    jobs_runs_state, jobs_runs_metadata = await jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=job_run_id,
        databricks_credentials=databricks_credentials,
        max_wait_seconds=max_wait_seconds,
        poll_frequency_seconds=poll_frequency_seconds,
    )

    # fetch the state results
    jobs_runs_life_cycle_state = jobs_runs_state["life_cycle_state"]
    jobs_runs_state_message = jobs_runs_state["state_message"]

    # return results or raise error
    if jobs_runs_life_cycle_state == RunLifeCycleState.terminated.value:
        jobs_runs_result_state = jobs_runs_state.get("result_state", None)
        if jobs_runs_result_state == RunResultState.success.value:
            task_notebook_outputs = {}
            for task in jobs_runs_metadata["tasks"]:
                task_key = task["task_key"]
                task_run_id = task["run_id"]
                task_run_output_future = jobs_runs_get_output.submit(
                    run_id=task_run_id,
                    databricks_credentials=databricks_credentials,
                )
                task_run_output = task_run_output_future.result()
                task_run_notebook_output = task_run_output.get("notebook_output", {})
                task_notebook_outputs[task_key] = task_run_notebook_output
            logger.info(
                f"Databricks Jobs Runs Submit {job_id} completed successfully!",
            )
            return task_notebook_outputs
        else:
            raise DatabricksJobTerminated(
                f"Databricks Jobs Runs Submit ID {job_id} "
                f"terminated with result state, {jobs_runs_result_state}: "
                f"{jobs_runs_state_message}"
            )
    elif jobs_runs_life_cycle_state == RunLifeCycleState.skipped.value:
        raise DatabricksJobSkipped(
            f"Databricks Jobs Runs Submit ID "
            f"{job_id} was skipped: {jobs_runs_state_message}.",
        )
    elif jobs_runs_life_cycle_state == RunLifeCycleState.internalerror.value:
        raise DatabricksJobInternalError(
            f"Databricks Jobs Runs Submit ID "
            f"{job_id} "
            f"encountered an internal error: {jobs_runs_state_message}.",
        )

jobs_runs_wait_for_completion(multi_task_jobs_runs_id, databricks_credentials, run_name=None, max_wait_seconds=900, poll_frequency_seconds=10) async

Flow that waits for a triggered job run to complete.

Parameters:

run_name (Optional[str]), default None:

The name of the jobs runs task.

multi_task_jobs_runs_id (int), required:

The ID of the jobs runs task to watch.

databricks_credentials (DatabricksCredentials), required:

Credentials to use for authentication with Databricks.

max_wait_seconds (int), default 900:

Maximum number of seconds to wait for the entire flow to complete.

poll_frequency_seconds (int), default 10:

Number of seconds to wait in between checks for run completion.


Returns:

jobs_runs_state: A dict containing the jobs runs life cycle state and message.

jobs_runs_metadata: A dict containing IDs of the jobs runs tasks.

Example

Waits for completion on jobs runs.

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion

@flow
async def jobs_runs_wait_for_completion_flow():
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")
    return await jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=45429,
        databricks_credentials=databricks_credentials,
        run_name="my_run_name",
        max_wait_seconds=1800,  # 30 minutes
        poll_frequency_seconds=120,  # 2 minutes
    )
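
For illustration, a minimal sketch of inspecting the returned state and metadata; the run ID and block name are the same placeholders used above:

from prefect import flow
from prefect_databricks import DatabricksCredentials
from prefect_databricks.flows import jobs_runs_wait_for_completion


@flow
async def report_run_state_flow():
    databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")

    jobs_runs_state, jobs_runs_metadata = await jobs_runs_wait_for_completion(
        multi_task_jobs_runs_id=45429,
        databricks_credentials=databricks_credentials,
    )

    # life_cycle_state is a Databricks run life cycle state, e.g. TERMINATED
    life_cycle_state = jobs_runs_state["life_cycle_state"]
    task_run_ids = [task["run_id"] for task in jobs_runs_metadata.get("tasks", [])]
    return life_cycle_state, task_run_ids
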
Source code in prefect_databricks/flows.py
@flow(
    name="Wait for completion of jobs runs",
    description="Waits for the jobs runs to finish running",
)
async def jobs_runs_wait_for_completion(
    multi_task_jobs_runs_id: int,
    databricks_credentials: DatabricksCredentials,
    run_name: Optional[str] = None,
    max_wait_seconds: int = 900,
    poll_frequency_seconds: int = 10,
):
    """
    Flow that waits for a triggered job run to complete.

    Args:
        run_name: The name of the jobs runs task.
        multi_task_jobs_runs_id: The ID of the jobs runs task to watch.
        databricks_credentials:
            Credentials to use for authentication with Databricks.
        max_wait_seconds:
            Maximum number of seconds to wait for the entire flow to complete.
        poll_frequency_seconds: Number of seconds to wait in between checks for
            run completion.

    Returns:
        jobs_runs_state: A dict containing the jobs runs life cycle state and message.
        jobs_runs_metadata: A dict containing IDs of the jobs runs tasks.

    Example:
        Waits for completion on jobs runs.
        ```python
        from prefect import flow
        from prefect_databricks import DatabricksCredentials
        from prefect_databricks.flows import jobs_runs_wait_for_completion

        @flow
        async def jobs_runs_wait_for_completion_flow():
            databricks_credentials = await DatabricksCredentials.load("BLOCK_NAME")
            return await jobs_runs_wait_for_completion(
                multi_task_jobs_runs_id=45429,
                databricks_credentials=databricks_credentials,
                run_name="my_run_name",
                max_wait_seconds=1800,  # 30 minutes
                poll_frequency_seconds=120,  # 2 minutes
            )
        ```
    """
    logger = get_run_logger()

    seconds_waited_for_run_completion = 0
    wait_for = []

    jobs_status = {}
    tasks_status = {}
    while seconds_waited_for_run_completion <= max_wait_seconds:
        jobs_runs_metadata_future = jobs_runs_get.submit(
            run_id=multi_task_jobs_runs_id,
            databricks_credentials=databricks_credentials,
            wait_for=wait_for,
        )
        wait_for = [jobs_runs_metadata_future]

        jobs_runs_metadata = jobs_runs_metadata_future.result()
        jobs_status = _update_and_log_state_changes(
            jobs_status, jobs_runs_metadata, logger, "Job"
        )
        jobs_runs_metadata_tasks = jobs_runs_metadata.get("tasks", [])
        for task_metadata in jobs_runs_metadata_tasks:
            tasks_status = _update_and_log_state_changes(
                tasks_status, task_metadata, logger, "Task"
            )

        jobs_runs_state = jobs_runs_metadata.get("state", {})
        jobs_runs_life_cycle_state = jobs_runs_state["life_cycle_state"]
        if jobs_runs_life_cycle_state in TERMINAL_STATUS_CODES:
            return jobs_runs_state, jobs_runs_metadata

        logger.info("Waiting for %s seconds.", poll_frequency_seconds)
        await asyncio.sleep(poll_frequency_seconds)
        seconds_waited_for_run_completion += poll_frequency_seconds

    raise DatabricksJobRunTimedOut(
        f"Max wait time of {max_wait_seconds} seconds exceeded while waiting "
        f"for job run ({run_name} ID {multi_task_jobs_runs_id})"
    )