Celery Executor

注意

從 Airflow 2.7.0 版本開始,您需要安裝 celery provider 包才能使用此執行器。這可以透過安裝 apache-airflow-providers-celery>=3.3.0 或透過安裝帶有 celery extra 的 Airflow 來完成:pip install 'apache-airflow[celery]'

CeleryExecutor 是您可以擴充套件 worker 數量的方式之一。為了使其工作,您需要設定一個 Celery 後端(RabbitMQRedisRedis Sentinel…),安裝所需的依賴項(例如 librabbitmqredis…),並修改 airflow.cfg,將 executor 引數指向 CeleryExecutor 並提供相關的 Celery 設定。

有關設定 Celery broker 的更多資訊,請參閱詳細的Celery 相關文件

Celery Executor 的配置引數可以在 Celery provider 的配置參考中找到。

以下是您的 worker 的幾個強制性要求

  • 需要安裝 airflow,並且 CLI 需要在 path 中

  • Airflow 配置設定在叢集中應保持一致

  • 在 worker 上執行的 Operator 需要在該上下文中滿足其依賴項。例如,如果您使用 HiveOperator,則需要在該機器上安裝 Hive CLI;或者如果您使用 MySqlOperator,則需要以某種方式在PYTHONPATH 中提供所需的 Python 庫

  • worker 需要能夠訪問其 DAGS_FOLDER,並且您需要透過自己的方式同步檔案系統。一種常見的設定是將您的 DAGS_FOLDER 儲存在 Git 倉庫中,並使用 Chef、Puppet、Ansible 或您用於在環境中配置機器的任何工具在機器之間同步。如果您的所有機器都有一個公共掛載點,將您的管道檔案共享在那裡也應該可以工作

要啟動一個 worker,您需要設定 Airflow 並啟動 worker 子命令

airflow celery worker

您的 worker 一旦任務被髮送到其方向,就應該開始接收任務。要在機器上停止正在執行的 worker,您可以使用

airflow celery stop

它將嘗試透過向主 Celery 程序傳送 SIGTERM 訊號來優雅地停止 worker,這與 Celery 文件推薦的一樣。

請注意,您還可以執行 Celery Flower(一個構建在 Celery 之上的 web UI)來監控您的 worker。您可以使用快捷命令啟動一個 Flower web 伺服器

airflow celery flower

請注意,您必須已在系統上安裝了 flower python 庫。推薦的方法是安裝 airflow celery bundle。

pip install 'apache-airflow[celery]'

一些注意事項

  • 確保使用資料庫支援的結果後端

  • 確保在 [celery_broker_transport_options] 中設定一個可見性超時 (visibility timeout),該超時超過您執行時間最長任務的 ETA

  • 如果您使用 Redis Sentinel 作為 broker 並且 Redis 伺服器受密碼保護,請確保在 [celery_broker_transport_options] 部分指定 Redis 伺服器的密碼

  • 確保在 [worker_umask] 中設定 umask 以設定 worker 新建立檔案的許可權。

  • 任務會消耗資源。確保您的 worker 有足夠的資源來執行 worker_concurrency 任務

  • 佇列名稱限制為 256 個字元,但每個 broker 後端可能有自己的限制

有關 Python 和 Airflow 如何管理模組的詳細資訊,請參閱模組管理

架構

digraph A{ rankdir="TB" node[shape="rectangle", style="rounded"] subgraph cluster { label="Cluster"; {rank = same; dag; database} {rank = same; workers; scheduler; web} workers[label="Workers"] scheduler[label="Scheduler"] web[label="Web server"] database[label="Database"] dag[label="DAG files"] subgraph cluster_queue { label="Celery"; {rank = same; queue_broker; queue_result_backend} queue_broker[label="Queue broker"] queue_result_backend[label="Result backend"] } web->workers[label="1"] web->dag[label="2"] web->database[label="3"] workers->dag[label="4"] workers->database[label="5"] workers->queue_result_backend[label="6"] workers->queue_broker[label="7"] scheduler->dag[label="8"] scheduler->database[label="9"] scheduler->queue_result_backend[label="10"] scheduler->queue_broker[label="11"] } }

