Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊可享早鳥票優惠!

連線池

當太多程序同時訪問某些系統時,這些系統可能會不堪重負。Airflow 連線池可用於限制任意任務集的並行執行。透過 UI (選單 -> 管理員 -> 連線池) 管理連線池列表,為連線池命名並分配工作槽位數量。在這裡,您還可以決定連線池在計算已佔用槽位時是否應包含延遲任務

然後,在建立任務時,可以使用 pool 引數將任務與現有連線池之一關聯起來

aggregate_db_message_job = BashOperator(
    task_id="aggregate_db_message_job",
    execution_timeout=timedelta(hours=3),
    pool="ep_data_pipeline_db_msg_agg",
    bash_command=aggregate_db_message_job_cmd,
    dag=dag,
)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)

槽位被佔用時,任務將照常排程。任務佔用的槽位數量可透過 pool_slots 進行配置(詳見下文)。一旦達到容量限制,可執行任務將被排隊,其狀態將在 UI 中顯示為排隊狀態。隨著槽位釋放,排隊任務將根據任務及其後代的優先順序權重開始執行。

請注意,如果未給任務指定連線池,它們將被分配給預設連線池 default_pool,該連線池初始化時有 128 個槽位,可透過 UI 或 CLI 修改(但不能刪除)。

使用多個連線池槽位

Airflow 任務預設每個佔用一個連線池槽位,但如果需要,可以透過 pool_slots 引數將其配置為佔用更多槽位。當屬於同一連線池的多個任務具有不同的“計算權重”時,這尤其有用。

例如,考慮一個具有 2 個槽位的連線池,Pool(pool='maintenance', slots=2),以及以下任務

BashOperator(
    task_id="heavy_task",
    bash_command="bash backup_data.sh",
    pool_slots=2,
    pool="maintenance",
)

BashOperator(
    task_id="light_task1",
    bash_command="bash check_files.sh",
    pool_slots=1,
    pool="maintenance",
)

BashOperator(
    task_id="light_task2",
    bash_command="bash remove_files.sh",
    pool_slots=1,
    pool="maintenance",
)

由於繁重任務配置為使用 2 個連線池槽位,因此執行時會耗盡連線池。因此,任何輕量任務都必須排隊等待繁重任務完成才能執行。在這裡,就資源使用而言,一個繁重任務相當於兩個輕量任務同時執行。

這種實現可以防止系統資源過載,例如(在此示例中)當一個繁重任務和一個輕量任務同時執行時可能發生過載。另一方面,兩個輕量任務可以同時執行,因為它們每個只佔用一個連線池槽位,而繁重任務則必須等待兩個連線池槽位可用後才能執行。

此條目有幫助嗎?