Airflow 峰會 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

設定和拆卸

在資料工作流程中,通常需要建立一個資源(例如計算資源),使用它完成一些工作,然後將其拆卸。Airflow 提供了設定和拆卸任務來支援這一需求。

設定和拆卸任務的關鍵特性

  • 如果您清除一個任務,其設定和拆卸任務也將被清除。

  • 預設情況下,拆卸任務在評估 DAG 執行狀態時會被忽略。

  • 如果其設定成功,即使工作任務失敗,拆卸任務也會執行。但如果設定被跳過,拆卸任務也會跳過。

  • 在針對任務組設定依賴關係時,拆卸任務會被忽略。

  • 即使 DAG 執行被手動設定為“失敗”或“成功”,拆卸任務也會執行,以確保資源被清理。

設定和拆卸的工作原理

基本用法

假設您有一個 DAG,它建立叢集,執行查詢,然後刪除叢集。如果不使用設定和拆卸任務,您可能會設定以下依賴關係

create_cluster >> run_query >> delete_cluster

要將 create_clusterdelete_cluster 啟用為設定和拆卸任務,我們使用 as_setupas_teardown 方法標記它們,並在它們之間新增上游/下游依賴關係

create_cluster.as_setup() >> run_query >> delete_cluster.as_teardown()
create_cluster >> delete_cluster

為了方便,我們可以透過將 create_cluster 傳遞給 as_teardown 方法在一行中完成此操作

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster)

這是此 DAG 的圖

../_images/setup-teardown-simple.png

注意事項

  • 如果您清除 run_query 以便再次執行它,則 create_clusterdelete_cluster 都會被清除。

  • 如果 run_query 失敗,則 delete_cluster 仍將執行。

  • DAG 執行的成功取決於 run_query 的成功。

此外,如果我們有多個要包裹的任務,我們可以使用拆卸作為上下文管理器

with delete_cluster().as_teardown(setups=create_cluster()):
    [RunQueryOne(), RunQueryTwo()] >> DoSomeOtherStuff()
    WorkOne() >> [do_this_stuff(), do_other_stuff()]

這將設定 create_cluster 在上下文中的任務之前執行,並在這些任務之後執行 delete_cluster。

圖表中顯示如下

../_images/setup-teardown-complex.png

請注意,如果您嘗試將一個已例項化的任務新增到設定上下文中,則需要顯式執行此操作

with my_teardown_task as scope:
    scope.add_task(work_task)  # work_task was already instantiated elsewhere

設定的“範圍”

設定與其拆卸之間的任務處於設定/拆卸對的“範圍”內。

我們來看一個示例

s1 >> w1 >> w2 >> t1.as_teardown(setups=s1) >> w3
w2 >> w4

以及圖表

../_images/setup-teardown-scope.png

在上面的示例中,w1w2 處於 s1t1 “之間”,因此假定它們需要 s1。因此,如果清除 w1w2s1t1 也將被清除。但如果清除 w3w4s1t1 都不會被清除。

您可以將多個設定任務連線到一個拆卸任務。如果至少有一個設定任務成功完成,拆卸任務將執行。

您可以有一個沒有拆卸的設定

create_cluster >> run_query >> other_task

在這種情況下,create_cluster 的所有下游任務都假定需要它。因此,如果您清除 other_task,它也會清除 create_cluster。假設我們在 run_query 之後為 create_cluster 新增一個拆卸

create_cluster >> run_query >> other_task
run_query >> delete_cluster.as_teardown(setups=create_cluster)

現在,Airflow 可以推斷 other_task 不需要 create_cluster,因此如果我們清除 other_task,create_cluster 也不會被清除。

在那個示例中,我們(在假想的文件中)實際上想要刪除叢集。但假設我們不這樣做,而只是想說“other_task 不需要 create_cluster”,那麼我們可以使用 EmptyOperator 來限制設定的範圍

create_cluster >> run_query >> other_task
run_query >> EmptyOperator(task_id="cluster_teardown").as_teardown(setups=create_cluster)

隱式的 ALL_SUCCESS 約束

