DatabricksCreateJobsOperator¶
使用 DatabricksCreateJobsOperator 來建立(或重置)一個 Databricks 作業。此 Operator 依賴於過去的 XComs 來記住已建立的 job_id,以便透過此 Operator 進行重複呼叫時更新現有作業,而不是建立新作業。當與 DatabricksRunNowOperator 配對使用時,所有執行都將屬於 Databricks UI 中的同一作業。
使用此 Operator¶
有三種例項化此 Operator 的方法。第一種方法是,您可以獲取通常用於呼叫 api/2.1/jobs/create 端點的 JSON 有效負載,並透過 json 引數將其直接傳遞給我們的 DatabricksCreateJobsOperator。使用此方法,您可以完全控制底層有效負載到 Jobs REST API,包括執行包含多個任務的 Databricks 作業,但由於缺乏型別檢查,更難檢測錯誤。
完成相同事情的第二種方法是直接使用 DatabricksCreateJobsOperator 的命名引數。請注意,api/2.1/jobs/create 端點中的每個頂級引數都恰好對應一個命名引數。
第三種方法是同時使用 json 引數 和 命名引數。它們將被合併。如果在合併過程中發生衝突,命名引數將優先並覆蓋頂級 json 鍵。
- 目前
DatabricksCreateJobsOperator支援的命名引數有 namedescriptiontagstasksjob_clustersemail_notificationswebhook_notificationsnotification_settingstimeout_secondsschedulemax_concurrent_runsgit_sourceaccess_control_list
示例¶
將引數指定為 JSON¶
以下是 DatabricksCreateJobsOperator 的一個使用示例
tests/system/databricks/example_databricks.py
# Example of using the JSON parameter to initialize the operator.
job = {
"tasks": [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
],
"job_clusters": [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
],
}
jobs_create_json = DatabricksCreateJobsOperator(task_id="jobs_create_json", json=job)
使用命名引數¶
您也可以使用命名引數來初始化此 Operator 並執行作業。
tests/system/databricks/example_databricks.py
# Example of using the named parameters to initialize the operator.
tasks = [
{
"task_key": "test",
"job_cluster_key": "job_cluster",
"notebook_task": {
"notebook_path": "/Shared/test",
},
},
]
job_clusters = [
{
"job_cluster_key": "job_cluster",
"new_cluster": {
"spark_version": "7.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2,
},
},
]
jobs_create_named = DatabricksCreateJobsOperator(
task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
)
與 DatabricksRunNowOperator 配對¶
您可以將 DatabricksCreateJobsOperator 在 return_value XCom 中返回的 job_id 作為引數傳遞給 DatabricksRunNowOperator 來執行作業。
tests/system/databricks/example_databricks.py
# Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
run_now = DatabricksRunNowOperator(
task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
)
jobs_create_named >> run_now