Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊享早鳥票優惠!

可延遲的運算子與觸發器

標準的 Operator(運算子)Sensor(感測器) 在整個執行期間都會佔用一個完整的 worker slot(工作節點插槽),即使它們處於空閒狀態。例如,如果您只有 100 個 worker slot 可用來執行任務,並且有 100 個 DAG 正在等待一個當前正在執行但空閒的 Sensor,那麼您就 無法執行任何其他任務 - 即使您的整個 Airflow 叢集實際上是空閒的。Sensor 的 reschedule 模式在一定程度上解決了這個問題,它允許 Sensor 只在固定時間間隔執行,但這不夠靈活,並且只允許使用時間作為恢復的原因,而不是其他條件。

這就是 *Deferrable Operator*(可延遲運算子)的作用。當一個 Operator 除了等待別無他事時,它可以透過 *延遲* 來掛起自身,釋放 worker 給其他程序使用。當 Operator 延遲時,執行會轉移到 Triggerer(觸發器),在那裡將執行由 Operator 指定的 Trigger。Trigger 可以執行 Operator 所需的輪詢或等待操作。然後,當 Trigger 完成輪詢或等待後,它會發送訊號通知 Operator 恢復執行。在延遲執行階段,由於工作已解除安裝到 Triggerer,任務不再佔用 worker slot,您擁有更多的空閒工作負載能力。預設情況下,處於延遲狀態的任務不佔用 pool slots。如果您希望它們佔用,可以透過編輯相關 pool 來更改此設定。

*Trigger*(觸發器)是小型、非同步的 Python 程式碼段,設計用於在單個 Python 程序中執行。由於它們是非同步的,因此可以在 Airflow 元件 *triggerer* 中高效地共存。

該過程的工作概述

  • 任務例項(執行中的 Operator)到達需要等待其他操作或條件的點,並使用繫結到恢復事件的 Trigger 延遲自身。這會釋放 worker 以執行其他任務。

  • 新的 Trigger 例項由 Airflow 註冊,並由 Triggerer 程序拾取。

  • Trigger 執行直到觸發,此時其源任務將由 Scheduler(排程器)重新排程。

  • Scheduler 將任務排隊以在 worker 節點上恢復。

作為 DAG 作者,您可以使用預先編寫的可延遲 Operator,也可以編寫自己的 Operator。然而,編寫它們需要滿足某些設計標準。

使用可延遲的運算子

如果您想使用 Airflow 自帶的預先編寫的可延遲 Operator,例如 TimeSensorAsync,則只需完成兩個步驟

  • 確保您的 Airflow 安裝執行至少一個 triggerer 程序以及正常的 scheduler

  • 在您的 DAG 中使用可延遲的 Operator/Sensor。

Airflow 會自動為您處理和實現延遲過程。

如果您正在升級現有 DAG 以使用可延遲 Operator,Airflow 包含與 API 相容的 Sensor 變體,例如 TimeSensorAsync 對應 TimeSensor。將這些變體新增到您的 DAG 中即可使用可延遲 Operator,無需其他更改。

請注意,您不能在自定義 PythonOperator 或 TaskFlow Python 函式內部使用延遲能力。延遲功能僅適用於傳統的、基於類的 Operator。

編寫可延遲的運算子

編寫可延遲 Operator 時,主要需要考慮以下幾點

  • 您的 Operator 必須使用 Trigger 延遲自身。您可以使用 Airflow 核心中包含的 Trigger,或者編寫一個自定義 Trigger。

  • 您的 Operator 在延遲期間將停止並在其 worker 中移除,並且不會自動保留任何狀態。您可以透過指示 Airflow 在特定方法恢復 Operator 或透過傳遞特定的 kwargs 來保留狀態。

  • 您可以多次延遲,也可以在 Operator 執行重要工作之前或之後延遲。或者,您可以在滿足特定條件時延遲。例如,如果系統沒有立即的響應。延遲完全由您控制。

  • 任何 Operator 都可以延遲;不需要在其類上進行特殊標記,並且不限於 Sensor。

  • 如果您想新增一個同時支援可延遲和不可延遲模式的 Operator 或 Sensor,建議將 deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False) 新增到 Operator 的 __init__ 方法中,並用它來決定是否在可延遲模式下執行 Operator。您可以透過 operator 部分的 default_deferrable 配置所有支援在可延遲和不可延遲模式之間切換的 Operator 和 Sensor 的 deferrable 預設值。這是一個支援兩種模式的 Sensor 示例。

