Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊享早鳥票優惠!

DAG 檔案處理

DAG 檔案處理是指讀取定義 DAG 的 Python 檔案並存儲它們的過程,以便排程器可以排程它們。

DAG 檔案處理主要涉及兩個元件。其中,DagFileProcessorManager 是一個執行無限迴圈的程序,它確定哪些檔案需要被處理;而 DagFileProcessorProcess 是一個獨立的程序,它被啟動用於將單個檔案轉換為一個或多個 DAG 物件。

DagFileProcessorManager 執行使用者程式碼。因此,它透過執行 airflow dag-processor CLI 命令作為獨立程序執行。

../_images/dag_file_processing_diagram.png

DagFileProcessorManager 的步驟如下:

  1. 檢查新檔案:如果自上次重新整理 DAG 以來經過的時間大於 dag_dir_list_interval (已棄用),則更新檔案路徑列表。

  2. 排除近期處理過的檔案:排除最近處理時間早於 min_file_process_interval 且未被修改的檔案。

  3. 檔案路徑入隊:將發現的檔案新增到檔案路徑佇列。

  4. 處理檔案:為每個檔案啟動一個新的 DagFileProcessorProcess,最多可達 parsing_processes 個。

  5. 收集結果:收集任何已完成的 DAG 處理器返回的結果。

  6. 記錄統計資訊:列印統計資訊併發送 dag_processing.total_parse_time

DagFileProcessorProcess 的步驟如下:

  1. 處理檔案:整個處理過程必須在 dag_file_processor_timeout 之內完成。

  2. 將 DAG 檔案載入為 Python 模組:必須在 dagbag_import_timeout 之內完成。

  3. 處理模組:在 Python 模組中查詢 DAG 物件。

  4. 返回 DagBag:向 DagFileProcessorManager 提供發現的 DAG 物件列表。

最佳化 DAG 處理器效能

影響 DAG 處理器效能的因素

