Airflow 2025 峰會將於 10 月 07 日至 09 日舉行。立即註冊享早鳥票優惠!

排程器

Airflow 排程器監控所有任務和 DAG,並在其依賴項完成後觸發任務例項。在幕後,排程器會啟動一個子程序,該程序監控指定 DAG 目錄中的所有 DAG 並與之保持同步。預設情況下,排程器每分鐘收集一次 DAG 解析結果,並檢查是否有任何活動任務可以觸發。

Airflow 排程器設計為在 Airflow 生產環境中作為持久服務執行。要啟動它,您只需要執行 airflow scheduler 命令。它使用 airflow.cfg 中指定的配置。

排程器使用配置的 Executor(執行器) 來執行就緒的任務。

要啟動排程器,只需執行命令

airflow scheduler

一旦排程器成功執行,您的 DAG 將開始執行。

注意

第一個 DAG Run 是根據您 DAG 中任務的最小 start_date 建立的。隨後的 DAG Run 將根據您 DAG 的 timetable(時間表) 建立。

對於具有 cron 或 timedelta 排程的 DAG,排程器只有在其涵蓋的時間段結束後才會觸發您的任務,例如,設定 schedule@daily 的任務將在一天結束後執行。這種技術確保在該時間段所需的任何資料在 DAG 執行之前完全可用。在 UI 中,看起來 Airflow 似乎將您的任務延遲了一天執行。

注意

如果您將 DAG 的 schedule 設定為一天,那麼資料時間間隔從 2019-11-21 開始的執行將在 2019-11-21T23:59 之後觸發。

我們再重複一遍,排程器在開始日期之後一個 schedule 週期,在間隔的結束時執行您的任務。

有關排程 DAG 的詳細資訊,您應參考 DAG Run(DAG 執行)

注意

排程器旨在實現高吞吐量。這是一個經過深思熟慮的設計決策,旨在儘快排程任務。排程器檢查池 (pool) 中有多少可用插槽,並在一次迭代中最多排程相應數量的任務例項。這意味著只有當等待排程的任務多於佇列插槽時,任務優先順序才會生效。因此,如果低優先順序任務與高優先順序任務共享同一個批次,可能會出現低優先順序任務在更高優先順序任務之前被排程的情況。要了解更多資訊,您可以參考這個 GitHub 討論

執行多個排程器

Airflow 支援同時執行多個排程器——既為了效能原因,也為了提高彈性。

概述

HA 排程器旨在利用現有的元資料資料庫。這主要是為了操作的簡單性:每個元件都已經需要與該資料庫通訊,並且透過不使用排程器之間的直接通訊或共識演算法(Raft, Paxos 等),也不使用其他共識工具(例如 Apache Zookeeper 或 Consul),我們將“操作面”保持在最低限度。

排程器現在使用序列化的 DAG 表示來進行排程決策,排程迴圈的大致流程是

  • 檢查是否有任何 DAG 需要新的 DagRun 並建立它們

  • 檢查一批 DagRun,查詢可排程的 TaskInstances 或已完成的 DagRun

  • 選擇可排程的 TaskInstances,並在尊重 Pool 限制及其他併發限制的情況下,將其排隊等待執行

但是,這確實對資料庫提出了一些要求。

資料庫要求

簡而言之,PostgreSQL 12+ 或 MySQL 8.0+ 的使用者都已準備就緒——您可以根據需要執行任意數量的排程器副本——無需進一步的設定或配置選項。如果您使用其他資料庫,請繼續閱讀。

為了保持效能和吞吐量,排程迴圈中有一部分在記憶體中進行大量計算(因為每次 TaskInstance 都需要往返資料庫會太慢),因此我們需要確保在任何時候只有一個排程器處於此關鍵區域——否則限制將無法得到正確遵守。為實現這一點,我們使用資料庫行級鎖(使用 SELECT ... FOR UPDATE)。

