Airflow 2025 峰會將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

外掛

Airflow 內建了一個簡單的外掛管理器,只需將檔案放到您的 $AIRFLOW_HOME/plugins 資料夾中,即可將其外部功能整合到核心中。

plugins 資料夾中的 Python 模組會被匯入,並且 和 Web 檢視 會整合到 Airflow 的主集合中並可供使用。

要排除外掛問題,可以使用 airflow plugins 命令。此命令會輸出已載入外掛的資訊。

有什麼用?

Airflow 提供了處理資料的通用工具箱。不同的組織有不同的技術棧和不同的需求。使用 Airflow 外掛可以幫助公司定製其 Airflow 安裝,以反映其生態系統。

外掛可以作為一種簡便的方法來編寫、共享和啟用新的功能集。

此外,還需要一套更復雜的應用程式來與不同型別的資料和元資料互動。

示例

  • 一套用於解析 Hive 日誌並暴露 Hive 元資料(CPU / IO / 階段 / 傾斜 / 等)的工具

  • 一個異常檢測框架,允許人們收集指標、設定閾值和警報

  • 一個審計工具,幫助瞭解誰訪問了什麼

  • 一個配置驅動的 SLA 監控工具,允許您設定被監控的表及其應該到達的時間,向相關人員傳送警報,並展示中斷的視覺化

為何基於 Airflow 構建?

Airflow 包含許多在構建應用程式時可以複用的元件

  • 一個可用於渲染檢視的 Web 伺服器

  • 一個用於儲存模型資料的元資料資料庫

  • 訪問您的資料庫,並瞭解如何連線它們

  • 一組您的應用程式可以推送工作負載的 Worker

  • Airflow 已經部署好,您可以直接利用其部署流程

  • 基本的圖表功能,以及底層庫和抽象

外掛何時載入(和重新載入)?

預設情況下,外掛是延遲載入的,一旦載入,就不會重新載入(UI 外掛在 Webserver 中自動載入除外)。要在每個 Airflow 程序啟動時載入它們,請在 airflow.cfg 中設定 [core] lazy_load_plugins = False

這意味著如果您對外掛進行了任何更改,並希望 Webserver 或排程器使用新程式碼,則需要重啟這些程序。但是,直到排程器啟動後,更改才會反映在新的執行任務中。

預設情況下,任務執行使用 forking。這避免了建立新的 Python 直譯器並重新解析所有 Airflow 程式碼和啟動例程所帶來的速度減慢。這種方法具有顯著的優勢,尤其是對於較短的任務。這意味著,如果您在任務中使用外掛並希望它們更新,則需要重啟 Worker(如果使用 CeleryExecutor)或排程器(LocalExecutor)。另一種選擇是接受啟動時的速度損失,將 core.execute_tasks_new_python_interpreter 配置設定為 True,這樣會為任務啟動一個全新的 Python 直譯器。

(另一方面,僅由 DAG 檔案匯入的模組則沒有這個問題,因為 DAG 檔案不會在任何長時間執行的 Airflow 程序中載入/解析。)

介面

要建立外掛,您需要派生 airflow.plugins_manager.AirflowPlugin 類,並引用您想要插入到 Airflow 中的物件。您需要派生的類如下所示

class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of references to inject into the macros namespace
    macros = []
    # A list of dictionaries containing FastAPI app objects and some metadata. See the example below.
    fastapi_apps = []
    # A list of dictionaries containing FastAPI middleware factory objects and some metadata. See the example below.
    fastapi_root_middlewares = []

    # A callback to perform actions when Airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []

    # A list of timetable classes to register so they can be used in dags.
    timetables = []

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []

您可以透過繼承來派生它(請參閱下面的示例)。在示例中,所有選項都已定義為類屬性,但如果您需要執行額外的初始化,也可以將它們定義為屬性。請注意,此類內部的 name 必須指定。

請確保在更改外掛後重新啟動 Webserver 和排程器,以使更改生效。

示例

下面的程式碼定義了一個外掛,它在 Airflow 中注入了一組說明性的物件定義。

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin

from fastapi import FastAPI
from fastapi.middleware.trustedhost import TrustedHostMiddleware

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a FastAPI application to integrate in Airflow Rest API.
app = FastAPI()


@app.get("/")
async def root():
    return {"message": "Hello World from FastAPI plugin"}


app_with_metadata = {"app": app, "url_prefix": "/some_prefix", "name": "Name of the App"}


# Creating a FastAPI middleware that will operates on all the server api requests.
middleware_with_metadata = {
    "middleware": TrustedHostMiddleware,
    "args": [],
    "kwargs": {"allowed_hosts": ["example.com", "*.example.com"]},
    "name": "Name of the Middleware",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    macros = [plugin_macro]
    fastapi_apps = [app_with_metadata]
    fastapi_root_middlewares = [middleware_with_metadata]

從 CSRF 保護中排除檢視

我們強烈建議您使用 CSRF 保護所有檢視。但如果需要,可以使用裝飾器排除某些檢視。

from airflow.www.app import csrf


@csrf.exempt
def my_handler():
    # ...
    return "ok"

作為 Python 包的外掛

可以透過 setuptools entrypoint 機制載入外掛。為此,請在您的軟體包中使用 entrypoint 連結您的外掛。如果軟體包已安裝,Airflow 將自動從 entrypoint 列表中載入註冊的外掛。

注意

entrypoint 名稱(例如 my_plugin)和外掛類名稱都不會影響外掛本身的模組和類名稱。

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"

然後在 pyproject.toml 中

[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"

Airflow 3 中的 Flask Appbuilder 和 Flask Blueprints

Airflow 2 支援外掛中的 Flask Appbuilder 檢視 (appbuilder_views)、Flask AppBuilder 選單項 (appbuilder_menu_items) 和 Flask Blueprints (flask_blueprints)。這些在 Airflow 3 中已被 FastAPI 應用取代。所有新外掛都應改用 FastAPI 應用 (fastapi_apps)。

但是,為 Flask 和 FAB 外掛提供了一個相容層,以便於過渡到 Airflow 3 - 只需安裝 FAB Provider 即可。理想情況下,您應該在升級過程中將您的外掛轉換為 FastAPI 應用 (fastapi_apps),因為此相容層已被棄用。

故障排除

您可以使用 Flask CLI 來排除問題。要執行此命令,您需要將變數 FLASK_APP 設定為 airflow.www.app:create_app

例如,要列印所有路由,執行

FLASK_APP=airflow.www.app:create_app flask routes

此條目是否有幫助?