Airflow Summit 2025 即將於 10 月 07-09 日召開。立即註冊獲取早鳥票!

DAG 執行

DAG 執行是表示 DAG 在時間上的一個例項化物件。任何時候 DAG 被執行時,都會建立一個 DAG 執行,並且其中所有的任務都會被執行。DAG 執行的狀態取決於任務的狀態。每個 DAG 執行都是相互獨立執行的,這意味著你可以同時運行同一個 DAG 的多個例項。

DAG 執行狀態

DAG 執行狀態在其執行完成時確定。DAG 的執行取決於其包含的任務及其依賴關係。當所有任務都處於終端狀態(即沒有可能轉換到其他狀態)時,例如 success(成功)、failed(失敗)或 skipped(跳過),狀態會被分配給 DAG 執行。DAG 執行的狀態是基於所謂的“葉子節點”或簡稱為“葉子”來分配的。葉子節點是沒有子任務的任務。

DAG 執行有兩種可能的終端狀態

  • success(成功):如果所有葉子節點狀態都為 successskipped,則為 success

  • failed(失敗):如果任何葉子節點狀態為 failedupstream_failed(上游失敗),則為 failed

注意

請注意,如果您的某些任務定義了特定的 觸發規則,則需要小心。這可能會導致一些意想不到的行為,例如,如果您有一個葉子任務,其觸發規則為“all_done”,則無論其他任務的狀態如何,它都會被執行,如果它成功了,那麼整個 DAG 執行也會被標記為 success,即使中間有其他任務失敗了。

在 Airflow 2.7 中新增

當前有正在執行的 DAG 執行的 DAGs 可以在 UI 面板的“執行中”選項卡中顯示。類似地,最新 DAG 執行被標記為失敗的 DAGs 可以在“失敗”選項卡中找到。

資料區間

Airflow 中的每個 DAG 執行都有一個分配的“資料區間”,代表其操作的時間範圍。例如,對於使用 @daily 排程的 DAG,其每個資料區間將從每天午夜 (00:00) 開始,並在午夜 (24:00) 結束。

DAG 執行通常在其關聯的資料區間結束*之後*進行排程,以確保執行能夠收集該時間段內的所有資料。換句話說,覆蓋 2020-01-01 資料期間的執行通常不會在 2020-01-01 結束之前開始執行,即在 2020-01-02 00:00:00 之後。

Airflow 中的所有日期在某種程度上都與資料區間概念相關聯。“邏輯日期”(在 Airflow 2.2 版本之前也稱為 execution_date)例如,表示資料區間的開始,而不是 DAG 實際執行的時間。

類似地,由於 DAG 及其任務的 start_date 引數指向相同的邏輯日期,它標誌著 *DAG 的第一個資料區間*的開始,而不是 DAG 中的任務何時開始執行。換句話說,DAG 執行只會在 start_date 之後排程一個區間。

提示

如果 cron 表示式或 timedelta 物件不足以表達您的 DAG 排程、邏輯日期或資料區間,請參閱 時間表。有關 logical date 的更多資訊,請參閱 執行 DAGexecution_date 是什麼意思?

重新執行 DAG

有時您可能希望再次執行您的 DAG。其中一種情況是計劃的 DAG 執行失敗時。

追趕執行

使用 start_date(可能還有 end_date)和非資產計劃定義的 Airflow DAG 定義了一系列區間,排程器將這些區間轉化為獨立的 DAG 執行並執行。預設情況下,自上一個資料區間以來尚未執行的 DAG 執行不會在 DAG 啟用時由排程器建立 (Airflow 配置 scheduler.catchup_by_default=False)。排程器只為最新的區間建立 DAG 執行。

如果您在 DAG 中設定 catchup=True,排程器將為自上一個資料區間以來尚未執行(或已被清除)的任何資料區間啟動一個 DAG 執行。這個概念被稱為追趕執行 (Catchup)。

如果您的 DAG 沒有編寫為處理其追趕執行(例如,不限於區間,而是使用 Now),那麼您會希望關閉追趕執行,這是預設設定,或者可以在 DAG 定義中顯式設定 catchup=False,如果您的 Airflow 環境更改了預設配置。

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

import datetime
import pendulum

dag = DAG(
    "tutorial",
    default_args={
        "depends_on_past": True,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=3),
    },
    start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
    description="A simple tutorial DAG",
    schedule="@daily",
)

在上面的示例中,如果排程器守護程式在 2016-01-02 早上 6 點(或從命令列)拾取了該 DAG,則會建立一個數據區間在 2016-01-01 和 2016-01-02 之間的 DAG 執行,下一個將在 2016-01-03 凌晨剛過午夜時建立,資料區間在 2016-01-02 和 2016-01-03 之間。

