Airflow 峰會 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

Airflow 的監聽器外掛

Airflow 具有透過外掛新增監聽器來監控和跟蹤任務狀態的功能。

這是一個簡單的 Airflow 監聽器外掛示例,它有助於跟蹤任務狀態,並收集有關任務、DAG 執行和 DAG 的有用元資料資訊。

這是一個 Airflow 外掛示例,用於建立 Airflow 監聽器外掛。此外掛透過使用 SQLAlchemy 的事件機制工作。它在表級別監視任務例項狀態的變化並觸發事件。這將通知所有 DAG 中所有任務的更改。

在此外掛中,物件引用派生自基類 airflow.plugins_manager.AirflowPlugin

監聽器外掛底層使用了 pluggy 應用。Pluggy 是一個為 Pytest 構建的外掛管理和 hook 呼叫應用。Pluggy 支援函式 hooking,因此它允許構建具有您自己定製 hooking 的“可插拔”系統。

使用此外掛,可以監聽以下事件:
  • 任務例項處於執行狀態。

  • 任務例項處於成功狀態。

  • 任務例項處於失敗狀態。

  • DAG 執行處於執行狀態。

  • DAG 執行處於成功狀態。

  • DAG 執行處於失敗狀態。

  • 在 Airflow 作業、排程器等事件開始之前

  • 在 Airflow 作業、排程器等事件停止之前

監聽器註冊

具有監聽器物件引用的監聽器外掛作為 Airflow 外掛的一部分註冊。以下是實現新監聽器的骨架:

from airflow.plugins_manager import AirflowPlugin

# This is the listener file created where custom code to monitor is added over hookimpl
import listener


class MetadataCollectionPlugin(AirflowPlugin):
    name = "MetadataCollectionPlugin"
    listeners = [listener]

接下來,我們可以檢視新增到 listener 中的程式碼,並檢視每個監聽器的實現方法。實現完成後,監聽器部分將在所有 DAG 的所有任務執行期間執行。

作為參考,這是 listener.py 類中的外掛程式碼,顯示了資料庫中的表列表:

此示例監聽任務例項處於執行狀態時的事件。

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_task_instance_running(previous_state: TaskInstanceState, task_instance: RuntimeTaskInstance):
    """
    Called when task state changes to RUNNING.

    previous_task_state and task_instance object can be used to retrieve more information about current
    task_instance that is running, its dag_run, task and dag information.
    """
    print("Task instance is in running state")
    print(" Previous state of the Task instance:", previous_state)

    name: str = task_instance.task_id

    context = task_instance.get_template_context()

    task = context["task"]

    if TYPE_CHECKING:
        assert task

    dag = task.dag
    dag_name = None
    if dag:
        dag_name = dag.dag_id
    print(f"Current task name:{name}")
    print(f"Dag name:{dag_name}")


類似地,可以實現監聽任務例項成功和失敗後的程式碼。

此示例監聽 DAG 執行更改為失敗狀態時的事件。

src/airflow/example_dags/plugins/event_listener.py

@hookimpl
def on_dag_run_failed(dag_run: DagRun, msg: str):
    """
    This method is called when dag run state changes to FAILED.
    """
    print("Dag run  in failure state")
    dag_id = dag_run.dag_id
    run_id = dag_run.run_id
    run_type = dag_run.run_type

    print(f"Dag information:{dag_id} Run id: {run_id} Run type: {run_type}")
    print(f"Failed with message: {msg}")


類似地,可以實現監聽 DAG 執行成功後和執行狀態期間的程式碼。

新增監聽器實現所需的監聽器外掛檔案作為 Airflow 外掛的一部分新增到 $AIRFLOW_HOME/plugins/ 資料夾中,並在 Airflow 啟動時載入。

此條目有幫助嗎?