外掛¶
Airflow 內建了一個簡單的外掛管理器,只需將檔案放到您的 $AIRFLOW_HOME/plugins 資料夾中,即可將其外部功能整合到核心中。
plugins 資料夾中的 Python 模組會被匯入,並且 宏 和 Web 檢視 會整合到 Airflow 的主集合中並可供使用。
要排除外掛問題,可以使用 airflow plugins 命令。此命令會輸出已載入外掛的資訊。
有什麼用?¶
Airflow 提供了處理資料的通用工具箱。不同的組織有不同的技術棧和不同的需求。使用 Airflow 外掛可以幫助公司定製其 Airflow 安裝,以反映其生態系統。
外掛可以作為一種簡便的方法來編寫、共享和啟用新的功能集。
此外,還需要一套更復雜的應用程式來與不同型別的資料和元資料互動。
示例
一套用於解析 Hive 日誌並暴露 Hive 元資料(CPU / IO / 階段 / 傾斜 / 等)的工具
一個異常檢測框架,允許人們收集指標、設定閾值和警報
一個審計工具,幫助瞭解誰訪問了什麼
一個配置驅動的 SLA 監控工具,允許您設定被監控的表及其應該到達的時間,向相關人員傳送警報,並展示中斷的視覺化
為何基於 Airflow 構建?¶
Airflow 包含許多在構建應用程式時可以複用的元件
一個可用於渲染檢視的 Web 伺服器
一個用於儲存模型資料的元資料資料庫
訪問您的資料庫,並瞭解如何連線它們
一組您的應用程式可以推送工作負載的 Worker
Airflow 已經部署好,您可以直接利用其部署流程
基本的圖表功能,以及底層庫和抽象
外掛何時載入(和重新載入)?¶
預設情況下,外掛是延遲載入的,一旦載入,就不會重新載入(UI 外掛在 Webserver 中自動載入除外)。要在每個 Airflow 程序啟動時載入它們,請在 airflow.cfg 中設定 [core] lazy_load_plugins = False。
這意味著如果您對外掛進行了任何更改,並希望 Webserver 或排程器使用新程式碼,則需要重啟這些程序。但是,直到排程器啟動後,更改才會反映在新的執行任務中。
預設情況下,任務執行使用 forking。這避免了建立新的 Python 直譯器並重新解析所有 Airflow 程式碼和啟動例程所帶來的速度減慢。這種方法具有顯著的優勢,尤其是對於較短的任務。這意味著,如果您在任務中使用外掛並希望它們更新,則需要重啟 Worker(如果使用 CeleryExecutor)或排程器(LocalExecutor)。另一種選擇是接受啟動時的速度損失,將 core.execute_tasks_new_python_interpreter 配置設定為 True,這樣會為任務啟動一個全新的 Python 直譯器。
(另一方面,僅由 DAG 檔案匯入的模組則沒有這個問題,因為 DAG 檔案不會在任何長時間執行的 Airflow 程序中載入/解析。)
介面¶
要建立外掛,您需要派生 airflow.plugins_manager.AirflowPlugin 類,並引用您想要插入到 Airflow 中的物件。您需要派生的類如下所示
class AirflowPlugin:
# The name of your plugin (str)
name = None
# A list of references to inject into the macros namespace
macros = []
# A list of dictionaries containing FastAPI app objects and some metadata. See the example below.
fastapi_apps = []
# A list of dictionaries containing FastAPI middleware factory objects and some metadata. See the example below.
fastapi_root_middlewares = []
# A callback to perform actions when Airflow starts and the plugin is loaded.
# NOTE: Ensure your plugin has *args, and **kwargs in the method definition
# to protect against extra parameters injected into the on_load(...)
# function in future changes
def on_load(*args, **kwargs):
# ... perform Plugin boot actions
pass
# A list of global operator extra links that can redirect users to
# external systems. These extra links will be available on the
# task page in the form of buttons.
#
# Note: the global operator extra link can be overridden at each
# operator level.
global_operator_extra_links = []
# A list of operator extra links to override or add operator links
# to existing Airflow Operators.
# These extra links will be available on the task page in form of
# buttons.
operator_extra_links = []
# A list of timetable classes to register so they can be used in dags.
timetables = []
# A list of Listeners that plugin provides. Listeners can register to
# listen to particular events that happen in Airflow, like
# TaskInstance state changes. Listeners are python modules.
listeners = []
您可以透過繼承來派生它(請參閱下面的示例)。在示例中,所有選項都已定義為類屬性,但如果您需要執行額外的初始化,也可以將它們定義為屬性。請注意,此類內部的 name 必須指定。
請確保在更改外掛後重新啟動 Webserver 和排程器,以使更改生效。
示例¶
下面的程式碼定義了一個外掛,它在 Airflow 中注入了一組說明性的物件定義。
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.middleware.trustedhost import TrustedHostMiddleware
# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
pass
# Creating a FastAPI application to integrate in Airflow Rest API.
app = FastAPI()
@app.get("/")
async def root():
return {"message": "Hello World from FastAPI plugin"}
app_with_metadata = {"app": app, "url_prefix": "/some_prefix", "name": "Name of the App"}
# Creating a FastAPI middleware that will operates on all the server api requests.
middleware_with_metadata = {
"middleware": TrustedHostMiddleware,
"args": [],
"kwargs": {"allowed_hosts": ["example.com", "*.example.com"]},
"name": "Name of the Middleware",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
macros = [plugin_macro]
fastapi_apps = [app_with_metadata]
fastapi_root_middlewares = [middleware_with_metadata]
另請參閱
從 CSRF 保護中排除檢視¶
我們強烈建議您使用 CSRF 保護所有檢視。但如果需要,可以使用裝飾器排除某些檢視。
from airflow.www.app import csrf
@csrf.exempt
def my_handler():
# ...
return "ok"
作為 Python 包的外掛¶
可以透過 setuptools entrypoint 機制載入外掛。為此,請在您的軟體包中使用 entrypoint 連結您的外掛。如果軟體包已安裝,Airflow 將自動從 entrypoint 列表中載入註冊的外掛。
注意
entrypoint 名稱(例如 my_plugin)和外掛類名稱都不會影響外掛本身的模組和類名稱。
# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
class MyAirflowPlugin(AirflowPlugin):
name = "my_namespace"
然後在 pyproject.toml 中
[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"
Airflow 3 中的 Flask Appbuilder 和 Flask Blueprints¶
Airflow 2 支援外掛中的 Flask Appbuilder 檢視 (appbuilder_views)、Flask AppBuilder 選單項 (appbuilder_menu_items) 和 Flask Blueprints (flask_blueprints)。這些在 Airflow 3 中已被 FastAPI 應用取代。所有新外掛都應改用 FastAPI 應用 (fastapi_apps)。
但是,為 Flask 和 FAB 外掛提供了一個相容層,以便於過渡到 Airflow 3 - 只需安裝 FAB Provider 即可。理想情況下,您應該在升級過程中將您的外掛轉換為 FastAPI 應用 (fastapi_apps),因為此相容層已被棄用。
故障排除¶
您可以使用 Flask CLI 來排除問題。要執行此命令,您需要將變數 FLASK_APP 設定為 airflow.www.app:create_app。
例如,要列印所有路由,執行
FLASK_APP=airflow.www.app:create_app flask routes