跨DAG依賴

當兩個 DAGs 之間存在依賴關係時,可以考慮將它們合併到一個單獨的 DAG 中,這通常更容易理解。Airflow 也為同一 DAG 中的任務提供了更好的依賴視覺化表示。然而,有時將所有相關任務放在同一個 DAG 中並不實用。例如:

  • 兩個 DAGs 可能有不同的排程。例如,一個每週執行的 DAG 中的任務可能依賴於一個每天執行的 DAG 中的其他任務。

  • 不同的團隊負責不同的 DAGs,但這些 DAGs 之間存在一些跨 DAG 依賴。

  • 某個任務可能依賴於同一 DAG 中但不同 execution_date(資料區間的開始)下的另一個任務。

  • 對於在不同時間執行的任務,可以使用 execution_delta,例如使用 execution_delta=timedelta(hours=1) 來檢查比當前任務早執行 1 小時的任務。

可以使用 ExternalTaskSensor 來建立跨不同 DAG 的此類依賴關係。當它與 ExternalTaskMarker 一起使用時,依賴任務的清除也可以跨不同 DAG 發生。

ExternalTaskSensor

使用 ExternalTaskSensor 使一個 DAG 中的任務等待另一個不同 DAG 中特定 execution_date 下的任務完成。

ExternalTaskSensor 還提供了選項,用於透過 allowed_statesfailed_states 引數設定遠端 DAG 上的任務是成功還是失敗。

airflow/example_dags/example_external_task_marker_dag.py

child_task1 = ExternalTaskSensor(
    task_id="child_task1",
    external_dag_id=parent_dag.dag_id,
    external_task_id=parent_task.task_id,
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

此外,對於此操作,您可以在可延遲模式下使用 Sensor

airflow-core/tests/system/core/example_external_task_parent_deferrable.py

external_task_sensor = ExternalTaskSensor(
    task_id="parent_task_sensor",
    external_task_id="child_task",
    external_dag_id="child_dag",
    deferrable=True,
)

具有 task_group 依賴的 ExternalTaskSensor

此外,我們還可以使用 ExternalTaskSensor 使一個 DAG 中的任務等待另一個不同 DAG 中特定 execution_date 下的 task_group 完成。

airflow/example_dags/example_external_task_marker_dag.py

child_task2 = ExternalTaskSensor(
    task_id="child_task2",
    external_dag_id=parent_dag.dag_id,
    external_task_group_id="parent_dag_task_group_id",
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

ExternalTaskMarker

如果希望在 parent_dag 上的 parent_task 被清除時,特定 execution_datechild_dag 上的 child_task1 也被清除,則應使用 ExternalTaskMarker。請注意,只有當用戶清除 parent_task 時選擇“遞迴”選項,child_task1 才會被清除。

airflow/example_dags/example_external_task_marker_dag.py

parent_task = ExternalTaskMarker(
    task_id="parent_task",
    external_dag_id="example_external_task_marker_child",
    external_task_id="child_task1",
)

這篇內容有幫助嗎?