Airflow 由幾個元件組成

  • Workers - 執行分配的任務

  • Scheduler - 負責將必要的任務新增到佇列中

  • Web server - HTTP 伺服器提供訪問 DAG/任務狀態資訊的功能

  • Database - 包含任務、DAG、變數、連線等的狀態資訊

  • Celery - 佇列機制

請注意,Celery 的佇列由兩個元件組成

  • Broker - 儲存待執行的命令

  • Result backend - 儲存已完成命令的狀態

元件在許多地方相互通訊

  • [1] Web server –> Workers - 獲取任務執行日誌

  • [2] Web server –> DAG files - 揭示 DAG 結構

  • [3] Web server –> Database - 獲取任務狀態

  • [4] Workers –> DAG files - 揭示 DAG 結構並執行任務

  • [5] Workers –> Database - 獲取並存儲有關連線配置、變數和 XCOM 的資訊。

  • [6] Workers –> Celery’s result backend - 儲存任務狀態

  • [7] Workers –> Celery’s broker - 儲存待執行的命令

  • [8] Scheduler –> DAG files - 揭示 DAG 結構並執行任務

  • [9] Scheduler –> Database - 儲存 DAG 執行和相關任務

  • [10] Scheduler –> Celery’s result backend - 獲取有關已完成任務狀態的資訊

  • [11] Scheduler –> Celery’s broker - 將命令放入待執行佇列

任務執行過程

_images/run_task_on_celery_executor.png

時序圖 - 任務執行過程

最初,有兩個程序正在執行

  • SchedulerProcess - 處理任務並使用 CeleryExecutor 執行

  • WorkerProcess - 監視佇列等待新任務出現

  • WorkerChildProcess - 等待新任務

也有兩個資料庫可用

  • QueueBroker

  • ResultBackend

在此過程中,建立了兩個程序

  • LocalTaskJobProcess - 其邏輯由 LocalTaskJob 描述。它監視 RawTaskProcess。新程序使用 TaskRunner 啟動。

  • RawTaskProcess - 它是包含使用者程式碼的程序,例如 execute()

[1] SchedulerProcess 處理任務,當發現有任務需要完成時,將其傳送到 QueueBroker
[2] SchedulerProcess 也開始定期查詢 ResultBackend 以獲取任務狀態。
[3] QueueBroker 在得知任務後,將有關任務的資訊傳送給一個 WorkerProcess。
[4] WorkerProcess 將單個任務分配給一個 WorkerChildProcess
[5] WorkerChildProcess 執行適當的任務處理功能 - execute_command()。它建立一個新程序 - LocalTaskJobProcess
[6] LocalTaskJobProcess 的邏輯由 LocalTaskJob 類描述。它使用 TaskRunner 啟動新程序。
[7][8] 程序 RawTaskProcessLocalTaskJobProcess 完成工作後停止。
[10][12] WorkerChildProcess 通知主程序 - WorkerProcess 任務結束以及後續任務的可用性。
[11] WorkerProcess 將狀態資訊儲存在 ResultBackend 中。
[13] 當 SchedulerProcess 再次詢問 ResultBackend 關於任務狀態時,它將獲取有關任務狀態的資訊。

佇列

使用 CeleryExecutor 時,可以指定將任務傳送到的 Celery 佇列。queue 是 BaseOperator 的一個屬性,因此任何任務都可以分配給任何佇列。環境的預設佇列在 airflow.cfgoperators -> default_queue 中定義。這定義了未指定時任務被分配到的佇列,以及 Airflow worker 啟動時監聽的佇列。

Worker 可以監聽一個或多個任務佇列。當 worker 啟動時(使用命令 airflow celery worker),可以指定一組以逗號分隔的佇列名稱(無空格)(例如 airflow celery worker -q spark,quark)。然後,該 worker 將僅接收分配到指定佇列的任務。

如果您需要專用的 worker,這會很有用,無論從資源角度(例如對於非常輕量級的任務,一個 worker 可以輕鬆處理數千個任務),還是從環境角度(您希望 worker 在 Spark 叢集內部執行,因為它需要非常特定的環境和安全許可權)。

此條目是否有幫助?