DAG 序列化¶
為了使 Airflow Webserver 無狀態,Airflow >=1.10.7 支援 DAG 序列化和資料庫持久化。從 Airflow 2.0.0 開始,排程器也使用序列化的 dag 來保持一致性並做出排程決策。
在沒有 DAG 序列化和資料庫持久化的情況下,Webserver 和排程器都需要訪問 DAG 檔案。排程器和 Webserver 都會解析 DAG 檔案。
透過 DAG 序列化,我們旨在將 Webserver 與 DAG 解析解耦,這將使 Webserver 變得非常輕量級。
如上圖所示,使用此功能時,排程器中的 DagFileProcessorProcess 會解析 DAG 檔案,將其序列化為 JSON 格式並作為 SerializedDagModel 模型儲存到元資料資料庫 (Metadata DB) 中。
現在,Webserver 無需再次解析 DAG 檔案,而是讀取 JSON 格式的序列化 dag,將其反序列化並建立 DagBag,然後使用它在 UI 中顯示。而排程器無需實際的 dag 來做出排程決策,從 Airflow 2.0.0 開始,我們使用包含排程 dag所需所有資訊的序列化 dag,而不是使用 dag 檔案(這是 排程器高可用性 (Scheduler HA) 的一部分)。
DAG 序列化實現的關鍵特性之一是,Webserver 啟動時不再載入整個 DagBag,而是從 Serialized Dag 表中按需載入每個 DAG。這有助於減少 Webserver 的啟動時間和記憶體消耗。當您有大量 dag 時,這種減少非常顯著。
您可以啟用將原始碼儲存在資料庫中,使 Webserver 完全獨立於 DAG 檔案。如果您的檔案嵌入在 Docker 映象中或您可以透過其他方式將其提供給 Webserver,則無需這樣做。資料儲存在 DagCode 模型中。
最後一個要素是模板欄位的渲染 (rendering)。啟用序列化後,模板不會渲染到請求中,而是在任務工作程序 (worker) 上執行之前儲存欄位內容的副本。資料儲存在 RenderedTaskInstanceFields 模型中。為了限制資料庫的過度增長,僅保留最新的條目,舊的條目會被清除。
注意
從 Airflow 2.0+ 開始,DAG 序列化是強制要求且無法關閉的。
DAG 序列化設定¶
在 airflow.cfg 中新增以下設定
[core]
# You can also update the following default configurations based on your needs
min_serialized_dag_update_interval = 30
min_serialized_dag_fetch_interval = 10
max_num_rendered_ti_fields_per_task = 30
compress_serialized_dags = False
min_serialized_dag_update_interval:此標誌設定資料庫中序列化 dag 的最小更新間隔(以秒為單位)。這有助於減少資料庫寫入速率。min_serialized_dag_fetch_interval:此選項控制當序列化 DAG 已載入到 Webserver 的 DagBag 中時,多久(多頻繁)會從資料庫中重新獲取一次。將其設定得更高會減少資料庫負載,但代價是可能會顯示過期的 DAG 快取版本。max_num_rendered_ti_fields_per_task:此選項控制每個任務在資料庫中儲存的最大渲染任務例項欄位 (Rendered Task Instance Fields)(模板欄位)數量。compress_serialized_dags:此選項控制是否將序列化的 DAG 壓縮後儲存到資料庫中。當您的叢集中有非常大的 dag 時,這很有用。當設定為True時,這將停用 DAG 依賴檢視。
如果您正在從 <1.10.7 版本更新 Airflow,請不要忘記執行 airflow db migrate。
限制¶
使用使用者自定義過濾器和宏時,Webserver 中的渲染檢視 (Rendered View) 對於尚未執行的任務例項 (TIs) 可能會顯示不正確的結果,因為它可能使用了 Webserver 無法訪問的外部模組。在這種情況下,請使用
airflow tasks renderCLI 命令來除錯或測試模板欄位的渲染。一旦任務開始執行,渲染後的模板欄位將儲存在資料庫的單獨表中,之後 Webserver(渲染檢視選項卡)中將顯示正確的值。
注意
您需要 Airflow >= 1.10.10 才能獲得完全無狀態的 Webserver。Airflow 1.10.7 到 1.10.9 版本在某些情況下仍需要訪問 DAG 檔案。更多資訊請參閱:https://airflow.apache.tw/docs/1.10.9/dag-serialization.html#limitations
使用不同的 JSON 庫¶
要使用不同的 JSON 庫(例如 ujson),而不是標準的 json 庫,您需要在本地 Airflow 設定檔案 (airflow_local_settings.py) 中定義一個 json 變數,如下所示:
import ujson
json = ujson
有關如何配置本地設定的詳細資訊,請參閱配置本地設定。