DAG 處理器負責持續解析 DAG 檔案並與資料庫中的 DAG 同步。為了最佳化 DAG 處理器,需要考慮以下多種因素。

  • 您的部署型別
    • 用於共享 DAG 的檔案系統型別(影響持續讀取 DAG 的效能)

    • 檔案系統的速度如何(在許多分散式雲檔案系統的情況下,可以透過額外付費獲得更高的吞吐量/更快速的檔案系統)

    • 您有多少可用於處理的記憶體

    • 您有多少可用的 CPU

    • 您有多少可用的網路吞吐量

  • DAG 結構的邏輯和定義
    • 您有多少個 DAG 檔案

    • 您的檔案中有多少個 DAG

    • DAG 檔案有多大(記住 DAG 解析器需要每 n 秒讀取和解析檔案)

    • 它們有多複雜(即解析速度有多快,有多少任務和依賴關係)

    • 解析 DAG 檔案是否涉及匯入大量庫或在頂層進行大量處理(提示!不應該這樣做。請參閱 頂層 Python 程式碼

  • DAG 處理器配置
    • 您有多少個 DAG 處理器

    • 您的 DAG 處理器中有多少個解析程序

    • DAG 處理器在重新解析同一個 DAG 之間等待多長時間(這是持續發生的)

    • 每個 DAG 處理器迴圈中執行多少個回撥

如何著手最佳化 DAG 處理器

Airflow 提供了許多“旋鈕”供您調整以最佳化效能,但這取決於您的特定部署、DAG 結構、硬體可用性和期望,您需要單獨決定調整哪些旋鈕才能獲得最佳效果。管理部署的一部分工作是決定您要最佳化哪些方面。例如,一些使用者可以接受新 DAG 解析延遲 30 秒,以換取較低的 CPU 使用率;而另一些使用者則希望 DAG 出現在 DAG 資料夾中時幾乎立即被解析,但這會以較高的 CPU 使用率為代價。

Airflow 提供了靈活的決策空間,但是您應該弄清楚哪個效能方面對您來說最重要,並決定要朝哪個方向調整哪些旋鈕。

通常,對於效能最佳化,您的方法應該與進行任何效能改進和最佳化時相同(我們不會推薦任何特定工具 - 只需使用您通常用來觀察和監控系統的工具)。

  • 使用您通常用來監控系統的正確工具集來監控您的系統至關重要。本文件不詳細介紹您可以使用的特定指標和工具,它只描述您應該監控哪種資源,但您應遵循自己的監控最佳實踐來獲取正確的資料。

  • 決定哪個效能方面對您來說最重要(您想要改進什麼)

  • 觀察您的系統以檢視瓶頸所在:CPU、記憶體、I/O 通常是限制因素。

  • 根據您的期望和觀察結果 - 決定您的下一步改進是什麼,並回到對效能和瓶頸的觀察。效能改進是一個迭代過程。

可能限制 DAG 處理器效能的資源

有幾個資源使用方面需要您注意:

  • 檔案系統性能。Airflow DAG 處理器嚴重依賴解析 Python 檔案(有時是大量檔案),這些檔案通常位於共享檔案系統上。DAG 處理器持續讀取和重新解析這些檔案。同樣的檔案必須對 Worker 可用,因此它們通常儲存在分散式檔案系統中。您可以使用各種檔案系統來實現此目的(NFS、CIFS、EFS、GCS fuse、Azure File System 都是很好的例子)。您可以控制這些檔案系統的各種引數並最佳化其效能,但這超出了本文件的範圍。您應該觀察檔案系統的統計資訊和使用情況,以確定問題是否來自檔案系統性能。例如,有傳聞證據表明,在使用 EFS 時,提高 EFS 效能的 IOPS(併為此支付更多費用)可以顯著提高解析 Airflow DAG 的穩定性和速度。

  • 如果檔案系統性能成為您的瓶頸,另一種解決方案是轉向分發 DAG 的替代機制。將 DAG 嵌入到您的映象中以及 GitSync 分發都具有檔案對 DAG 處理器本地可用的特性,並且它不必使用分散式檔案系統來讀取檔案,檔案對 DAG 處理器本地可用,並且通常儘可能快,特別是如果您的機器使用快速 SSD 磁碟進行本地儲存。這些分發機制具有其他特性,可能使其對您來說不是最佳選擇,但如果您的效能問題來自分散式檔案系統性能,它們可能是最佳方法。

  • 隨著您想要提高效能並並行處理更多事物,資料庫連線和資料庫使用可能會成為問題。Airflow 因“資料庫連線飢渴”而聞名 - 您擁有的 DAG 越多,想要並行處理的越多,就會開啟更多的資料庫連線。對於 MySQL 來說,這通常不是問題,因為其連線處理模型是基於執行緒的;但這對於 Postgres 來說可能是問題,因為其連線處理是基於程序的。普遍的共識是,即使是中等規模的基於 Postgres 的 Airflow 安裝,最佳解決方案也是使用 PGBouncer 作為資料庫代理。Apache Airflow 的 Helm Chart 開箱即用地支援 PGBouncer。

  • CPU 使用率對 FileProcessors 最重要 - 這些是解析和執行 Python DAG 檔案的程序。由於 DAG 處理器通常持續觸發此類解析,當您有大量 DAG 時,處理可能會佔用大量 CPU。您可以透過增加 min_file_process_interval 來緩解,但這是一種權衡,結果是對此類檔案的更改會被更慢地拾取,您將看到提交檔案與它們在 Airflow UI 中可用並由排程器執行之間的延遲。最佳化 DAG 的構建方式,避免外部資料來源是改善 CPU 使用率的最佳方法。如果您有更多可用的 CPU,您可以增加處理執行緒數 parsing_processes

  • 當您嘗試從 Airflow 中榨取更多效能時,它可能會使用相當大的記憶體。在 Airflow 中,通常透過增加處理負載的程序數來獲得更高的效能,每個程序都需要載入整個 Python 直譯器、匯入大量類和臨時記憶體儲存。Airflow 透過使用 forking 和寫時複製 (copy-on-write) 記憶體來最佳化其中很多部分,但在 forking 後匯入新類的情況下,這可能會導致額外的記憶體壓力。您需要觀察您的系統是否使用了超過其擁有的記憶體 - 這會導致使用交換空間,從而顯著降低效能。檢視記憶體使用情況時,請務必注意您正在觀察的是哪種型別的記憶體。通常,您應該檢視 工作記憶體 (working memory)(名稱可能因您的部署而異),而不是 已使用的總記憶體 (total memory used)

如何改進 DAG 處理器效能

當您瞭解自己的資源使用情況後,可以考慮的改進措施可能包括:

  • 改進頂層 DAG Python 程式碼的邏輯、解析效率並降低其複雜性。它會被持續解析,因此最佳化該程式碼可能會帶來巨大的改進,特別是如果您在解析 DAG 時嘗試訪問某些外部資料庫等(應不惜一切代價避免這樣做)。頂層 Python 程式碼 解釋了編寫頂層 Python 程式碼的最佳實踐。降低 DAG 複雜性 文件提供了一些您在想要降低程式碼複雜性時可以關注的領域。

  • 提高資源利用率。當您的系統中存在未充分利用的空閒容量時(CPU、記憶體、I/O、網路再次是主要候選者),您可以採取增加解析程序數等措施,這可能會以提高這些資源的利用率為代價來改善效能。

  • 增加硬體容量(例如,如果您發現 CPU 是瓶頸或者用於 DAG 檔案系統的 I/O 已達到極限)。通常,DAG 處理器效能問題僅僅是因為您的系統“能力”不足,這可能是唯一的解決方案,除非共享資料庫或檔案系統是瓶頸。

  • 試驗“DAG 處理器可調引數”的不同值。通常,您可以透過簡單地用一個性能方面交換另一個方面來獲得更好的效果。例如,如果您想降低 CPU 使用率,可以增加檔案處理間隔(但這會導致新 DAG 出現更大的延遲)。通常,效能調優是平衡不同方面的藝術。

  • 有時您可以稍微更改 DAG 處理器的行為(例如更改解析排序順序),以便為您的特定部署獲得更好的調優結果。

DAG 處理器配置選項

以下配置設定可用於控制排程器的各個方面。然而,您還可以檢視其他與效能無關的排程器配置引數,這些引數可在 配置參考[scheduler] 部分找到。

  • file_parsing_sort_mode 排程器將列出並排序 DAG 檔案以決定解析順序。

  • min_file_process_interval DAG 檔案重新解析的秒數間隔。DAG 檔案每隔 min_file_process_interval 秒被解析一次。對 DAG 的更新在此間隔後反映。保持此值較低會增加 CPU 使用率。

  • parsing_processes 排程器可以並行執行多個程序來解析 DAG 檔案。這定義了將執行多少個程序。

本條目有幫助嗎?