Airflow 峰會 2025 即將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

使用 Timetable 定製 DAG 排程

舉例來說,假設一家公司希望在每個工作日結束後執行一個作業,以處理工作日期間收集的資料。第一個直觀的答案是 schedule="0 0 * * 1-5" (週一至週五午夜),但這意味著週五收集的資料不會在週五結束後立即處理,而是在下一個週一,並且該執行的資料區間將從週五午夜到週一午夜。此外,上述排程字串無法跳過節假日處理。我們希望做到的是:

  • 為每個週一、週二、週三、週四和週五排程一次執行。執行的資料區間覆蓋從每天的午夜到第二天午夜(例如 2021-01-01 00:00:00 到 2021-01-02 00:00:00)。

  • 每次執行將在資料區間結束後立即建立。覆蓋週一的執行發生在週二午夜,以此類推。覆蓋週五的執行發生在週六午夜。週日和週一的午夜不發生執行。

  • 不在定義的節假日排程執行。

為簡單起見,本例中我們僅處理 UTC 日期時間。

注意

自定義 Timetable 返回的所有日期時間值必須是“感知型”的,即包含時區資訊。此外,它們必須使用 pendulum 的日期時間型別和時區型別。

Timetable 註冊

Timetable 必須是 Timetable 的子類,並註冊為 plugin 的一部分。以下是我們實現新 Timetable 的骨架程式碼:

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable


class AfterWorkdayTimetable(Timetable):
    pass


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]

接下來,我們將開始編寫 AfterWorkdayTimetable 的程式碼。實現完成後,我們就可以在 DAG 檔案中使用這個 timetable 了。

import pendulum

from airflow.sdk import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable


with DAG(
    dag_id="example_after_workday_timetable_dag",
    start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    ...

定義排程邏輯

當 Airflow 的排程器遇到一個 DAG 時,它會呼叫以下兩個方法之一來確定何時排程該 DAG 的下一次執行。

  • next_dagrun_info: 排程器使用此方法瞭解 timetable 的常規排程,即示例中“每個工作日結束時執行一次”的部分。

  • infer_manual_data_interval: 當 DAG 執行被手動觸發(例如,從 Web UI)時,排程器使用此方法瞭解如何反向推斷非排程執行的資料區間。

我們將從 infer_manual_data_interval 開始,因為它相對簡單。

src/airflow/example_dags/plugins/workday.py

def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
    start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
    # Skip backwards over weekends and holidays to find last run
    start = self.get_next_workday(start, incr=-1)
    return DataInterval(start=start, end=(start + timedelta(days=1)))

該方法接受一個引數 run_after,它是一個 pendulum.DateTime 物件,指示 DAG 何時被外部觸發。由於我們的 timetable 為每個完整工作日建立一個數據區間,這裡推斷出的資料區間通常應在 run_after 前一天的午夜開始,但如果 run_after 落在了週日或週一(即前一天是週六或週日),則應進一步推回到上一個週五。一旦確定了區間的開始時間,結束時間就是其後整整一天。然後我們建立一個 DataInterval 物件來描述這個區間。

接下來是 next_dagrun_info 的實現

src/airflow/example_dags/plugins/workday.py

def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: DataInterval | None,
    restriction: TimeRestriction,
) -> DagRunInfo | None:
    if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
        last_start = last_automated_data_interval.start
        next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
    # Otherwise this is the first ever run on the regular schedule...
    elif (earliest := restriction.earliest) is None:
        return None  # No start_date. Don't schedule.
    elif not restriction.catchup:
        # If the DAG has catchup=False, today is the earliest to consider.
        next_start = max(earliest, DateTime.combine(Date.today(), Time.min, tzinfo=UTC))
    elif earliest.time() != Time.min:
        # If earliest does not fall on midnight, skip to the next day.
        next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
    else:
        next_start = earliest
    # Skip weekends and holidays
    next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

    if restriction.latest is not None and next_start > restriction.latest:
        return None  # Over the DAG's scheduled end; don't schedule.
    return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

