回撥¶
任務回撥是日誌和監控的一個重要組成部分,用於在給定任務狀態變化時或給定 DAG 中所有任務狀態變化時採取行動。例如,您可能希望在某些任務失敗時發出警報,或者讓 DAG 中的最後一個任務在成功時呼叫回撥函式。
警告
回撥函式在任務完成後執行。回撥函式中的錯誤將出現在排程器日誌中,而不是任務日誌中。預設情況下,排程器日誌不會在 UI 中顯示,而可以在 $AIRFLOW_HOME/logs/scheduler/latest/DAG_FILE.py.log 中找到。
回撥型別¶
有五種任務事件型別可以觸發回撥
名稱 |
描述 |
|---|---|
|
當任務成功時呼叫 |
|
當任務失敗時呼叫 |
|
當任務準備重試時呼叫 |
|
在任務開始執行前立即呼叫。 |
|
當任務正在執行並丟擲 AirflowSkipException 時呼叫。明確地說,如果任務由於 DAG 中前一個分支決策或觸發規則導致跳過執行而從未被排程執行,則不會呼叫此函式。 |
示例¶
在以下示例中,任何任務失敗都會呼叫 task_failure_alert 函式,最後一個任務成功時會呼叫 dag_success_alert 函式
import datetime
import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
def task_failure_alert(context):
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
print(f"DAG has succeeded, run_id: {context['run_id']}")
with DAG(
dag_id="example_callback",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
catchup=False,
on_success_callback=None,
on_failure_callback=task_failure_alert,
tags=["example"],
):
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
task1 >> task2 >> task3
注意
自 Airflow 2.6.0 起,回撥現在支援回撥函式列表,允許使用者指定多個函式在所需的事件發生時執行。只需在定義 DAG/任務回撥時將回調函式列表傳遞給回撥引數:例如 on_failure_callback=[callback_func_1, callback_func_2]