Airflow 2025 峰會將於 10 月 07 日至 09 日舉行。立即註冊即可獲得早鳥票!

時間表

對於基於時間排程的 DAG(與事件驅動相對),DAG 的內部“時間表”驅動排程。時間表還決定了為 DAG 建立的每次執行的資料間隔和邏輯日期。

使用 cron 表示式或 timedelta 物件排程的 DAG 在內部被轉換為始終使用時間表。

如果 cron 表示式或 timedelta 足以滿足您的用例,則無需擔心編寫自定義時間表,因為 Airflow 具有處理這些情況的預設時間表。但對於更復雜的排程要求,您可以建立自己的時間表類並將其傳遞給 DAG 的 schedule 引數。

自定義時間表實現的有用示例

  • 每天在不同時間發生的任務執行。例如,天文學家可能會發現,在黎明時執行任務來處理前一夜收集的資料很有用。

  • 不遵循公曆的排程。例如,為農曆中的每個月建立一次執行。這在概念上類似於日出情況,但時間尺度不同。

  • 滾動視窗或重疊的資料間隔。例如,您可能希望每天都有一次執行,但讓每次執行涵蓋前七天的時間段。可以使用 cron 表示式來巧妙地實現這一點,但自定義資料間隔提供了更自然的表示方式。

  • 資料間隔之間存在“間隔”,而不是連續間隔,因為 cron 表示式和 timedelta 排程都表示連續間隔。參見資料間隔

Airflow 允許您在外掛中編寫自定義時間表並由 DAG 使用。您可以在使用時間表自定義 DAG 排程操作指南中找到演示自定義時間表的示例。

注意

作為一般規則,始終在程式碼中儘可能晚地訪問 Variables、Connections 或其他任何需要訪問資料庫的內容。有關應遵循的更多最佳實踐,請參見時間表

內建時間表

Airflow 內建了幾個常用的時間表,以涵蓋最常見的用例。外掛中可能提供額外的時間表。

DeltaTriggerTimetable

接受 datetime.timedeltadateutil.relativedelta.relativedelta,並在經過一個時間差後執行 DAG 的時間表。

from datetime import timedelta

from airflow.timetables.trigger import DeltaTriggerTimetable


@dag(schedule=DeltaTriggerTimetable(timedelta(days=7)), ...)  # Once every week.
def example_dag():
    pass

您還可以向時間表提供靜態資料間隔。可選的 interval 引數也應該是一個 datetime.timedeltadateutil.relativedelta.relativedelta。使用這些引數時,觸發的 DAG 執行的資料間隔跨越指定的持續時間,並以觸發時間為結束

from datetime import UTC, datetime, timedelta

from dateutil.relativedelta import relativedelta, FR

from airflow.timetables.trigger import DeltaTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week.
    schedule=DeltaTriggerTimetable(
        relativedelta(weekday=FR(), hour=18),
        interval=timedelta(days=4, hours=9),
    ),
    start_date=datetime(2025, 1, 3, 18, tzinfo=UTC),
    ...,
)
def example_dag():
    pass

CronTriggerTimetable

接受 cron 表示式並根據其觸發 DAG 執行的時間表。

from airflow.timetables.trigger import CronTriggerTimetable


@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)  # At 01:00 on Wednesday
def example_dag():
    pass

您還可以向時間表提供靜態資料間隔。可選的 interval 引數必須是一個 datetime.timedeltadateutil.relativedelta.relativedelta。使用這些引數時,觸發的 DAG 執行的資料間隔跨越指定的持續時間,並以觸發時間為結束

from datetime import timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week (9:00 Monday to 18:00 Friday).
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

MultipleCronTriggerTimetable

這類似於CronTriggerTimetable,但它接受多個 cron 表示式。只要任何表示式與時間匹配,就會排程一次 DAG 執行。當所需的排程無法用一個單一的 cron 表示式表示時,這尤其有用。

from airflow.timetables.trigger import MultipleCronTriggerTimetable


# At 1:10 and 2:40 each day.
@dag(schedule=MultipleCronTriggerTimetable("10 1 * * *", "40 2 * * *", timezone="UTC"), ...)
def example_dag():
    pass

CronTriggerTimetable 相同的可選 interval 引數也可用。

from datetime import timedelta

from airflow.timetables.trigger import MultipleCronTriggerTimetable


@dag(
    schedule=MultipleCronTriggerTimetable(
        "10 1 * * *",
        "40 2 * * *",
        timezone="UTC",
        interval=timedelta(hours=1),
    ),
    ...,
)
def example_dag():
    pass

DeltaDataIntervalTimetable

以時間差排程資料間隔的時間表。您可以透過將 datetime.timedeltadateutil.relativedelta.relativedelta 提供給 DAG 的 schedule 引數來選擇它。

此時間表側重於資料間隔值,不一定將執行日期與任意邊界(例如,一天或一小時的開始)對齊。

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass

CronDataIntervalTimetable

接受 cron 表示式,根據每個 cron 觸發點之間的時間間隔建立資料間隔,並在每個資料間隔結束時觸發 DAG 執行的時間表。

DAGs 文件中所述,透過將有效的 cron 表示式作為字串提供給 DAG 的 schedule 引數來選擇此時間表。

@dag(schedule="0 1 * * 3")  # At 01:00 on Wednesday.
def example_dag():
    pass

EventsTimetable

傳遞一個 datetime 列表,讓 DAG 在這些時間點之後執行。這對於基於體育賽事、計劃的溝通活動以及其他任意、不規則但可預測的排程很有用。