這個關鍵區域是 TaskInstances 從排程狀態轉移到執行器佇列的地方,同時確保各種併發和 Pool 限制得到遵守。透過請求對 Pool 表的每一行進行行級寫鎖來獲得關鍵區域(大致相當於 SELECT * FROM slot_pool FOR UPDATE NOWAIT,但確切的查詢略有不同)。

以下資料庫完全支援並提供“最優”體驗

  • PostgreSQL 12+

  • MySQL 8.0+

警告

MariaDB 直到 10.6.0 版本才實現了 SKIP LOCKEDNOWAIT SQL 子句。如果沒有這些功能,則不支援執行多個排程器,並且已報告了死鎖錯誤。MariaDB 10.6.0 及後續版本可能與多個排程器正常配合工作,但這尚未經過測試。

注意

Microsoft SQL Server 尚未與 HA 一起進行過測試。

微調排程器效能

影響排程器效能的因素

排程器負責持續排程任務以供執行。為了微調您的排程器,您需要考慮多個因素

  • 您的部署型別
    • 您有多少可用記憶體

    • 您有多少可用 CPU

    • 您有多少可用網路吞吐量

  • 您的 DAG 結構的邏輯和定義
    • 您有多少個 DAG

    • 它們的複雜程度(即它們有多少任務和依賴項)

  • 排程器配置
    • 您有多少個排程器

    • 排程器在一個迴圈中處理多少任務例項

    • 每個迴圈應該建立/排程多少新的 DAG 執行

    • 排程器應該多久執行一次清理並檢查孤立任務/認領它們

為了進行微調,最好了解排程器在底層是如何工作的。您可以觀看 Airflow Summit 2021 的“深入瞭解 Airflow 排程器”演講來進行微調。

如何進行排程器微調

Airflow 提供了許多“旋鈕”供您調整以微調效能,但根據您的具體部署、DAG 結構、硬體可用性和期望,決定調整哪些“旋鈕”才能獲得最佳效果,這是一個單獨的任務。管理部署的一部分工作是決定您將最佳化哪些方面。

Airflow 賦予您決定權,但您應該找出對您最重要的效能方面,並決定您要朝哪個方向調整哪些“旋鈕”。

一般來說,進行微調的方法應與任何效能改進和最佳化一樣(我們不會推薦任何特定工具——只需使用您通常用於觀察和監控系統的工具)

  • 使用您通常用於監控系統的工具集來監控您的系統至關重要。本文件不詳細介紹您可以使用的具體指標和工具,它只描述了您應該監控哪些型別的資源,但您應該遵循您的最佳監控實踐來獲取正確的資料。

  • 決定對您最重要的效能方面(您想改進什麼)

  • 觀察您的系統,檢視瓶頸所在:CPU、記憶體、I/O 是常見的限制因素

  • 根據您的期望和觀察結果,決定您的下一個改進是什麼,並回到對您的效能、瓶頸的觀察。效能改進是一個迭代過程。

可能限制排程器效能的資源

有幾個資源使用方面您應該注意

  • 隨著您希望提高效能和並行處理更多事物,資料庫連線和資料庫使用可能會成為問題。Airflow 以“資料庫連線飢渴”而聞名——您擁有的 DAG 越多,希望並行處理的事物越多,開啟的資料庫連線就越多。這對於 MySQL 通常不是問題,因為它的連線處理模型是基於執行緒的,但這對於 Postgres 可能是一個問題,因為它的連線處理是基於程序的。一般認為,即使是中等規模的基於 Postgres 的 Airflow 安裝,最好的解決方案是使用 PGBouncer 作為您資料庫的代理。Apache Airflow 的 Helm Chart 開箱即用地支援 PGBouncer。

  • Airflow 排程器在多個例項下幾乎線性擴充套件,因此如果您的排程器效能受限於 CPU,您也可以新增更多排程器。

  • 在檢視記憶體使用情況時,請務必注意您正在觀察的是哪種型別的記憶體。通常您應該檢視 working memory(工作記憶體,名稱可能因您的部署而異),而不是 total memory used(總已用記憶體)。

