使用 Timetable 定製 DAG 排程¶
舉例來說,假設一家公司希望在每個工作日結束後執行一個作業,以處理工作日期間收集的資料。第一個直觀的答案是 schedule="0 0 * * 1-5" (週一至週五午夜),但這意味著週五收集的資料不會在週五結束後立即處理,而是在下一個週一,並且該執行的資料區間將從週五午夜到週一午夜。此外,上述排程字串無法跳過節假日處理。我們希望做到的是:
為每個週一、週二、週三、週四和週五排程一次執行。執行的資料區間覆蓋從每天的午夜到第二天午夜(例如 2021-01-01 00:00:00 到 2021-01-02 00:00:00)。
每次執行將在資料區間結束後立即建立。覆蓋週一的執行發生在週二午夜,以此類推。覆蓋週五的執行發生在週六午夜。週日和週一的午夜不發生執行。
不在定義的節假日排程執行。
為簡單起見,本例中我們僅處理 UTC 日期時間。
注意
自定義 Timetable 返回的所有日期時間值必須是“感知型”的,即包含時區資訊。此外,它們必須使用 pendulum 的日期時間型別和時區型別。
Timetable 註冊¶
Timetable 必須是 Timetable 的子類,並註冊為 plugin 的一部分。以下是我們實現新 Timetable 的骨架程式碼:
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable
class AfterWorkdayTimetable(Timetable):
pass
class WorkdayTimetablePlugin(AirflowPlugin):
name = "workday_timetable_plugin"
timetables = [AfterWorkdayTimetable]
接下來,我們將開始編寫 AfterWorkdayTimetable 的程式碼。實現完成後,我們就可以在 DAG 檔案中使用這個 timetable 了。
import pendulum
from airflow.sdk import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
with DAG(
dag_id="example_after_workday_timetable_dag",
start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
schedule=AfterWorkdayTimetable(),
tags=["example", "timetable"],
):
...
定義排程邏輯¶
當 Airflow 的排程器遇到一個 DAG 時,它會呼叫以下兩個方法之一來確定何時排程該 DAG 的下一次執行。
next_dagrun_info: 排程器使用此方法瞭解 timetable 的常規排程,即示例中“每個工作日結束時執行一次”的部分。infer_manual_data_interval: 當 DAG 執行被手動觸發(例如,從 Web UI)時,排程器使用此方法瞭解如何反向推斷非排程執行的資料區間。
我們將從 infer_manual_data_interval 開始,因為它相對簡單。
src/airflow/example_dags/plugins/workday.py
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
# Skip backwards over weekends and holidays to find last run
start = self.get_next_workday(start, incr=-1)
return DataInterval(start=start, end=(start + timedelta(days=1)))
該方法接受一個引數 run_after,它是一個 pendulum.DateTime 物件,指示 DAG 何時被外部觸發。由於我們的 timetable 為每個完整工作日建立一個數據區間,這裡推斷出的資料區間通常應在 run_after 前一天的午夜開始,但如果 run_after 落在了週日或週一(即前一天是週六或週日),則應進一步推回到上一個週五。一旦確定了區間的開始時間,結束時間就是其後整整一天。然後我們建立一個 DataInterval 物件來描述這個區間。
接下來是 next_dagrun_info 的實現
src/airflow/example_dags/plugins/workday.py
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
# Otherwise this is the first ever run on the regular schedule...
elif (earliest := restriction.earliest) is None:
return None # No start_date. Don't schedule.
elif not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(earliest, DateTime.combine(Date.today(), Time.min, tzinfo=UTC))
elif earliest.time() != Time.min:
# If earliest does not fall on midnight, skip to the next day.
next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
else:
next_start = earliest
# Skip weekends and holidays
next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
此方法接受兩個引數。last_automated_data_interval 是一個 DataInterval 例項,指示該 DAG 上一次非手動觸發執行的資料區間;如果這是 DAG 首次被排程,則為 None。restriction 封裝了 DAG 及其任務如何指定排程,幷包含三個屬性:
earliest:DAG 可以被排程的最早時間。這是一個pendulum.DateTime物件,透過計算 DAG 及其任務的所有start_date引數得出;如果完全沒有找到start_date引數,則為None。latest:類似於earliest,這是 DAG 可以被排程的最晚時間,透過計算end_date引數得出。catchup:一個布林值,反映 DAG 的catchup引數。預設為False。
注意
earliest 和 latest 都適用於 DAG 執行的邏輯日期(資料區間的開始),而不是執行將被排程的實際時間(通常在資料區間結束之後)。
如果之前有已排程的執行,我們現在應該透過迴圈後續日期來查詢下一個非節假日工作日,該工作日不是週六、週日或美國節假日。然而,如果之前沒有已排程的執行,我們則選取 restriction.earliest 之後的下一個非節假日工作日的午夜。restriction.catchup 也需要考慮——如果它為 False,則即使 start_date 值在過去,我們也無法排程當前時間之前的執行。最後,如果我們計算出的資料區間晚於 restriction.latest,我們必須遵守它,並返回 None 來指示不排程執行。
如果我們決定排程一次執行,我們需要使用 DagRunInfo 來描述它。該型別有兩個引數和屬性:
data_interval:一個DataInterval例項,描述下一次執行的資料區間。run_after:一個pendulum.DateTime例項,告訴排程器何時可以排程 DAG 執行。
一個 DagRunInfo 可以這樣建立:
info = DagRunInfo(
data_interval=DataInterval(start=start, end=end),
run_after=run_after,
)
由於我們通常希望在資料區間結束後立即排程執行,上面的 end 和 run_after 通常是相同的。因此,DagRunInfo 提供了一個快捷方式:
info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after # Always True.
作為參考,以下是我們的 plugin 和 DAG 檔案的完整程式碼:
src/airflow/example_dags/plugins/workday.py
from pendulum import UTC, Date, DateTime, Time
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
log = logging.getLogger(__name__)
try:
from pandas.tseries.holiday import USFederalHolidayCalendar
holiday_calendar = USFederalHolidayCalendar()
except ImportError:
log.warning("Could not import pandas. Holidays will not be considered.")
holiday_calendar = None # type: ignore[assignment]
class AfterWorkdayTimetable(Timetable):
def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
next_start = d
while True:
if next_start.weekday() not in (5, 6): # not on weekend
if holiday_calendar is None:
holidays = set()
else:
holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
if next_start not in holidays:
break
next_start = next_start.add(days=incr)
return next_start
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
# Skip backwards over weekends and holidays to find last run
start = self.get_next_workday(start, incr=-1)
return DataInterval(start=start, end=(start + timedelta(days=1)))
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
# Otherwise this is the first ever run on the regular schedule...
elif (earliest := restriction.earliest) is None:
return None # No start_date. Don't schedule.
elif not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(earliest, DateTime.combine(Date.today(), Time.min, tzinfo=UTC))
elif earliest.time() != Time.min:
# If earliest does not fall on midnight, skip to the next day.
next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
else:
next_start = earliest
# Skip weekends and holidays
next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
class WorkdayTimetablePlugin(AirflowPlugin):
name = "workday_timetable_plugin"
timetables = [AfterWorkdayTimetable]
import pendulum
from airflow.sdk import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.providers.standard.operators.empty import EmptyOperator
with DAG(
dag_id="example_workday_timetable",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=AfterWorkdayTimetable(),
tags=["example", "timetable"],
):
EmptyOperator(task_id="run_this")
引數化 Timetable¶
有時我們需要向 timetable 傳遞一些執行時引數。繼續以 AfterWorkdayTimetable 為例,也許我們有一些 DAG 執行在不同的時區,並且我們希望在第二天上午 8 點排程一些 DAG,而不是午夜。我們不希望為每個目的建立單獨的 timetable,而是希望這樣做:
class SometimeAfterWorkdayTimetable(Timetable):
def __init__(self, schedule_at: Time) -> None:
self._schedule_at = schedule_at
def next_dagrun_info(self, last_automated_dagrun, restriction):
...
end = start + timedelta(days=1)
return DagRunInfo(
data_interval=DataInterval(start=start, end=end),
run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
)
然而,由於 timetable 是 DAG 的一部分,我們需要告訴 Airflow 如何使用我們在 __init__ 中提供的上下文來序列化它。這可以透過在我們的 timetable 類中實現另外兩個方法來實現:
class SometimeAfterWorkdayTimetable(Timetable):
...
def serialize(self) -> dict[str, Any]:
return {"schedule_at": self._schedule_at.isoformat()}
@classmethod
def deserialize(cls, value: dict[str, Any]) -> Timetable:
return cls(Time.fromisoformat(value["schedule_at"]))
當 DAG 被序列化時,會呼叫 serialize 來獲取一個可 JSON 序列化的值。當排程器訪問序列化的 DAG 以重建 timetable 時,會將該值傳遞給 deserialize。
Timetable 在 UI 中的顯示¶
預設情況下,自定義 timetable 在 UI 中以其類名顯示(例如,“dags”表中的Schedule列)。可以透過重寫 summary 屬性來定製此顯示。這對於引數化 timetable 尤其有用,可以將 __init__ 中提供的引數包含在內。例如,對於我們的 SometimeAfterWorkdayTimetable 類,我們可以這樣做:
@property
def summary(self) -> str:
return f"after each workday, at {self._schedule_at}"
因此,對於如下宣告的 DAG:
with DAG(
schedule=SometimeAfterWorkdayTimetable(Time(8)), # 8am.
...,
):
...
Schedule 列將顯示 after each workday, at 08:00:00。
另請參閱
- 模組
airflow.timetables.base 公共介面有詳細的文件,解釋了子類應該實現什麼。
Timetable 描述在 UI 中的顯示¶
您還可以透過重寫 description 屬性為您的 Timetable 實現提供描述。這對於在 UI 中為您的實現提供全面的描述特別有用。例如,對於我們的 SometimeAfterWorkdayTimetable 類,我們可以這樣做:
description = "Schedule: after each workday"
如果您想根據引數生成描述,也可以將其放在 __init__ 中。
def __init__(self) -> None:
self.description = "Schedule: after each workday, at f{self._schedule_at}"
當您想提供與 summary 屬性不同的全面描述時,這特別有用。
因此,對於如下宣告的 DAG:
with DAG(
schedule=SometimeAfterWorkdayTimetable(Time(8)), # 8am.
...,
):
...
i 圖示會顯示 Schedule: after each workday, at 08:00:00。
另請參閱
- 模組
airflow.timetables.interval 檢視
CronDataIntervalTimetable的 description 實現,它在 UI 中提供了全面的 cron 描述。
更改生成的 run_id¶
添加於版本 2.4。
自 Airflow 2.4 起,Timetable 也負責生成 DagRun 的 run_id。
例如,要讓 Run ID 顯示一個“人類友好”的執行開始日期(即資料區間的結束時間,而不是當前使用的開始時間),您可以向自定義 timetable 新增如下方法:
def generate_run_id(
self,
*,
run_type: DagRunType,
logical_date: DateTime,
data_interval: DataInterval | None,
**extra,
) -> str:
if run_type == DagRunType.SCHEDULED and data_interval:
return data_interval.end.format("YYYY-MM-DD dddd")
return super().generate_run_id(
run_type=run_type, logical_date=logical_date, data_interval=data_interval, **extra
)
請記住,RunID 的長度限制為 250 個字元,並且在同一個 DAG 內必須唯一。