import time
from datetime import timedelta
from typing import Any

from airflow.configuration import conf
from airflow.sdk import BaseSensorOperator
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    def __init__(
        self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.deferrable = deferrable

    def execute(self, context: Context) -> None:
        if self.deferrable:
            self.defer(
                trigger=TimeDeltaTrigger(timedelta(hours=1)),
                method_name="execute_complete",
            )
        else:
            time.sleep(3600)

    def execute_complete(
        self,
        context: Context,
        event: dict[str, Any] | None = None,
    ) -> None:
        # We have no more work to do here. Mark as complete.
        return

編寫觸發器

一個 *Trigger*(觸發器)被編寫為一個繼承自 BaseTrigger 的類,並實現三個方法

  • __init__: 用於接收例項化它的 Operator 傳遞的引數的方法。從 2.10.0 版本開始,我們可以直接從預定義的 Trigger 開始任務執行。為了利用此功能,__init__ 中的所有引數必須是可序列化的。

  • run: 一個非同步方法,用於執行其邏輯並生成(yield)一個或多個 TriggerEvent 例項作為非同步生成器。

  • serialize: 返回重建此 Trigger 所需的資訊,作為類路徑和傳遞給 __init__ 的關鍵字引數組成的元組。

此示例展示了一個基本 Trigger 的結構,它是 Airflow DateTimeTrigger 的非常簡化版本

import asyncio

from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.utils import timezone


class DateTimeTrigger(BaseTrigger):
    def __init__(self, moment):
        super().__init__()
        self.moment = moment

    def serialize(self):
        return ("airflow.providers.standard.triggers.temporal.DateTimeTrigger", {"moment": self.moment})

    async def run(self):
        while self.moment > timezone.utcnow():
            await asyncio.sleep(1)
        yield TriggerEvent(self.moment)

程式碼示例展示了幾件事

  • __init__serialize 是成對編寫的。Trigger 在由 Operator 作為其延遲請求的一部分提交時例項化一次,然後序列化並在執行該 Trigger 的任何 Triggerer 程序上重新例項化。

  • run 方法宣告為 async def,因為它 *必須* 是非同步的,並且使用 asyncio.sleep 而不是常規的 time.sleep(因為後者會阻塞程序)。

  • 當它發出事件時,它會包含 self.moment,這樣如果此 Trigger 在多個主機上冗餘執行,則可以對事件進行去重。

Trigger 可以根據您的需求複雜或簡單,前提是它們符合設計約束。它們可以以高可用方式執行,並自動分配到執行 Triggerer 的主機上。我們鼓勵您避免在 Trigger 中使用任何型別的持久狀態。Trigger 應從其 __init__ 中獲取所需的一切,以便它們可以自由序列化和移動。

如果您剛開始編寫非同步 Python,在編寫您的 run() 方法時要非常小心。Python 的非同步模型意味著如果程式碼在執行阻塞操作時沒有正確地 await,它可能會阻塞整個程序。Airflow 會嘗試檢測阻塞程序的程式碼,並在 Triggerer 日誌中發出警告。您可以在編寫 Trigger 時透過設定環境變數 PYTHONASYNCIODEBUG=1 來啟用 Python 的額外檢查,以確保您編寫的是非阻塞程式碼。在進行檔案系統呼叫時要特別小心,因為如果底層檔案系統是網路支援的,它可能會阻塞。

編寫自定義 Trigger 時需要注意一些設計約束

  • run 方法 *必須是非同步的*(使用 Python 的 asyncio),並在執行阻塞操作時正確地 await

  • run 必須 yield 其 TriggerEvent,而不是 return 它們。如果在至少生成一個事件之前返回,Airflow 將認為這是一個錯誤,並使任何等待它的任務例項失敗。如果它丟擲異常,Airflow 也會使任何依賴的任務例項失敗。

  • 您應該假定 Trigger 例項可以執行 *不止一次*。如果發生網路分割槽,Airflow 在分離的機器上重新啟動 Trigger,可能會發生這種情況。因此,您必須注意副作用。例如,您可能不想使用 Trigger 來插入資料庫行。

  • 如果您的 Trigger 設計為發出多個事件(目前不支援),那麼每個發出的事件 *必須* 包含一個 payload,如果在多個地方執行 Trigger,該 payload 可用於去重事件。如果您只觸發一個事件並且不需要將資訊傳回 Operator,則可以將 payload 設定為 None

  • Trigger 可以突然從一個 Triggerer 服務中移除,並在新的服務上啟動。例如,如果子網發生變化導致網路分割槽,或者進行部署時。如果需要,您可以實現 cleanup 方法,無論 Trigger 正常退出還是發生其他情況,此方法都會在 run 之後被呼叫。

  • 為了使 Trigger 的任何更改生效,*triggerer* 需要在 Trigger 修改後重新啟動。

  • 您的 Trigger 不能來自 DAG bundle - sys.path 上的其他任何位置都可以。Triggerer 在執行 Trigger 時不會初始化任何 bundle。

注意

目前 Trigger 僅在其第一個事件觸發後使用,因為它們僅用於恢復延遲的任務,並且任務在第一個事件觸發後恢復。然而,Airflow 計劃未來允許從 Trigger 啟動 DAG,屆時多事件 Trigger 將更有用。

觸發器中的敏感資訊

從 Airflow 2.9.0 開始,Trigger 的 kwargs 在儲存到資料庫之前會被序列化和加密。這意味著您傳遞給 Trigger 的任何敏感資訊都將以加密形式儲存在資料庫中,並在從資料庫讀取時解密。

觸發延遲

如果您想在 Operator 的任何位置觸發延遲,可以呼叫 self.defer(trigger, method_name, kwargs, timeout)。這會為 Airflow 引發一個特殊異常。引數如下

  • trigger: 您希望延遲到的 Trigger 例項。它將被序列化到資料庫中。

  • method_name: 您希望 Airflow 在恢復時呼叫的 Operator 方法名稱。

  • kwargs:(可選)呼叫方法時傳遞的額外關鍵字引數。預設為 {}

  • timeout:(可選)一個 timedelta,指定此延遲將失敗並導致任務例項失敗的超時時間。預設為 None,表示沒有超時。

以下是 Sensor 如何觸發延遲的基本示例

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.providers.standard.triggers.temporal import TimeDeltaTrigger

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    def execute(self, context: Context) -> None:
        self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete")

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

當您選擇延遲時,您的 Operator 將在該點停止執行,並從其當前 worker 中移除。不會保留任何狀態,例如區域性變數或在 self 上設定的屬性。當您的 Operator 恢復時,它將作為 Operator 的新例項恢復。將狀態從 Operator 的舊例項傳遞到新例項的唯一方法是使用 method_namekwargs

當您的 Operator 恢復時,Airflow 會將 context 物件和 event 物件新增到傳遞給 method_name 方法的 kwargs 中。這個 event 物件包含觸發您的 Operator 恢復的 Trigger 事件的 payload。根據 Trigger 的不同,這可能對您的 Operator 有用,例如狀態碼或用於獲取結果的 URL。或者,它可能是無關緊要的資訊,例如日期時間。然而,您的 method_name 方法 *必須* 接受 contextevent 作為關鍵字引數。

如果您的 Operator 從其首次新的 execute() 方法或由 method_name 指定的後續方法返回,它將被視為已完成並結束執行。

讓我們更深入地瞭解上面的 WaitOneHourSensor 示例。這個 Sensor 只是 Trigger 的一個簡單包裝。它會延遲到 Trigger,並指定當 Trigger 觸發時返回到的不同方法。當它立即返回時,它會標記該 Sensor 為成功。

self.defer 呼叫會丟擲 TaskDeferred 異常,因此它可以在 Operator 程式碼中的任何位置工作,即使巢狀在 execute() 方法深處。您也可以手動丟擲 TaskDeferred,它使用與 self.defer 相同的引數。

Operator 的 execution_timeout 是根據 *總執行時間* 確定的,而不是延遲之間的單個執行時間。這意味著如果設定了 execution_timeout,Operator 可能會在延遲期間或延遲後執行期間失敗,即使它只恢復了幾秒鐘。

多次延遲

想象一個場景:您希望 Operator 迭代一個長度可變的列表項,並延遲處理每個項。

例如,向資料庫提交多個查詢,或處理多個檔案。

如果您希望 Operator 只有一個入口點,可以將 method_name 設定為 execute,但它也必須接受 event 作為可選關鍵字引數。

以下是實現此功能的概要。

import asyncio

from airflow.sdk import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class MyItemTrigger(BaseTrigger):
    def __init__(self, item):
        super().__init__()
        self.item = item

    def serialize(self):
        return (self.__class__.__module__ + "." + self.__class__.__name__, {"item": self.item})

    async def run(self):
        result = None
        try:
            # Somehow process the item to calculate the result
            ...
            yield TriggerEvent({"result": result})
        except Exception as e:
            yield TriggerEvent({"error": str(e)})


class MyItemsOperator(BaseOperator):
    def __init__(self, items, **kwargs):
        super().__init__(**kwargs)
        self.items = items

    def execute(self, context, current_item_index=0, event=None):
        last_result = None
        if event is not None:
            # execute method was deferred
            if "error" in event:
                raise Exception(event["error"])
            last_result = event["result"]
            current_item_index += 1

        try:
            current_item = self.items[current_item_index]
        except IndexError:
            return last_result

        self.defer(
            trigger=MyItemTrigger(item),
            method_name="execute",  # The trigger will call this same method again
            kwargs={"current_item_index": current_item_index},
        )

從任務開始時觸發延遲

在 2.10.0 版本中新增。

如果您想將任務直接延遲到 Triggerer 而不進入 worker,可以將類級別屬性 start_from_trigger 設定為 True,併為您的可延遲 Operator 新增一個帶有 StartTriggerArgs 物件的類級別屬性 start_trigger_args,該物件包含以下 4 個屬性

  • trigger_cls: 您的 Trigger 類可匯入路徑。

  • trigger_kwargs: 初始化 trigger_cls 時要傳遞的關鍵字引數。**請注意,所有引數都需要可由 Airflow 序列化。這是此功能的主要限制。**

  • next_method: 您希望 Airflow 在恢復時呼叫的 Operator 方法名稱。

  • next_kwargs: 呼叫 next_method 時傳遞的額外關鍵字引數。

  • timeout:(可選)一個 timedelta,指定此延遲將失敗並導致任務例項失敗的超時時間。預設為 None,表示沒有超時。

在 Sensor 部分,我們需要提供 TimeDeltaTrigger 的路徑作為 trigger_cls

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitOneHourSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

start_from_triggertrigger_kwargs 也可以在例項級別修改,以實現更靈活的配置。

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitHoursSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def __init__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
        super().__init__(*args, **kwargs)
        self.start_trigger_args.trigger_kwargs = {"hours": 2}
        self.start_from_trigger = True

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

對映任務的初始化階段發生在 Scheduler 將它們提交給 Executor 之後。因此,此功能提供的動態任務對映支援有限,其用法也與標準實踐不同。要啟用動態任務對映支援,您需要在 __init__ 方法中定義 start_from_triggertrigger_kwargs。**請注意,您不需要同時定義這兩個引數才能使用此功能,但需要使用完全相同的引數名稱。** 例如,如果您將引數定義為 t_kwargs 並將此值賦給 self.start_trigger_args.trigger_kwargs,它將不會產生任何效果。當對映 start_from_trigger 設定為 True 的任務時,整個 __init__ 方法將被跳過。Scheduler 將使用 partialexpand 中提供的 start_from_triggertrigger_kwargs(如果未提供,則回退到類屬性中的值)來確定是否以及如何將任務提交給 Executor 或 Triggerer。請注意,在此階段不會解析 XCom 值。

Trigger 執行完成後,任務可能會被髮送回 worker 執行 next_method,或者任務例項可能直接結束。(參考 從觸發器退出延遲任務)如果任務被髮送回 worker,__init__ 方法中的引數在 next_method 執行之前仍然會生效,但它們不會影響 Trigger 的執行。

from __future__ import annotations

from datetime import timedelta
from typing import TYPE_CHECKING, Any

from airflow.sdk import BaseSensorOperator
from airflow.triggers.base import StartTriggerArgs

if TYPE_CHECKING:
    from airflow.utils.context import Context


class WaitHoursSensor(BaseSensorOperator):
    start_trigger_args = StartTriggerArgs(
        trigger_cls="airflow.providers.standard.triggers.temporal.TimeDeltaTrigger",
        trigger_kwargs={"moment": timedelta(hours=1)},
        next_method="execute_complete",
        next_kwargs=None,
        timeout=None,
    )
    start_from_trigger = True

    def __init__(
        self,
        *args: list[Any],
        trigger_kwargs: dict[str, Any] | None,
        start_from_trigger: bool,
        **kwargs: dict[str, Any],
    ) -> None:
        # This whole method will be skipped during dynamic task mapping.

        super().__init__(*args, **kwargs)
        self.start_trigger_args.trigger_kwargs = trigger_kwargs
        self.start_from_trigger = start_from_trigger

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        # We have no more work to do here. Mark as complete.
        return

這將擴充套件為 2 個任務,其“hours”引數分別設定為 1 和 2。

WaitHoursSensor.partial(task_id="wait_for_n_hours", start_from_trigger=True).expand(
    trigger_kwargs=[{"hours": 1}, {"hours": 2}]
)

從觸發器退出延遲任務

在 2.10.0 版本中新增。

如果您想直接從 Triggerer 退出任務而無需進入 worker,可以為您的可延遲 Operator 指定例項級別屬性 end_from_trigger,並附帶 Operator 的屬性(如上所述)。這可以節省啟動新 worker 所需的一些資源。

Trigger 可以有兩個選項:要麼將執行傳送回 worker,要麼直接結束任務例項。如果 Trigger 本身結束任務例項,則 method_name 無關緊要,可以為 None。否則,提供任務恢復執行時應使用的 method_name

class WaitFiveHourSensorAsync(BaseSensorOperator):
    # this sensor always exits from trigger.
    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self.end_from_trigger = True

    def execute(self, context: Context) -> NoReturn:
        self.defer(
            method_name=None,
            trigger=WaitFiveHourTrigger(duration=timedelta(hours=5), end_from_trigger=self.end_from_trigger),
        )

TaskSuccessEventTaskFailureEvent 是可用於直接結束任務例項的兩個事件。這將任務標記為 task_instance_state 狀態,並在適用時可選地推送 xcom。以下是使用這些事件的示例

class WaitFiveHourTrigger(BaseTrigger):
    def __init__(self, duration: timedelta, *, end_from_trigger: bool = False):
        super().__init__()
        self.duration = duration
        self.end_from_trigger = end_from_trigger

    def serialize(self) -> tuple[str, dict[str, Any]]:
        return (
            "your_module.WaitFiveHourTrigger",
            {"duration": self.duration, "end_from_trigger": self.end_from_trigger},
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        await asyncio.sleep(self.duration.total_seconds())
        if self.end_from_trigger:
            yield TaskSuccessEvent()
        else:
            yield TriggerEvent({"duration": self.duration})

在上述示例中,如果 end_from_trigger 設定為 True,Trigger 將透過 yield TaskSuccessEvent 直接結束任務例項。否則,它將使用 Operator 中指定的方法恢復任務例項。

注意

從 Trigger 退出僅在可延遲 Operator 未整合監聽器時有效。目前,當可延遲 Operator 將 end_from_trigger 屬性設定為 True 並整合監聽器時,它會在解析期間引發異常以指示此限制。編寫自定義 Trigger 時,請確保如果從外掛添加了監聽器,則 Trigger 未設定為直接結束任務例項。如果 Trigger 的作者將 end_from_trigger 屬性更改為其他屬性,DAG 解析不會引發任何異常,並且依賴於此任務的監聽器將不起作用。此限制將在未來版本中解決。

高可用性

Trigger 被設計為在高可用性(HA)架構中工作。如果您想執行高可用性設定,請在多個主機上執行多個 triggerer 副本。就像 scheduler 一樣,它們透過正確的鎖定和 HA 自動共存。

根據 Trigger 執行的工作量,單個 triggerer 主機可以容納數百到數萬個 Trigger。預設情況下,每個 triggerer 具有 1000 個 Trigger 的容量,可以嘗試同時執行。您可以使用 --capacity 引數更改可同時執行的 Trigger 數量。如果您嘗試執行的 Trigger 數量超過所有 triggerer 程序的總容量,部分 Trigger 將延遲執行,直到其他 Trigger 完成。

Airflow 嘗試僅在一個地方同時執行 Trigger,並維護與當前執行的所有 triggerer 的心跳。如果 triggerer 死亡或與其執行 Airflow 資料庫的網路分割槽,Airflow 會自動重新排程在該主機上的 Trigger 在其他地方執行。Airflow 會等待 (2.1 * triggerer.job_heartbeat_sec) 秒,等待機器重新出現,然後才重新排程 Trigger。

這意味著 Trigger 可能(但不常見)同時在多個地方執行。然而,此行為已設計到 Trigger 契約中,並且是預期行為。Airflow 會對 Trigger 同時在多個地方執行時觸發的事件進行去重,因此此過程對您的 Operator 是透明的。

請注意,您執行的每個額外 triggerer 都會導致與您的資料庫建立一個額外的持久連線。

感測器中 Mode=’reschedule’ 與 Deferrable=True 的區別

在 Airflow 中,Sensor 會等待特定條件滿足後才繼續執行下游任務。Sensor 有兩種管理空閒期的方式:mode='reschedule'deferrable=True。由於 mode='reschedule' 是 Airflow 中 BaseSensorOperator 特有的引數,它允許 Sensor 在條件不滿足時重新排程自身。deferrable=True 是某些 Operator 用來指示任務可以稍後重試(或延遲)的約定,但它不是 Airflow 內建的引數或模式。延遲任務的實際行為取決於具體的 Operator 實現。

mode=’reschedule’

deferrable=True

持續重新排程自身,直到條件滿足

空閒時暫停執行,條件變化時恢復

資源使用較高(重複執行)

資源使用較低(空閒時暫停,釋放 worker slot)

預期條件會隨時間變化(例如檔案建立)

等待外部事件或資源(例如 API 響應)

內建的重新排程功能

需要自定義邏輯來延遲任務和處理外部變化

此內容有幫助嗎?