您可以做些什麼來提高排程器效能

當您瞭解自己的資源使用情況時,您可以考慮的改進措施可能包括

  • 提高資源利用率。這指的是您的系統中存在看似未充分利用的閒置容量(同樣,CPU、記憶體、I/O、網路是主要候選項)——您可以採取增加排程器數量或縮短間隔以更頻繁地執行操作等措施,這可能會提高效能,但代價是更高的資源利用率。

  • 增加硬體容量(例如,如果您發現 CPU 是您的瓶頸)。排程器效能問題通常只是因為您的系統不夠“強大”,這可能是唯一的解決方案。例如,如果您發現機器上的所有 CPU 都已被佔用,您可能需要在新機器上新增另一個排程器——在大多數情況下,當您新增第 2 個或第 3 個排程器時,排程能力會線性增長(除非共享資料庫或類似資源成為瓶頸)。

  • 嘗試不同的“排程器可調引數”值。通常,您可以透過簡單地權衡一個性能方面來換取另一個方面,從而獲得更好的效果。效能調優通常是平衡不同方面的藝術。

排程器配置選項

以下配置設定可用於控制排程器的各個方面。但是,您也可以檢視 配置參考[scheduler] 部分提供的其他與效能無關的排程器配置引數。

  • max_dagruns_to_create_per_loop

    這改變了每個排程器在建立 DAG 執行時鎖定的 DAG 數量。設定此引數較低的一個可能原因是,如果您有大型 DAG(每個 DAG 包含 1 萬個以上的任務)並執行多個排程器,您不會希望一個排程器完成所有工作。

  • max_dagruns_per_loop_to_schedule

    排程器在排程和排隊任務時應該檢查(並鎖定)多少個 DagRun。增加此限制將允許小型 DAG 獲得更高的吞吐量,但可能會降低大型 DAG(例如,超過 500 個任務)的吞吐量。在使用多個排程器時將此值設定得過高,也可能導致一個排程器接管所有 DAG 執行,而沒有工作留給其他排程器。

  • use_row_level_locking

    排程器是否應在相關查詢中發出 SELECT ... FOR UPDATE。如果此設定為 False,則您不應同時執行多個排程器。

  • pool_metrics_interval

    Pool 使用統計資訊應該多久(以秒為單位)傳送到 StatsD(如果啟用了 statsd_on)?計算此資訊是一個相對耗時的查詢,因此應將其設定為與您的 StatsD 彙總週期相同的週期。

  • running_metrics_interval

    執行中的任務例項統計資訊應該多久(以秒為單位)傳送到 StatsD(如果啟用了 statsd_on)?計算此資訊是一個相對耗時的查詢,因此應將其設定為與您的 StatsD 彙總週期相同的週期。

  • orphaned_tasks_check_interval

    排程器應該多久(以秒為單位)檢查一次孤立任務或死亡的 SchedulerJobs?

    此設定控制如何檢測到死亡的排程器,以及如何由另一個排程器接管其“監督”的任務。任務將繼續執行,因此暫時不檢測到死亡排程器也沒有關係。

    當一個 SchedulerJob 被檢測為“死亡”(由 scheduler_health_check_threshold 決定)時,由該死亡程序啟動的任何正在執行或排隊的任務將被當前排程器“認領”並轉為監控。

  • max_tis_per_query 排程主迴圈中查詢的批處理大小。此值不應大於 core.parallelism。如果此值過高,則 SQL 查詢效能可能會受到查詢謂詞複雜性和/或過度鎖定的影響。

    此外,您可能會達到資料庫允許的最大查詢長度。將此值設定為 0 以使用 core.parallelism 的值。

  • scheduler_idle_sleep_time 控制排程器在迴圈之間休眠多久,但僅當該迴圈中沒有任何任務可做時。也就是說,如果它排程了任務,則會立即開始下一個迴圈迭代。此引數命名不佳(出於歷史原因),未來將棄用當前名稱並進行重新命名。

此條目是否有幫助?