2025 年 Airflow 峰會將於 10 月 07-09 日舉行。立即註冊以獲取早鳥票!

airflow.triggers.base

屬性

log

DiscrimatedTriggerEvent

StartTriggerArgs

從觸發器啟動任務執行所需的引數。

BaseTrigger

所有觸發器的基類。

BaseEventTrigger

用於根據外部事件排程 DAG 的觸發器的基類。

TriggerEvent

當條件滿足時,觸發器可以觸發的事件。

TaskSuccessEvent

生成此事件以使任務成功結束。

TaskFailedEvent

生成此事件以使任務失敗結束。

TaskSkippedEvent

生成此事件以使任務以 '跳過' 狀態結束。

函式

trigger_event_discriminator(v)

模組內容

airflow.triggers.base.log[source]
class airflow.triggers.base.StartTriggerArgs[source]

從觸發器啟動任務執行所需的引數。

trigger_cls: str[source]
next_method: str[source]
trigger_kwargs: dict[str, Any] | None = None[source]
next_kwargs: dict[str, Any] | None = None[source]
timeout: datetime.timedelta | None = None[source]
class airflow.triggers.base.BaseTrigger(**kwargs)[source]

基類: abc.ABC, airflow.utils.log.logging_mixin.LoggingMixin

所有觸發器的基類。

觸發器可以存在於兩種上下文中

  • 在 Operator 內部,當它被傳遞給 TaskDeferred 時

  • 在觸發器工作程序中主動執行

我們在兩種情況下使用相同的類,並依賴所有 Trigger 類能夠返回(可以使用 Airflow-JSON 編碼的)引數,這些引數將允許它們在其他地方被重新例項化。

task_instance = None[source]
trigger_id = None[source]
abstract serialize()[source]

返回重新構造此 Trigger 所需的資訊。

返回:

元組 (類路徑, 重新例項化所需的關鍵字引數)。

返回型別:

tuple[str, dict[str, Any]]

abstract run()[source]
非同步:

在非同步上下文中執行觸發器。

觸發器應該在需要觸發事件時 yield 一個 Event,並在完成時返回 None。單事件觸發器應該 yield 然後立即返回。

如果它 yield,它很可能會很快恢復,但也有可能不會(例如,如果工作負載被移動到另一個觸發器程序,或者多事件觸發器被用於單事件任務延遲)。

在任何一種情況下,Trigger 類都應假定它們會被持久化,然後在不再需要時依賴於 cleanup() 方法被呼叫。

async cleanup()[source]

清理觸發器。

當不再需要觸發器並將其從活動觸發器程序中移除時呼叫此方法。

此方法遵循 async/await 模式,允許在觸發器主事件迴圈中執行清理。清理方法引發的異常會被忽略,因此如果您想除錯它們並被通知清理方法失敗,您應該用 try/except 塊包裹您的程式碼並適當處理(以非同步相容的方式)。

static repr(classpath, kwargs)[source]
__repr__()[source]
class airflow.triggers.base.BaseEventTrigger(**kwargs)[source]

基類: BaseTrigger

用於根據外部事件排程 DAG 的觸發器的基類。

BaseEventTriggerBaseTrigger 的子類,用於標識與事件驅動排程相容的觸發器。

static hash(classpath, kwargs)[source]

返回觸發器類路徑和 kwargs 的雜湊值。這用於唯一標識一個觸發器。

我們不希望將此邏輯放在 BaseTrigger 中,因為在用於延遲任務時,兩個觸發器可以具有相同的類路徑和 kwargs。這對於事件驅動排程來說是不正確的。

class airflow.triggers.base.TriggerEvent(payload, **kwargs)[source]

基類: pydantic.BaseModel

當條件滿足時,觸發器可以觸發的事件。

事件必須具有一個唯一標識值,該值在無論觸發器在何處執行都應相同;這是為了確保如果同一觸發器在兩個位置執行(出於高可用性原因),我們可以對其事件進行去重。

payload: Any = None[source]

要傳送回任務的事件負載。

必須是原生 JSON 可序列化的,或在 airflow 序列化程式碼中註冊。

__repr__()[source]
class airflow.triggers.base.TaskSuccessEvent(*, xcoms=None, **kwargs)[source]

基類: BaseTaskEndEvent

生成此事件以使任務成功結束。

task_instance_state: airflow.utils.state.TaskInstanceState[source]
class airflow.triggers.base.TaskFailedEvent(*, xcoms=None, **kwargs)[source]

基類: BaseTaskEndEvent

生成此事件以使任務失敗結束。

task_instance_state: airflow.utils.state.TaskInstanceState[source]
class airflow.triggers.base.TaskSkippedEvent(*, xcoms=None, **kwargs)[source]

基類: BaseTaskEndEvent

生成此事件以使任務以 '跳過' 狀態結束。

task_instance_state: airflow.utils.state.TaskInstanceState[source]
airflow.triggers.base.trigger_event_discriminator(v)[source]
airflow.triggers.base.DiscrimatedTriggerEvent[source]

此條目有幫助嗎?