優先順序權重¶
priority_weight 定義了執行器佇列中的優先順序。預設的 priority_weight 是 1,可以提升到任意整數。此外,每個任務都有一個真實的 priority_weight,它是根據其 weight_rule 計算得出的,weight_rule 定義了用於計算任務有效總優先順序權重的加權方法。
以下是加權方法。預設情況下,Airflow 的加權方法是 downstream。
downstream任務的有效權重是所有下游後代的總和。因此,當使用正權重值時,上游任務將具有更高的權重,並在排程時更積極。當你有多個 DAG 執行例項,並且希望在每個 DAG 繼續處理下游任務之前,所有執行的所有上游任務都能完成時,這很有用。
upstream有效權重是所有上游祖先的總和。這與 downstream 相反,下游任務具有更高的權重,並在排程時更積極(當使用正權重值時)。當你有多個 DAG 執行例項,並且更傾向於讓每個 DAG 完成後再開始其他 DAG 執行的上游任務時,這很有用。
absolute有效權重就是指定的 priority_weight,不進行額外的加權。當你清楚地知道每個任務應該具有的確切優先順序權重時,可能希望這樣做。此外,當設定為 absolute 時,還有一個額外的好處,可以顯著加快任務建立過程,尤其是對於非常大的 DAG。
priority_weight 引數可以與 Pools (資源池) 結合使用。
注意
由於大多數資料庫引擎對整數使用 32 位,任何計算或定義的 priority_weight 的最大值為 2,147,483,647,最小值為 -2,147,483,648。
自定義權重規則¶
在 2.9.0 版本中新增。
你可以透過擴充套件 PriorityWeightStrategy 類並在外掛中註冊它來實現自己的自定義加權方法。
src/airflow/example_dags/plugins/decreasing_priority_weight_strategy.py
class DecreasingPriorityStrategy(PriorityWeightStrategy):
"""A priority weight strategy that decreases the priority weight with each attempt of the DAG task."""
def get_weight(self, ti: TaskInstance):
return max(3 - ti.try_number + 1, 1)
class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
name = "decreasing_priority_weight_strategy_plugin"
priority_weight_strategies = [DecreasingPriorityStrategy]
要檢查自定義優先順序權重策略是否已在 Airflow 中可用,你可以執行 bash 命令 airflow plugins。然後要使用它,你可以建立自定義類的例項並將其提供給任務的 weight_rule 引數,或者提供自定義類的路徑。
src/airflow/example_dags/example_custom_weight.py
with DAG(
dag_id="example_custom_weight",
schedule="0 0 * * *",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
dagrun_timeout=datetime.timedelta(minutes=60),
tags=["example", "example2"],
) as dag:
start = EmptyOperator(
task_id="start",
)
# provide the class instance
task_1 = BashOperator(task_id="task_1", bash_command="echo 1", weight_rule=DecreasingPriorityStrategy())
# or provide the path of the class
task_2 = BashOperator(
task_id="task_2",
bash_command="echo 1",
weight_rule="airflow.example_dags.plugins.decreasing_priority_weight_strategy.DecreasingPriorityStrategy",
)
task_non_custom = BashOperator(task_id="task_non_custom", bash_command="echo 1", priority_weight=2)
start >> [task_1, task_2, task_non_custom]
在 DAG 執行後,你可以在任務上檢查 priority_weight 引數,以驗證它正在使用自定義優先順序策略規則。
這是一項實驗性功能。