監聽器¶
你可以編寫監聽器,以便 Airflow 在事件發生時通知你。這些監聽器由 Pluggy 提供支援。
警告
監聽器是 Airflow 的高階特性。它們與執行它們的 Airflow 元件並非隔離,可能會降低 Airflow 例項的效能,甚至在某些情況下導致例項崩潰。因此,編寫監聽器時應格外小心。
Airflow 支援以下事件的通知
生命週期事件¶
on_startingbefore_stopping
生命週期事件允許你對 Airflow Job(例如 SchedulerJob)的啟動和停止事件作出反應。
DagRun 狀態變化事件¶
當 DagRun 改變狀態時,會發生 DagRun 狀態變化事件。從 Airflow 3 開始,當狀態變化透過 API 觸發時(對於 on_dag_run_success 和 on_dag_run_failed),監聽器也會收到通知,例如當在 Airflow UI 中將 DagRun 標記為成功時。
on_dag_run_running
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_dag_run_running(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to RUNNING.
"""
print("Dag run in running state")
queued_at = dag_run.queued_at
version = dag_run.version_number
print(f"Dag information Queued at: {queued_at} version: {version}")
on_dag_run_success
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_dag_run_success(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to SUCCESS.
"""
print("Dag run in success state")
start_date = dag_run.start_date
end_date = dag_run.end_date
print(f"Dag run start:{start_date} end:{end_date}")
on_dag_run_failed
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to FAILED.
"""
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
run_type = dag_run.run_type
print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
print(f"Failed with message: {msg}")
TaskInstance 狀態變化事件¶
當 RuntimeTaskInstance 改變狀態時,會發生 TaskInstance 狀態變化事件。你可以使用這些事件對 LocalTaskJob 狀態變化作出反應。從 Airflow 3 開始,當狀態變化透過 API 觸發時(對於 on_task_instance_success 和 on_task_instance_failed),監聽器也會收到通知,例如當在 Airflow UI 中將任務例項標記為成功時。在這種情況下,監聽器將收到一個 TaskInstance 例項,而不是 RuntimeTaskInstance 例項。
on_task_instance_running
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance):
"""
Called when task state changes to RUNNING.
previous_task_state and task_instance object can be used to retrieve more information about current
task_instance that is running, its dag_run, task and dag information.
"""
print("Task instance is in running state")
print(" Previous state of the Task instance:", previous_state)
name: str = task_instance.task_id
context = task_instance.get_template_context()
task = context["task"]
if TYPE_CHECKING:
assert task
dag = task.dag
dag_name = None
if dag:
dag_name = dag.dag_id
print(f"Current task name:{name}")
print(f"Dag name:{dag_name}")
on_task_instance_success
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_task_instance_success(
previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance | TaskInstance
):
"""
Called when task state changes to SUCCESS.
previous_task_state and task_instance object can be used to retrieve more information about current
task_instance that has succeeded, its dag_run, task and dag information.
A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
through the API. In that case, the TaskInstance available on the API server will be provided instead.
"""
print("Task instance in success state")
print(" Previous state of the Task instance:", previous_state)
if isinstance(task_instance, TaskInstance):
print("Task instance's state was changed through the API.")
print(f"Task operator:{task_instance.operator}")
return
context = task_instance.get_template_context()
operator = context["task"]
print(f"Task operator:{operator}")
on_task_instance_failed
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_task_instance_failed(
previous_state: TaskInstanceState,
task_instance: RuntimeTaskInstance | TaskInstance,
error: None | str | BaseException,
):
"""
Called when task state changes to FAILED.
previous_task_state, task_instance object and error can be used to retrieve more information about current
task_instance that has failed, its dag_run, task and dag information.
A RuntimeTaskInstance is provided in most cases, except when the task's state change is triggered
through the API. In that case, the TaskInstance available on the API server will be provided instead.
"""
print("Task instance in failure state")
if isinstance(task_instance, TaskInstance):
print("Task instance's state was changed through the API.")
print(f"Task operator:{task_instance.operator}")
if error:
print(f"Failure caused by {error}")
return
context = task_instance.get_template_context()
task = context["task"]
if TYPE_CHECKING:
assert task
print("Task start")
print(f"Task:{task}")
if error:
print(f"Failure caused by {error}")
資產事件¶
on_asset_createdon_asset_alias_createdon_asset_changed
當資產管理操作執行時,會發生資產事件。
Dag 匯入錯誤事件¶
on_new_dag_import_erroron_existing_dag_import_error
當 dag 處理器在 Dag 程式碼中發現匯入錯誤並更新元資料資料庫表時,會發生 Dag 匯入錯誤事件。
這是一項實驗性功能。
使用方法¶
建立監聽器
import
airflow.listeners.hookimpl實現你想要生成通知的事件的
hookimpls
Airflow 將規範定義為 hookspec。你的實現必須接受與 hookspec 中定義的同名引數。如果你使用的引數與 hookspec 不同,Pluggy 會在你嘗試使用外掛時丟擲錯誤。但你不需要實現每個方法。許多監聽器只實現一個或一部分方法。
要在你的 Airflow 安裝中包含監聽器,將其作為 Airflow 外掛的一部分。
監聽器 API 旨在跨所有 dags 和所有操作器呼叫。你無法監聽由特定 dags 生成的事件。對於這種行為,可以嘗試使用 on_success_callback 和 pre_execute 等方法。這些方法為特定的 DAG 作者或操作器建立者提供回撥。日誌和 print() 呼叫將作為監聽器的一部分進行處理。
相容性說明¶
監聽器介面可能會隨時間變化。我們使用 pluggy 規範,這意味著為舊版介面編寫的監聽器實現應該與 Airflow 的未來版本向前相容。
然而,反之則不保證,因此如果你的監聽器是針對較新版本的介面實現的,它可能無法與舊版本的 Airflow 一起工作。如果你的目標是單個 Airflow 版本,這不成問題,因為你可以根據你使用的 Airflow 版本調整實現,但如果你正在編寫可與不同版本 Airflow 一起使用的外掛或擴充套件,這一點就很重要。
例如,如果在介面中添加了新欄位(如 2.10.0 版本 on_task_instance_failed 方法中的 error 欄位),監聽器實現將無法處理事件物件中不存在該欄位的情況,此類監聽器將僅適用於 Airflow 2.10.0 及更高版本。
為了實現一個與多個 Airflow 版本相容的監聽器,包括使用在較新版本的 Airflow 中新增的功能和欄位,你應該檢查使用的 Airflow 版本,並使用較新版本的介面實現,但對於舊版本的 Airflow,你應該使用舊版本的介面。
例如,如果你想實現一個使用 on_task_instance_failed 中的 error 欄位的監聽器,你應該使用如下程式碼
from importlib.metadata import version
from packaging.version import Version
from airflow.listeners import hookimpl
airflow_version = Version(version("apache-airflow"))
if airflow_version >= Version("2.10.0"):
class ClassBasedListener:
...
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance, error: None | str | BaseException):
# Handle error case here
pass
else:
class ClassBasedListener: # type: ignore[no-redef]
...
@hookimpl
def on_task_instance_failed(self, previous_state, task_instance):
# Handle no error case here
pass
自 2.8.0 版本引入監聽器介面以來的變更列表
Airflow 版本 |
受影響方法 |
變更 |
|---|---|---|
2.10.0 |
|
介面中添加了 |
3.0.0 |
|
從任務例項監聽器中移除 |
3.0.0 |
|
從任務例項監聽器中移除 |