DatabricksWorkflowTaskGroup¶
使用 DatabricksWorkflowTaskGroup 將 Databricks notebook 作業執行作為 Airflow 任務啟動和監控。此任務組啟動一個 Databricks Workflow 並在其中執行 notebook 作業,與在 DatabricksWorkflowTaskGroup 之外執行 DatabricksNotebookOperator 相比,可降低 75% 的成本(通用計算為 $0.40/DBU,作業計算為 $0.07/DBU)。
在 Airflow 中定義 Databricks Workflows 有一些優勢
創作介面 |
透過 Databricks (基於 Web 的 Databricks UI) |
透過 Airflow (使用 Airflow DAG 程式碼) |
|---|---|---|
Workflow 計算定價 |
✅ |
✅ |
原始碼控制中的 Notebook 程式碼 |
✅ |
✅ |
原始碼控制中的 Workflow 結構 |
✅ |
✅ |
從頭開始重試 |
✅ |
✅ |
重試單個任務 |
✅ |
✅ |
Workflows 中的任務組 |
✅ |
|
從其他 DAG 觸發 workflows |
✅ |
|
Workflow 級別引數 |
✅ |
示例¶
帶有 DatabricksWorkflowTaskGroup 的 DAG 示例¶
tests/system/databricks/example_databricks_workflow.py
task_group = DatabricksWorkflowTaskGroup(
group_id=f"test_workflow_{USER}_{GROUP_ID}",
databricks_conn_id=DATABRICKS_CONN_ID,
job_clusters=job_cluster_spec,
notebook_params={"ts": "{{ ts }}"},
notebook_packages=[
{
"pypi": {
"package": "simplejson==3.18.0", # Pin specification version of a package like this.
"repo": "https://pypi.org/simple", # You can specify your required Pypi index here.
}
},
],
extra_job_params={
"email_notifications": {
"on_start": [DATABRICKS_NOTIFICATION_EMAIL],
},
},
)
with task_group:
notebook_1 = DatabricksNotebookOperator(
task_id="workflow_notebook_1",
databricks_conn_id=DATABRICKS_CONN_ID,
notebook_path="/Shared/Notebook_1",
notebook_packages=[{"pypi": {"package": "Faker"}}],
source="WORKSPACE",
job_cluster_key="Shared_job_cluster",
execution_timeout=timedelta(seconds=600),
)
notebook_2 = DatabricksNotebookOperator(
task_id="workflow_notebook_2",
databricks_conn_id=DATABRICKS_CONN_ID,
notebook_path="/Shared/Notebook_2",
source="WORKSPACE",
job_cluster_key="Shared_job_cluster",
notebook_params={"foo": "bar", "ds": "{{ ds }}"},
)
task_operator_nb_1 = DatabricksTaskOperator(
task_id="nb_1",
databricks_conn_id=DATABRICKS_CONN_ID,
job_cluster_key="Shared_job_cluster",
task_config={
"notebook_task": {
"notebook_path": "/Shared/Notebook_1",
"source": "WORKSPACE",
},
"libraries": [
{"pypi": {"package": "Faker"}},
],
},
)
sql_query = DatabricksTaskOperator(
task_id="sql_query",
databricks_conn_id=DATABRICKS_CONN_ID,
task_config={
"sql_task": {
"query": {
"query_id": QUERY_ID,
},
"warehouse_id": WAREHOUSE_ID,
}
},
)
notebook_1 >> notebook_2 >> task_operator_nb_1 >> sql_query
透過此示例,Airflow 將生成一個名為 <dag_name>.test_workflow_<USER>_<GROUP_ID> 的作業,該作業將執行任務 notebook_1,然後執行 notebook_2。如果該作業尚不存在,將在 databricks 工作區中建立。如果作業已存在,將進行更新以匹配 DAG 中定義的 workflow。
下圖顯示了 Airflow UI 中生成的 Databricks Workflow (基於上面提供的示例)¶
下面顯示了從 Airflow DAG 觸發的執行在 Databricks UI 中對應的 Databricks Workflow¶
為儘量減少更新衝突,我們建議您在可能的情況下將引數儲存在 DatabricksWorkflowTaskGroup 的 notebook_params 中,而不是在 DatabricksNotebookOperator 中。這是因為 DatabricksWorkflowTaskGroup 中的任務在作業觸發時傳入,並且不會修改作業定義。