BranchDateTimeOperator¶
使用 BranchDateTimeOperator 將工作流分支到兩個執行路徑之一,取決於時間是否落在兩個目標引數給定的範圍內,
此運算子有兩種模式。第一種模式是使用當前時間(DAG 執行時的機器時鐘時間),第二種模式是使用它執行所在的 DAG 執行的 logical_date。
與當前時間一起使用¶
上述用法在某些情況下可能有用 - 例如當 DAG 用於執行清理和維護,並且不適用於任何需要回填的 DAG,因為“當前時間”使回填非冪等,其結果取決於 DAG 實際執行的時間。即使按計劃執行,它也可能略微不確定。DAGRun 的排程和執行之間可能會有一些時間差,這可能意味著即使 DAGRun 已正確排程,用於分支決定的實際時間將與排程時間不同,並且分支決定可能會因這些延遲而異。
airflow/example_dags/example_branch_datetime_operator.py
empty_task_11 = EmptyOperator(task_id="date_in_range", dag=dag1)
empty_task_21 = EmptyOperator(task_id="date_outside_range", dag=dag1)
cond1 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag1,
)
# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [empty_task_11, empty_task_21]
目標引數 target_upper 和 target_lower 可以接收一個 datetime.datetime、一個 datetime.time 或 None。當使用一個 datetime.time 物件時,它將與當前日期組合以便允許進行比較。如果 target_upper 設定為一個發生在給定的 target_lower 之前的 datetime.time,一天將被新增到 target_upper 中。這樣做是為了允許跨越兩天的時段。
airflow/example_dags/example_branch_datetime_operator.py
empty_task_12 = EmptyOperator(task_id="date_in_range", dag=dag2)
empty_task_22 = EmptyOperator(task_id="date_outside_range", dag=dag2)
cond2 = BranchDateTimeOperator(
task_id="datetime_branch",
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.time(0, 0, 0),
target_lower=pendulum.time(15, 0, 0),
dag=dag2,
)
# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run empty_task_12 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [empty_task_12, empty_task_22]
如果目標引數設定為 None,運算子將僅使用非 None 的目標進行單邊比較。將 target_upper 和 target_lower 都設定為 None 將引發異常。
與邏輯日期一起使用¶
這種用法對“資料範圍”更友好。該 logical_date 在 DAG 重新執行時不會改變,也不受執行延遲的影響,因此這種方法適用於可能需要回填的冪等 DAG 執行。
airflow/example_dags/example_branch_datetime_operator.py
empty_task_13 = EmptyOperator(task_id="date_in_range", dag=dag3)
empty_task_23 = EmptyOperator(task_id="date_outside_range", dag=dag3)
cond3 = BranchDateTimeOperator(
task_id="datetime_branch",
use_task_logical_date=True,
follow_task_ids_if_true=["date_in_range"],
follow_task_ids_if_false=["date_outside_range"],
target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
dag=dag3,
)
# Run empty_task_13 if cond3 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond3 >> [empty_task_13, empty_task_23]
BranchDayOfWeekOperator¶
使用 BranchDayOfWeekOperator 根據星期幾的值來分支你的工作流。
airflow/example_dags/example_branch_day_of_week_operator.py
empty_task_1 = EmptyOperator(task_id="branch_true")
empty_task_2 = EmptyOperator(task_id="branch_false")
empty_task_3 = EmptyOperator(task_id="branch_weekend")
empty_task_4 = EmptyOperator(task_id="branch_mid_week")
branch = BranchDayOfWeekOperator(
task_id="make_choice",
follow_task_ids_if_true="branch_true",
follow_task_ids_if_false="branch_false",
week_day="Monday",
)
branch_weekend = BranchDayOfWeekOperator(
task_id="make_weekend_choice",
follow_task_ids_if_true="branch_weekend",
follow_task_ids_if_false="branch_mid_week",
week_day={WeekDay.SATURDAY, WeekDay.SUNDAY},
)
# Run empty_task_1 if branch executes on Monday, empty_task_2 otherwise
branch >> [empty_task_1, empty_task_2]
# Run empty_task_3 if it's a weekend, empty_task_4 otherwise
empty_task_2 >> branch_weekend >> [empty_task_3, empty_task_4]