DAG 執行¶
DAG 執行是表示 DAG 在時間上的一個例項化物件。任何時候 DAG 被執行時,都會建立一個 DAG 執行,並且其中所有的任務都會被執行。DAG 執行的狀態取決於任務的狀態。每個 DAG 執行都是相互獨立執行的,這意味著你可以同時運行同一個 DAG 的多個例項。
DAG 執行狀態¶
DAG 執行狀態在其執行完成時確定。DAG 的執行取決於其包含的任務及其依賴關係。當所有任務都處於終端狀態(即沒有可能轉換到其他狀態)時,例如 success(成功)、failed(失敗)或 skipped(跳過),狀態會被分配給 DAG 執行。DAG 執行的狀態是基於所謂的“葉子節點”或簡稱為“葉子”來分配的。葉子節點是沒有子任務的任務。
DAG 執行有兩種可能的終端狀態
success(成功):如果所有葉子節點狀態都為success或skipped,則為success。failed(失敗):如果任何葉子節點狀態為failed或upstream_failed(上游失敗),則為failed。
注意
請注意,如果您的某些任務定義了特定的 觸發規則,則需要小心。這可能會導致一些意想不到的行為,例如,如果您有一個葉子任務,其觸發規則為“all_done”,則無論其他任務的狀態如何,它都會被執行,如果它成功了,那麼整個 DAG 執行也會被標記為 success,即使中間有其他任務失敗了。
在 Airflow 2.7 中新增
當前有正在執行的 DAG 執行的 DAGs 可以在 UI 面板的“執行中”選項卡中顯示。類似地,最新 DAG 執行被標記為失敗的 DAGs 可以在“失敗”選項卡中找到。
資料區間¶
Airflow 中的每個 DAG 執行都有一個分配的“資料區間”,代表其操作的時間範圍。例如,對於使用 @daily 排程的 DAG,其每個資料區間將從每天午夜 (00:00) 開始,並在午夜 (24:00) 結束。
DAG 執行通常在其關聯的資料區間結束*之後*進行排程,以確保執行能夠收集該時間段內的所有資料。換句話說,覆蓋 2020-01-01 資料期間的執行通常不會在 2020-01-01 結束之前開始執行,即在 2020-01-02 00:00:00 之後。
Airflow 中的所有日期在某種程度上都與資料區間概念相關聯。“邏輯日期”(在 Airflow 2.2 版本之前也稱為 execution_date)例如,表示資料區間的開始,而不是 DAG 實際執行的時間。
類似地,由於 DAG 及其任務的 start_date 引數指向相同的邏輯日期,它標誌著 *DAG 的第一個資料區間*的開始,而不是 DAG 中的任務何時開始執行。換句話說,DAG 執行只會在 start_date 之後排程一個區間。
提示
如果 cron 表示式或 timedelta 物件不足以表達您的 DAG 排程、邏輯日期或資料區間,請參閱 時間表。有關 logical date 的更多資訊,請參閱 執行 DAG 和 execution_date 是什麼意思?
重新執行 DAG¶
有時您可能希望再次執行您的 DAG。其中一種情況是計劃的 DAG 執行失敗時。
追趕執行¶
使用 start_date(可能還有 end_date)和非資產計劃定義的 Airflow DAG 定義了一系列區間,排程器將這些區間轉化為獨立的 DAG 執行並執行。預設情況下,自上一個資料區間以來尚未執行的 DAG 執行不會在 DAG 啟用時由排程器建立 (Airflow 配置 scheduler.catchup_by_default=False)。排程器只為最新的區間建立 DAG 執行。
如果您在 DAG 中設定 catchup=True,排程器將為自上一個資料區間以來尚未執行(或已被清除)的任何資料區間啟動一個 DAG 執行。這個概念被稱為追趕執行 (Catchup)。
如果您的 DAG 沒有編寫為處理其追趕執行(例如,不限於區間,而是使用 Now),那麼您會希望關閉追趕執行,這是預設設定,或者可以在 DAG 定義中顯式設定 catchup=False,如果您的 Airflow 環境更改了預設配置。
"""
Code that goes along with the Airflow tutorial located at:
https://github.com/apache/airflow/blob/main/airflow/example_dags/tutorial.py
"""
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
import datetime
import pendulum
dag = DAG(
"tutorial",
default_args={
"depends_on_past": True,
"retries": 1,
"retry_delay": datetime.timedelta(minutes=3),
},
start_date=pendulum.datetime(2015, 12, 1, tz="UTC"),
description="A simple tutorial DAG",
schedule="@daily",
)
在上面的示例中,如果排程器守護程式在 2016-01-02 早上 6 點(或從命令列)拾取了該 DAG,則會建立一個數據區間在 2016-01-01 和 2016-01-02 之間的 DAG 執行,下一個將在 2016-01-03 凌晨剛過午夜時建立,資料區間在 2016-01-02 和 2016-01-03 之間。
請注意,使用 datetime.timedelta 物件作為計劃可能會導致不同的行為。在這種情況下,建立的單個 DAG 執行將覆蓋 2016-01-01 06:00 和 2016-01-02 06:00 之間的資料(一個計劃區間現在結束)。有關基於 cron 和基於 delta 的計劃之間差異的更詳細描述,請檢視 時間表比較。
如果 dag.catchup 的值改為 True,排程器將為 2015-12-01 和 2016-01-02 之間的每個已完成區間(但不包括 2016-01-02 的區間,因為它尚未完成)建立一個 DAG 執行,並且排程器將按順序執行它們。
當您在指定時間內關閉 DAG 然後重新啟用它時,也會觸發追趕執行。
這種行為非常適合可以輕鬆按週期劃分的原子資產。如果您的 DAG 內部執行追趕執行,則關閉 catchup 是個不錯的選擇。
回填執行¶
您可能希望在指定的歷史時期內執行 DAG。例如,建立了一個 start_date 為 2024-11-21 的 DAG,但另一個使用者需要一個月前的資料輸出,即 2024-10-21。這個過程稱為回填執行 (Backfill)。
這可以透過 API 或 CLI 完成。對於 CLI 使用,執行以下命令
airflow backfill create --dag-id DAG_ID \
--start-date START_DATE \
--end-date END_DATE \
backfill 命令 將重新執行指定開始日期和結束日期範圍內的 dag_id 的所有例項,針對所有區間。
重新執行任務¶
在計劃執行期間,某些任務可能會失敗。在查閱日誌並修復錯誤後,您可以透過清除計劃日期的任務例項來重新執行任務。清除任務例項會建立該任務例項的記錄。當前任務例項的 try_number 會遞增,max_tries 設定為 0,狀態設定為 None,這將導致任務重新執行。
在 Tree 或 Graph 檢視中單擊失敗的任務,然後單擊 Clear。Executor 將重新執行它。
您可以選擇多個選項來重新執行 -
Past (過去) - DAG 最近資料區間之前執行中的該任務的所有例項
Future (將來) - DAG 最近資料區間之後執行中的該任務的所有例項
Upstream (上游) - 當前 DAG 中的上游任務
Downstream (下游) - 當前 DAG 中的下游任務
Recursive (遞迴) - 子 DAG 和父 DAG 中的所有任務
Failed (失敗) - 僅限 DAG 最近一次執行中失敗的任務
您還可以透過 CLI 使用以下命令清除任務
airflow tasks clear dag_id \
--task-regex task_regex \
--start-date START_DATE \
--end-date END_DATE
對於指定的 dag_id 和時間區間,該命令將清除所有與正則表示式匹配的任務例項。有關更多選項,您可以檢視 clear 命令 的幫助。
airflow tasks clear --help
任務例項歷史記錄¶
當任務例項重試或被清除時,會保留任務例項歷史記錄。您可以在 Grid 檢視中單擊任務例項來檢視此歷史記錄。
注意
上面顯示的嘗試次數選擇器僅適用於已重試或已清除的任務。
歷史記錄顯示了特定執行結束時任務例項屬性的值。在日誌頁面上,您還可以檢視任務例項每次嘗試的日誌。這對於除錯非常有用。
注意
相關的任務例項物件,如 XComs、渲染後的模板欄位等,不會保留在歷史記錄中。僅保留任務例項屬性,包括日誌。
外部觸發器¶
請注意,DAG 執行也可以透過 CLI 手動建立。只需執行命令 -
airflow dags trigger --logical-date logical_date run_id
透過排程器外部建立的 DAG 執行會關聯到觸發器的時間戳,並在 UI 中與計劃的 DAG 執行一起顯示。DAG 內部傳遞的邏輯日期可以使用 -e 引數指定。預設值為 UTC 時區的當前日期。
此外,您還可以使用 Web UI 手動觸發 DAG 執行(選項卡 Dags -> 列 Links -> 按鈕 Trigger Dag)
觸發 DAG 時傳遞引數¶
透過 CLI、REST API 或 UI 觸發 DAG 時,可以將 DAG 執行的配置作為 JSON blob 傳遞。
帶引數的 DAG 示例
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
dag = DAG(
"example_parameterized_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
)
parameterized_task = BashOperator(
task_id="parameterized_task",
bash_command="echo value: {{ dag_run.conf['conf1'] }}",
dag=dag,
)
注意:來自 dag_run.conf 的引數只能在 operator 的模板欄位中使用。
使用 CLI¶
airflow dags trigger --conf '{"conf1": "value1"}' example_parameterized_dag
注意事項¶
可以透過 UI 將任務例項標記為失敗。這可用於停止正在執行的任務例項。
可以透過 UI 將任務例項標記為成功。這主要用於修復誤報,或者例如,當修復已在 Airflow 外部應用時。