Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊以獲得早鳥票!

回撥

任務回撥是日誌和監控的一個重要組成部分,用於在給定任務狀態變化時或給定 DAG 中所有任務狀態變化時採取行動。例如,您可能希望在某些任務失敗時發出警報,或者讓 DAG 中的最後一個任務在成功時呼叫回撥函式。

注意

回撥函式僅在任務狀態因 worker 執行而改變時被呼叫。因此,透過命令列介面 (CLI) 或使用者介面 (UI) 設定的任務更改不會執行回撥函式。

警告

回撥函式在任務完成後執行。回撥函式中的錯誤將出現在排程器日誌中,而不是任務日誌中。預設情況下,排程器日誌不會在 UI 中顯示,而可以在 $AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log 中找到。

回撥型別

有五種任務事件型別可以觸發回撥

名稱

描述

on_success_callback

當任務成功時呼叫

on_failure_callback

當任務失敗時呼叫

on_retry_callback

當任務準備重試時呼叫

on_execute_callback

在任務開始執行前立即呼叫。

on_skipped_callback

當任務正在執行並丟擲 AirflowSkipException 時呼叫。明確地說,如果任務由於 DAG 中前一個分支決策或觸發規則導致跳過執行而從未被排程執行,則不會呼叫此函式。

示例

在以下示例中,任何任務失敗都會呼叫 task_failure_alert 函式,最後一個任務成功時會呼叫 dag_success_alert 函式

import datetime
import pendulum

from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

注意

自 Airflow 2.6.0 起,回撥現在支援回撥函式列表,允許使用者指定多個函式在所需的事件發生時執行。只需在定義 DAG/任務回撥時將回調函式列表傳遞給回撥引數:例如 on_failure_callback=[callback_func_1, callback_func_2]

context 中可用的變數完整列表見文件程式碼

本條目有幫助嗎?