Airflow 的監聽器外掛¶
Airflow 具有透過外掛新增監聽器來監控和跟蹤任務狀態的功能。
這是一個簡單的 Airflow 監聽器外掛示例,它有助於跟蹤任務狀態,並收集有關任務、DAG 執行和 DAG 的有用元資料資訊。
這是一個 Airflow 外掛示例,用於建立 Airflow 監聽器外掛。此外掛透過使用 SQLAlchemy 的事件機制工作。它在表級別監視任務例項狀態的變化並觸發事件。這將通知所有 DAG 中所有任務的更改。
在此外掛中,物件引用派生自基類 airflow.plugins_manager.AirflowPlugin。
監聽器外掛底層使用了 pluggy 應用。Pluggy 是一個為 Pytest 構建的外掛管理和 hook 呼叫應用。Pluggy 支援函式 hooking,因此它允許構建具有您自己定製 hooking 的“可插拔”系統。
- 使用此外掛,可以監聽以下事件:
任務例項處於執行狀態。
任務例項處於成功狀態。
任務例項處於失敗狀態。
DAG 執行處於執行狀態。
DAG 執行處於成功狀態。
DAG 執行處於失敗狀態。
在 Airflow 作業、排程器等事件開始之前
在 Airflow 作業、排程器等事件停止之前
監聽器註冊¶
具有監聽器物件引用的監聽器外掛作為 Airflow 外掛的一部分註冊。以下是實現新監聽器的骨架:
from airflow.plugins_manager import AirflowPlugin
# This is the listener file created where custom code to monitor is added over hookimpl
import listener
class MetadataCollectionPlugin(AirflowPlugin):
name = "MetadataCollectionPlugin"
listeners = [listener]
接下來,我們可以檢視新增到 listener 中的程式碼,並檢視每個監聽器的實現方法。實現完成後,監聽器部分將在所有 DAG 的所有任務執行期間執行。
作為參考,這是 listener.py 類中的外掛程式碼,顯示了資料庫中的表列表:
此示例監聽任務例項處於執行狀態時的事件。
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance):
"""
Called when task state changes to RUNNING.
previous_task_state and task_instance object can be used to retrieve more information about current
task_instance that is running, its dag_run, task and dag information.
"""
print("Task instance is in running state")
print(" Previous state of the Task instance:", previous_state)
name: str = task_instance.task_id
context = task_instance.get_template_context()
task = context["task"]
if TYPE_CHECKING:
assert task
dag = task.dag
dag_name = None
if dag:
dag_name = dag.dag_id
print(f"Current task name:{name}")
print(f"Dag name:{dag_name}")
類似地,可以實現監聽任務例項成功和失敗後的程式碼。
此示例監聽 DAG 執行更改為失敗狀態時的事件。
src/airflow/example_dags/plugins/event_listener.py
@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""
This method is called when dag run state changes to FAILED.
"""
print("Dag run in failure state")
dag_id = dag_run.dag_id
run_id = dag_run.run_id
run_type = dag_run.run_type
print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
print(f"Failed with message: {msg}")
類似地,可以實現監聽 DAG 執行成功後和執行狀態期間的程式碼。
新增監聽器實現所需的監聽器外掛檔案作為 Airflow 外掛的一部分新增到 $AIRFLOW_HOME/plugins/ 資料夾中,並在 Airflow 啟動時載入。