任何處於設定範圍內的任務對其設定都有一個隱式的“all_success”約束。這對於確保當一個具有間接設定的任務被清除時,它會等待這些設定完成是必要的。如果設定失敗或被跳過,依賴於它們的工作任務將被標記為失敗或跳過。我們還要求直接位於設定下游的任何非拆卸任務必須具有觸發規則 ALL_SUCCESS。

控制 DAG 執行狀態

設定/拆卸任務的另一個特性是您可以選擇拆卸任務是否應對 DAG 執行狀態產生影響。也許您不關心拆卸任務執行的“清理”工作是否失敗,而只在“工作”任務失敗時才認為 DAG 執行失敗。預設情況下,拆卸任務不計入 DAG 執行狀態的評估。

繼續上面的示例,如果您希望執行的成功依賴於 delete_cluster,則在將 delete_cluster 設定為拆卸任務時,設定 on_failure_fail_dagrun=True。例如

create_cluster >> run_query >> delete_cluster.as_teardown(setups=create_cluster, on_failure_fail_dagrun=True)

使用任務組進行編排

從任務組到任務組,或從任務組到任務新增依賴關係時,我們會忽略拆卸任務。這允許拆卸任務並行執行,並允許 DAG 執行即使在拆卸任務失敗的情況下也能繼續進行。

考慮這個示例

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2

圖表

../_images/setup-teardown-group.png

如果 t1 不是一個拆卸任務,那麼這個 DAG 實際上將是 s1 >> w1 >> t1 >> w2。但是由於我們已將 t1 標記為拆卸任務,它在 tg >> w2 中被忽略了。因此,該 DAG 等同於以下內容

s1 >> w1 >> [t1.as_teardown(setups=s1), w2]

現在我們考慮一個巢狀的示例

with TaskGroup("my_group") as tg:
    s1 = s1()
    w1 = w1()
    t1 = t1()
    s1 >> w1 >> t1.as_teardown(setups=s1)
w2 = w2()
tg >> w2
dag_s1 = dag_s1()
dag_t1 = dag_t1()
dag_s1 >> [tg, w2] >> dag_t1.as_teardown(setups=dag_s1)

圖表

../_images/setup-teardown-nesting.png

在此示例中,s1dag_s1 的下游任務,因此它必須等待 dag_s1 成功完成。但是 t1dag_t1 可以並行執行,因為 t1 在表示式 tg >> dag_t1 中被忽略了。如果您清除 w2,它將清除 dag_s1dag_t1,但不會清除任務組中的任何內容。

並行執行設定和拆卸

您可以並行執行設定任務

(
    [create_cluster, create_bucket]
    >> run_query
    >> [delete_cluster.as_teardown(setups=create_cluster), delete_bucket.as_teardown(setups=create_bucket)]
)

圖表

../_images/setup-teardown-parallel.png

將它們放入一個組中在視覺上可能更佳

with TaskGroup("setup") as tg_s:
    create_cluster = create_cluster()
    create_bucket = create_bucket()
run_query = run_query()
with TaskGroup("teardown") as tg_t:
    delete_cluster = delete_cluster().as_teardown(setups=create_cluster)
    delete_bucket = delete_bucket().as_teardown(setups=create_bucket)
tg_s >> run_query >> tg_t

以及圖表

../_images/setup-teardown-setup-group.png

拆卸的觸發規則行為

拆卸任務使用一個(不可配置的)觸發規則,稱為 ALL_DONE_SETUP_SUCCESS。使用此規則,只要所有上游任務完成且至少一個直接連線的設定任務成功,拆卸任務就會執行。如果拆卸任務的所有設定都被跳過或失敗,這些狀態將傳播到拆卸任務。

手動更改 DAG 狀態的副作用

由於拆卸任務通常用於清理資源,因此即使 DAG 被手動終止,它們也需要執行。為了儘早終止,使用者可以手動將 DAG 執行標記為“成功”或“失敗”,這會在完成之前殺死所有任務。如果 DAG 包含拆卸任務,它們仍將執行。因此,允許排程拆卸任務的一個副作用是,即使使用者請求,DAG 也不會立即進入終止狀態。

此條目是否有幫助?