此方法接受兩個引數。last_automated_data_interval 是一個 DataInterval 例項,指示該 DAG 上一次非手動觸發執行的資料區間;如果這是 DAG 首次被排程,則為 Nonerestriction 封裝了 DAG 及其任務如何指定排程,幷包含三個屬性:

  • earliest:DAG 可以被排程的最早時間。這是一個 pendulum.DateTime 物件,透過計算 DAG 及其任務的所有 start_date 引數得出;如果完全沒有找到 start_date 引數,則為 None

  • latest:類似於 earliest,這是 DAG 可以被排程的最晚時間,透過計算 end_date 引數得出。

  • catchup:一個布林值,反映 DAG 的 catchup 引數。預設為 False

注意

earliestlatest 都適用於 DAG 執行的邏輯日期(資料區間的開始),而不是執行將被排程的實際時間(通常在資料區間結束之後)。

如果之前有已排程的執行,我們現在應該透過迴圈後續日期來查詢下一個非節假日工作日,該工作日不是週六、週日或美國節假日。然而,如果之前沒有已排程的執行,我們則選取 restriction.earliest 之後的下一個非節假日工作日的午夜。restriction.catchup 也需要考慮——如果它為 False,則即使 start_date 值在過去,我們也無法排程當前時間之前的執行。最後,如果我們計算出的資料區間晚於 restriction.latest,我們必須遵守它,並返回 None 來指示不排程執行。

如果我們決定排程一次執行,我們需要使用 DagRunInfo 來描述它。該型別有兩個引數和屬性:

  • data_interval:一個 DataInterval 例項,描述下一次執行的資料區間。

  • run_after:一個 pendulum.DateTime 例項,告訴排程器何時可以排程 DAG 執行。

一個 DagRunInfo 可以這樣建立:

info = DagRunInfo(
    data_interval=DataInterval(start=start, end=end),
    run_after=run_after,
)

由於我們通常希望在資料區間結束後立即排程執行,上面的 endrun_after 通常是相同的。因此,DagRunInfo 提供了一個快捷方式:

info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after  # Always True.

作為參考,以下是我們的 plugin 和 DAG 檔案的完整程式碼:

src/airflow/example_dags/plugins/workday.py

from pendulum import UTC, Date, DateTime, Time

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable

if TYPE_CHECKING:
    from airflow.timetables.base import TimeRestriction

log = logging.getLogger(__name__)
try:
    from pandas.tseries.holiday import USFederalHolidayCalendar

    holiday_calendar = USFederalHolidayCalendar()
except ImportError:
    log.warning("Could not import pandas. Holidays will not be considered.")
    holiday_calendar = None  # type: ignore[assignment]


class AfterWorkdayTimetable(Timetable):
    def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
        next_start = d
        while True:
            if next_start.weekday() not in (5, 6):  # not on weekend
                if holiday_calendar is None:
                    holidays = set()
                else:
                    holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
                if next_start not in holidays:
                    break
            next_start = next_start.add(days=incr)
        return next_start
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
        # Skip backwards over weekends and holidays to find last run
        start = self.get_next_workday(start, incr=-1)
        return DataInterval(start=start, end=(start + timedelta(days=1)))
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
        # Otherwise this is the first ever run on the regular schedule...
        elif (earliest := restriction.earliest) is None:
            return None  # No start_date. Don't schedule.
        elif not restriction.catchup:
            # If the DAG has catchup=False, today is the earliest to consider.
            next_start = max(earliest, DateTime.combine(Date.today(), Time.min, tzinfo=UTC))
        elif earliest.time() != Time.min:
            # If earliest does not fall on midnight, skip to the next day.
            next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
        else:
            next_start = earliest
        # Skip weekends and holidays
        next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]


import pendulum

from airflow.sdk import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.providers.standard.operators.empty import EmptyOperator