請注意,使用 datetime.timedelta 物件作為計劃可能會導致不同的行為。在這種情況下,建立的單個 DAG 執行將覆蓋 2016-01-01 06:00 和 2016-01-02 06:00 之間的資料(一個計劃區間現在結束)。有關基於 cron 和基於 delta 的計劃之間差異的更詳細描述,請檢視 時間表比較

如果 dag.catchup 的值改為 True,排程器將為 2015-12-01 和 2016-01-02 之間的每個已完成區間(但不包括 2016-01-02 的區間,因為它尚未完成)建立一個 DAG 執行,並且排程器將按順序執行它們。

當您在指定時間內關閉 DAG 然後重新啟用它時,也會觸發追趕執行。

這種行為非常適合可以輕鬆按週期劃分的原子資產。如果您的 DAG 內部執行追趕執行,則關閉 catchup 是個不錯的選擇。

回填執行

您可能希望在指定的歷史時期內執行 DAG。例如,建立了一個 start_date2024-11-21 的 DAG,但另一個使用者需要一個月前的資料輸出,即 2024-10-21。這個過程稱為回填執行 (Backfill)。

這可以透過 API 或 CLI 完成。對於 CLI 使用,執行以下命令

airflow backfill create --dag-id DAG_ID \
    --start-date START_DATE \
    --end-date END_DATE \

backfill 命令 將重新執行指定開始日期和結束日期範圍內的 dag_id 的所有例項,針對所有區間。

重新執行任務

在計劃執行期間,某些任務可能會失敗。在查閱日誌並修復錯誤後,您可以透過清除計劃日期的任務例項來重新執行任務。清除任務例項會建立該任務例項的記錄。當前任務例項的 try_number 會遞增,max_tries 設定為 0,狀態設定為 None,這將導致任務重新執行。

在 Tree 或 Graph 檢視中單擊失敗的任務,然後單擊 Clear。Executor 將重新執行它。

您可以選擇多個選項來重新執行 -

  • Past (過去) - DAG 最近資料區間之前執行中的該任務的所有例項

  • Future (將來) - DAG 最近資料區間之後執行中的該任務的所有例項

  • Upstream (上游) - 當前 DAG 中的上游任務

  • Downstream (下游) - 當前 DAG 中的下游任務

  • Recursive (遞迴) - 子 DAG 和父 DAG 中的所有任務

  • Failed (失敗) - 僅限 DAG 最近一次執行中失敗的任務

您還可以透過 CLI 使用以下命令清除任務

airflow tasks clear dag_id \
    --task-regex task_regex \
    --start-date START_DATE \
    --end-date END_DATE

對於指定的 dag_id 和時間區間,該命令將清除所有與正則表示式匹配的任務例項。有關更多選項,您可以檢視 clear 命令 的幫助。

airflow tasks clear --help

任務例項歷史記錄

當任務例項重試或被清除時,會保留任務例項歷史記錄。您可以在 Grid 檢視中單擊任務例項來檢視此歷史記錄。

../_images/task_instance_history.png

注意

上面顯示的嘗試次數選擇器僅適用於已重試或已清除的任務。

歷史記錄顯示了特定執行結束時任務例項屬性的值。在日誌頁面上,您還可以檢視任務例項每次嘗試的日誌。這對於除錯非常有用。

../_images/task_instance_history_log.png

注意

相關的任務例項物件,如 XComs、渲染後的模板欄位等,不會保留在歷史記錄中。僅保留任務例項屬性,包括日誌。

外部觸發器

請注意,DAG 執行也可以透過 CLI 手動建立。只需執行命令 -

airflow dags trigger --logical-date logical_date run_id

透過排程器外部建立的 DAG 執行會關聯到觸發器的時間戳,並在 UI 中與計劃的 DAG 執行一起顯示。DAG 內部傳遞的邏輯日期可以使用 -e 引數指定。預設值為 UTC 時區的當前日期。

此外,您還可以使用 Web UI 手動觸發 DAG 執行(選項卡 Dags -> 列 Links -> 按鈕 Trigger Dag

觸發 DAG 時傳遞引數

透過 CLI、REST API 或 UI 觸發 DAG 時,可以將 DAG 執行的配置作為 JSON blob 傳遞。

帶引數的 DAG 示例

import pendulum

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator

dag = DAG(
    "example_parameterized_dag",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
)

parameterized_task = BashOperator(
    task_id="parameterized_task",
    bash_command="echo value: {{ dag_run.conf['conf1'] }}",
    dag=dag,
)

注意:來自 dag_run.conf 的引數只能在 operator 的模板欄位中使用。

使用 CLI

airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag

注意事項

  • 可以透過 UI 將任務例項標記為失敗。這可用於停止正在執行的任務例項。

  • 可以透過 UI 將任務例項標記為成功。這主要用於修復誤報,或者例如,當修復已在 Airflow 外部應用時。

本條目有幫助嗎?