DatabricksSubmitRunOperator¶
使用 DatabricksSubmitRunOperator 透過 Databricks api/2.1/jobs/runs/submit API 端點提交新的 Databricks 作業。
使用 Operator¶
有三種例項化此 Operator 的方式。第一種方式是,您可以獲取通常用於呼叫 api/2.1/jobs/runs/submit 端點的 JSON 有效載荷,並透過 json 引數將其直接傳遞給我們的 DatabricksSubmitRunOperator。使用此方法,您可以完全控制 Jobs REST API 的底層有效載荷,包括執行包含多個任務的 Databricks 作業,但由於缺乏型別檢查,更難檢測錯誤。
json = {
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {
"notebook_path": "/Users/airflow@example.com/PrepareData",
},
}
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", json=json)
實現相同目標的第二種方法是直接使用 DatabricksSubmitRunOperator 的命名引數。請注意,runs/submit 端點中的每個頂級引數恰好對應一個命名引數。使用命名引數時,必須指定以下內容
任務規範 - 應為以下之一:
spark_jar_task- JAR 任務的主類和引數notebook_task- 任務的 notebook 路徑和引數spark_python_task- 用於執行 Python 檔案的 Python 檔案路徑和引數spark_submit_task- 執行spark-submit命令所需的引數pipeline_task- 執行 Delta Live Tables 流水線所需的引數dbt_task- 執行 dbt 專案所需的引數
叢集規範 - 應為以下之一: *
new_cluster- 執行此任務的新叢集配置 *existing_cluster_id- 執行此任務的現有叢集 IDpipeline_task- 可能指pipeline_id或pipeline_name
在同時提供了 json 引數和命名引數的情況下,它們將被合併。如果在合併過程中發生衝突,命名引數將優先並覆蓋頂層 json 鍵。
- 目前
DatabricksSubmitRunOperator支援的命名引數有 spark_jar_tasknotebook_taskspark_python_taskspark_submit_taskpipeline_taskdbt_taskgit_sourcenew_clusterexisting_cluster_idlibrariesrun_nametimeout_seconds
new_cluster = {"spark_version": "10.1.x-scala2.12", "num_workers": 2}
notebook_task = {
"notebook_path": "/Users/airflow@example.com/PrepareData",
}
notebook_run = DatabricksSubmitRunOperator(
task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)
另一種方法是使用 tasks 引數傳遞物件陣列來例項化此 Operator。在此處,用於呼叫 api/2.1/jobs/runs/submit 端點的 tasks 引數的值透過 DatabricksSubmitRunOperator 中的 tasks 引數傳遞。您可以傳遞任務陣列而不是呼叫單個任務,並提交一次性執行。
tasks = [
{
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {"notebook_path": "/Users/airflow@example.com/PrepareData"},
}
]
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", tasks=tasks)
示例¶
將引數指定為 JSON¶
DatabricksSubmitRunOperator 的示例用法如下
tests/system/databricks/example_databricks.py
# Example of using the JSON parameter to initialize the operator.
new_cluster = {
"spark_version": "9.1.x-scala2.12",
"node_type_id": "r3.xlarge",
"aws_attributes": {"availability": "ON_DEMAND"},
"num_workers": 8,
}
notebook_task_params = {
"new_cluster": new_cluster,
"notebook_task": {
"notebook_path": "/Users/airflow@example.com/PrepareData",
},
}
notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)
使用命名引數¶
您也可以使用命名引數初始化 Operator 並執行作業。
tests/system/databricks/example_databricks.py
# Example of using the named parameters of DatabricksSubmitRunOperator
# to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
task_id="spark_jar_task",
new_cluster=new_cluster,
spark_jar_task={"main_class_name": "com.example.ProcessData"},
libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
)
DatabricksSubmitRunDeferrableOperator¶
DatabricksSubmitRunOperator Operator 的可延遲版本。
它允許使用 Airflow 2.2.0 中引入的新功能更有效地利用 Airflow Worker。