with DAG(
    dag_id="example_workday_timetable",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    EmptyOperator(task_id="run_this")

引數化 Timetable

有時我們需要向 timetable 傳遞一些執行時引數。繼續以 AfterWorkdayTimetable 為例,也許我們有一些 DAG 執行在不同的時區,並且我們希望在第二天上午 8 點排程一些 DAG,而不是午夜。我們不希望為每個目的建立單獨的 timetable,而是希望這樣做:

class SometimeAfterWorkdayTimetable(Timetable):
    def __init__(self, schedule_at: Time) -> None:
        self._schedule_at = schedule_at

    def next_dagrun_info(self, last_automated_dagrun, restriction):
        ...
        end = start + timedelta(days=1)
        return DagRunInfo(
            data_interval=DataInterval(start=start, end=end),
            run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
        )

然而,由於 timetable 是 DAG 的一部分,我們需要告訴 Airflow 如何使用我們在 __init__ 中提供的上下文來序列化它。這可以透過在我們的 timetable 類中實現另外兩個方法來實現:

class SometimeAfterWorkdayTimetable(Timetable):
    ...

    def serialize(self) -> dict[str, Any]:
        return {"schedule_at": self._schedule_at.isoformat()}

    @classmethod
    def deserialize(cls, value: dict[str, Any]) -> Timetable:
        return cls(Time.fromisoformat(value["schedule_at"]))

當 DAG 被序列化時,會呼叫 serialize 來獲取一個可 JSON 序列化的值。當排程器訪問序列化的 DAG 以重建 timetable 時,會將該值傳遞給 deserialize

Timetable 在 UI 中的顯示

預設情況下,自定義 timetable 在 UI 中以其類名顯示(例如,“dags”表中的Schedule列)。可以透過重寫 summary 屬性來定製此顯示。這對於引數化 timetable 尤其有用,可以將 __init__ 中提供的引數包含在內。例如,對於我們的 SometimeAfterWorkdayTimetable 類,我們可以這樣做:

@property
def summary(self) -> str:
    return f"after each workday, at {self._schedule_at}"

因此,對於如下宣告的 DAG:

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

Schedule 列將顯示 after each workday, at 08:00:00

另請參閱

模組 airflow.timetables.base

公共介面有詳細的文件,解釋了子類應該實現什麼。

Timetable 描述在 UI 中的顯示

您還可以透過重寫 description 屬性為您的 Timetable 實現提供描述。這對於在 UI 中為您的實現提供全面的描述特別有用。例如,對於我們的 SometimeAfterWorkdayTimetable 類,我們可以這樣做:

description = "Schedule: after each workday"

如果您想根據引數生成描述,也可以將其放在 __init__ 中。

def __init__(self) -> None:
    self.description = "Schedule: after each workday, at f{self._schedule_at}"

當您想提供與 summary 屬性不同的全面描述時,這特別有用。

因此,對於如下宣告的 DAG:

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

i 圖示會顯示 Schedule: after each workday, at 08:00:00

另請參閱

模組 airflow.timetables.interval

檢視 CronDataIntervalTimetable 的 description 實現,它在 UI 中提供了全面的 cron 描述。

更改生成的 run_id

添加於版本 2.4。

自 Airflow 2.4 起,Timetable 也負責生成 DagRun 的 run_id

例如,要讓 Run ID 顯示一個“人類友好”的執行開始日期(即資料區間的結束時間,而不是當前使用的開始時間),您可以向自定義 timetable 新增如下方法:

def generate_run_id(
    self,
    *,
    run_type: DagRunType,
    logical_date: DateTime,
    data_interval: DataInterval | None,
    **extra,
) -> str:
    if run_type == DagRunType.SCHEDULED and data_interval:
        return data_interval.end.format("YYYY-MM-DD dddd")
    return super().generate_run_id(
        run_type=run_type, logical_date=logical_date, data_interval=data_interval, **extra
    )

請記住,RunID 的長度限制為 250 個字元,並且在同一個 DAG 內必須唯一。

本條目有幫助嗎?