Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

優先順序權重

priority_weight 定義了執行器佇列中的優先順序。預設的 priority_weight1,可以提升到任意整數。此外,每個任務都有一個真實的 priority_weight,它是根據其 weight_rule 計算得出的,weight_rule 定義了用於計算任務有效總優先順序權重的加權方法。

以下是加權方法。預設情況下,Airflow 的加權方法是 downstream

downstream

任務的有效權重是所有下游後代的總和。因此,當使用正權重值時,上游任務將具有更高的權重,並在排程時更積極。當你有多個 DAG 執行例項,並且希望在每個 DAG 繼續處理下游任務之前,所有執行的所有上游任務都能完成時,這很有用。

upstream

有效權重是所有上游祖先的總和。這與 downstream 相反,下游任務具有更高的權重,並在排程時更積極(當使用正權重值時)。當你有多個 DAG 執行例項,並且更傾向於讓每個 DAG 完成後再開始其他 DAG 執行的上游任務時,這很有用。

absolute

有效權重就是指定的 priority_weight,不進行額外的加權。當你清楚地知道每個任務應該具有的確切優先順序權重時,可能希望這樣做。此外,當設定為 absolute 時,還有一個額外的好處,可以顯著加快任務建立過程,尤其是對於非常大的 DAG。

priority_weight 引數可以與 Pools (資源池) 結合使用。

注意

由於大多數資料庫引擎對整數使用 32 位,任何計算或定義的 priority_weight 的最大值為 2,147,483,647,最小值為 -2,147,483,648。

自定義權重規則

在 2.9.0 版本中新增。

你可以透過擴充套件 PriorityWeightStrategy 類並在外掛中註冊它來實現自己的自定義加權方法。

src/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py

class DecreasingPriorityStrategy(PriorityWeightStrategy):
    """A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""

    def get_weight(self, ti: TaskInstance):
        return max(3 - ti.try_number + 1, 1)


class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
    name = "decreasing_priority_weight_strategy_plugin"
    priority_weight_strategies = [DecreasingPriorityStrategy]


要檢查自定義優先順序權重策略是否已在 Airflow 中可用,你可以執行 bash 命令 airflow plugins。然後要使用它,你可以建立自定義類的例項並將其提供給任務的 weight_rule 引數,或者提供自定義類的路徑。

src/airflow/example_dags/example_custom_weight.py


with DAG(
    dag_id="example_custom_weight",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
    tags=["example", "example2"],
) as dag:
    start = EmptyOperator(
        task_id="start",
    )

    # provide the class instance
    task_1 = BashOperator(task_id="task_1", bash_command="echo 1", weight_rule=DecreasingPriorityStrategy())

    # or provide the path of the class
    task_2 = BashOperator(
        task_id="task_2",
        bash_command="echo 1",
        weight_rule="airflow.example_dags.plugins.decreasing_priority_weight_strategy.DecreasingPriorityStrategy",
    )

    task_non_custom = BashOperator(task_id="task_non_custom", bash_command="echo 1", priority_weight=2)

    start >> [task_1, task_2, task_non_custom]

在 DAG 執行後,你可以在任務上檢查 priority_weight 引數,以驗證它正在使用自定義優先順序策略規則。

這是一項實驗性功能

此條目是否有幫助?