設定和拆卸¶
在資料工作流程中,通常需要建立一個資源(例如計算資源),使用它完成一些工作,然後將其拆卸。Airflow 提供了設定和拆卸任務來支援這一需求。
設定和拆卸任務的關鍵特性
如果您清除一個任務,其設定和拆卸任務也將被清除。
預設情況下,拆卸任務在評估 DAG 執行狀態時會被忽略。
如果其設定成功,即使工作任務失敗,拆卸任務也會執行。但如果設定被跳過,拆卸任務也會跳過。
在針對任務組設定依賴關係時,拆卸任務會被忽略。
即使 DAG 執行被手動設定為“失敗”或“成功”,拆卸任務也會執行,以確保資源被清理。
設定和拆卸的工作原理¶
基本用法¶
假設您有一個 DAG,它建立叢集,執行查詢,然後刪除叢集。如果不使用設定和拆卸任務,您可能會設定以下依賴關係
create_cluster >> run_query >> delete_cluster
要將 create_cluster 和 delete_cluster 啟用為設定和拆卸任務,我們使用 as_setup 和 as_teardown 方法標記它們,並在它們之間新增上游/下游依賴關係
create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster
為了方便,我們可以透過將 create_cluster 傳遞給 as_teardown 方法在一行中完成此操作
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)
這是此 DAG 的圖
注意事項
如果您清除
run_query以便再次執行它,則create_cluster和delete_cluster都會被清除。如果
run_query失敗,則delete_cluster仍將執行。DAG 執行的成功僅取決於
run_query的成功。
此外,如果我們有多個要包裹的任務,我們可以使用拆卸作為上下文管理器
with delete_cluster().as_teardown(setups=create_cluster()):
[RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
WorkOne() >> [do_this_stuff(), do_other_stuff()]
這將設定 create_cluster 在上下文中的任務之前執行,並在這些任務之後執行 delete_cluster。
圖表中顯示如下
請注意,如果您嘗試將一個已例項化的任務新增到設定上下文中,則需要顯式執行此操作
with my_teardown_task as scope:
scope.add_task(work_task) # work_task was already instantiated elsewhere
設定的“範圍”¶
設定與其拆卸之間的任務處於設定/拆卸對的“範圍”內。
我們來看一個示例
s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4
以及圖表
在上面的示例中,w1 和 w2 處於 s1 和 t1 “之間”,因此假定它們需要 s1。因此,如果清除 w1 或 w2,s1 和 t1 也將被清除。但如果清除 w3 或 w4,s1 和 t1 都不會被清除。
您可以將多個設定任務連線到一個拆卸任務。如果至少有一個設定任務成功完成,拆卸任務將執行。
您可以有一個沒有拆卸的設定
create_cluster >> run_query >> other_task
在這種情況下,create_cluster 的所有下游任務都假定需要它。因此,如果您清除 other_task,它也會清除 create_cluster。假設我們在 run_query 之後為 create_cluster 新增一個拆卸
create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)
現在,Airflow 可以推斷 other_task 不需要 create_cluster,因此如果我們清除 other_task,create_cluster 也不會被清除。
在那個示例中,我們(在假想的文件中)實際上想要刪除叢集。但假設我們不這樣做,而只是想說“other_task 不需要 create_cluster”,那麼我們可以使用 EmptyOperator 來限制設定的範圍
create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)
隱式的 ALL_SUCCESS 約束¶
任何處於設定範圍內的任務對其設定都有一個隱式的“all_success”約束。這對於確保當一個具有間接設定的任務被清除時,它會等待這些設定完成是必要的。如果設定失敗或被跳過,依賴於它們的工作任務將被標記為失敗或跳過。我們還要求直接位於設定下游的任何非拆卸任務必須具有觸發規則 ALL_SUCCESS。
控制 DAG 執行狀態¶
設定/拆卸任務的另一個特性是您可以選擇拆卸任務是否應對 DAG 執行狀態產生影響。也許您不關心拆卸任務執行的“清理”工作是否失敗,而只在“工作”任務失敗時才認為 DAG 執行失敗。預設情況下,拆卸任務不計入 DAG 執行狀態的評估。
繼續上面的示例,如果您希望執行的成功依賴於 delete_cluster,則在將 delete_cluster 設定為拆卸任務時,設定 on_failure_fail_dagrun=True。例如
create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)
並行執行設定和拆卸¶
您可以並行執行設定任務
(
[create_cluster, create_bucket]
>> run_query
>> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)
圖表
將它們放入一個組中在視覺上可能更佳
with TaskGroup("setup") as tg_s:
create_cluster = create_cluster()
create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t
以及圖表
拆卸的觸發規則行為¶
拆卸任務使用一個(不可配置的)觸發規則,稱為 ALL_DONE_SETUP_SUCCESS。使用此規則,只要所有上游任務完成且至少一個直接連線的設定任務成功,拆卸任務就會執行。如果拆卸任務的所有設定都被跳過或失敗,這些狀態將傳播到拆卸任務。
手動更改 DAG 狀態的副作用¶
由於拆卸任務通常用於清理資源,因此即使 DAG 被手動終止,它們也需要執行。為了儘早終止,使用者可以手動將 DAG 執行標記為“成功”或“失敗”,這會在完成之前殺死所有任務。如果 DAG 包含拆卸任務,它們仍將執行。因此,允許排程拆卸任務的一個副作用是,即使使用者請求,DAG 也不會立即進入終止狀態。