事件列表必須是有限且合理大小的,因為它必須在每次解析 DAG 時載入。可選地,使用 restrict_to_events 標誌來強制手動執行 DAG,手動執行使用最近或最早事件的時間作為資料間隔。否則,手動執行的 data_interval_startdata_interval_end 等於手動執行開始的時間。您還可以使用 description 引數命名這組事件,該名稱將顯示在 Airflow UI 中。

from airflow.timetables.events import EventsTimetable


@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2022, 4, 5, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 17, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 22, 20, 50, tz="America/Chicago"),
        ],
        description="My Team's Baseball Games",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

基於資產事件和時間結合的排程

將條件資產表示式與基於時間的排程結合可以增強排程的靈活性。

AssetOrTimeSchedule 是一種專門的時間表,它允許基於時間排程和資產事件來排程 DAG。它還促進了按照傳統時間表建立計劃執行,以及獨立執行的資產觸發執行。

此功能在 DAG 需要在資產更新時執行並且還需要定期執行檢查或更新的場景中特別有用。它確保了工作流對資料更改保持響應,並始終執行定期檢查或更新。

以下是使用 AssetOrTimeSchedule 的 DAG 示例

from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), assets=(dag1_asset & dag2_asset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

時間表對比

“觸發器”和“資料間隔”時間表之間的區別

Airflow 為 cron 和 delta 排程提供了兩組時間表

是否關心 資料間隔

觸發器時間表包含資料間隔。這意味著 data_interval_startdata_interval_end 的值(以及舊版 execution_date)是相同的;即 DAG 執行被觸發的時間。

對於資料間隔時間表,data_interval_startdata_interval_end 的值(以及舊版 execution_date)是不同的。data_interval_start 是 DAG 執行被觸發的時間,而 data_interval_end 是間隔的結束時間。

補齊 行為

預設情況下,catchup 設定為 False。這可以防止在以下場景中執行不必要的 DAG: - 如果您建立了一個開始日期在過去的新的 DAG,並且不想執行過去的 DAG。如果 catchupTrue,Airflow 會執行在該時間間隔內本應執行的所有 DAG。 - 如果您暫停了現有的 DAG,然後在稍後的日期重新啟用它,catchupFalse 意味著 Airflow 不會執行在暫停期間本應執行的 DAG。

在這些場景中,run_id 中的 logical_date 基於時間表如何處理資料間隔。

您可以使用 Airflow 配置 [scheduler] catchup_by_default 更改預設的 catchup 行為。

有關使用 catchup 時如何觸發 DAG 執行的更多資訊,請參見補齊

DAG 執行被觸發的時間

觸發器和資料間隔時間表在同一時間觸發 DAG 執行。但是,run_id 的時間戳對於每種時間表都不同。這是因為 run_id 基於 logical_date

例如,假設有一個 cron 表示式 @daily0 0 * * *,計劃每天 12AM 執行。如果您在 1 月 31 日 3PM 啟用使用這兩種時間表的 DAG: - CronTriggerTimetable 在 2 月 1 日 12AM 建立一個新的 DAG 執行。run_id 時間戳是 2 月 1 日午夜。 - CronDataIntervalTimetable 立即建立一個新的 DAG 執行,因為從 1 月 31 日 12AM 開始的每日時間間隔的 DAG 執行尚未發生。run_id 時間戳是 1 月 31 日午夜,因為那是資料間隔的開始。

以下是另一個示例,顯示了跳過 DAG 執行情況下的區別

假設有兩個執行中的 DAG 使用 cron 表示式 @daily0 0 * * *,並使用兩種不同的時間表。如果您在 1 月 31 日 3PM 暫停 DAG,並在 2 月 2 日 3PM 重新啟用它們: - CronTriggerTimetable 會跳過本應在 2 月 1 日和 2 日觸發的 DAG 執行。下一次 DAG 執行將在 2 月 3 日 12AM 觸發。 - CronDataIntervalTimetable 只跳過本應在 2 月 1 日觸發的 DAG 執行。在您重新啟用 DAG 後,會立即觸發 2 月 2 日的 DAG 執行。

在這些示例中,您可以看到觸發器時間表如何更直觀地建立 DAG 執行,並且與人們期望工作流行為的方式相似,而資料間隔時間表則側重於它處理的資料間隔,不反映工作流自身的屬性。

Cron 和 Delta 資料間隔時間表之間的區別

選擇 DeltaDataIntervalTimetableCronDataIntervalTimetable 取決於您的用例。如果您在 2 月 1 日 01:05 啟用一個 DAG,下表總結了根據 3 個引數:schedulestart_datecatchup,建立的 DAG 執行以及它們涵蓋的資料間隔。

排程引數 (schedule)

開始日期 (start_date)

補齊 (catchup)

涵蓋的間隔

備註

*/30 * * * *

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

行為與使用 timedelta 物件相同。

*/30 * * * *

year-02-01

False

  • 00:30 - 01:00

*/30 * * * *

year-02-01 00:10

True

  • 00:30 - 01:00

間隔 00:00 - 00:30 不在開始日期之後,因此被跳過。

*/30 * * * *

year-02-01 00:10

False

  • 00:30 - 01:00

無論開始日期如何,資料間隔都與小時/天等邊界對齊。

datetime.timedelta(minutes=30)

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

行為與使用 cron 表示式相同。

datetime.timedelta(minutes=30)

year-02-01

False

  • 00:35 - 01:05

間隔不與開始日期對齊,而是與當前時間對齊。

datetime.timedelta(minutes=30)

year-02-01 00:10

True

  • 00:10 - 00:40

間隔與開始日期對齊。下一個將在 5 分鐘後觸發,涵蓋 00:40 - 01:10。

datetime.timedelta(minutes=30)

year-02-01 00:10

False

  • 00:35 - 01:05

間隔與當前時間對齊。下一次執行將在 30 分鐘後觸發。

本條目有幫助嗎?