事件驅動排程¶
版本 3.0 中新增。
Apache Airflow 支援事件驅動排程,使得 DAG 可以根據外部事件觸發,而不是預定義的時間表。這在現代資料架構中特別有用,因為工作流需要對即時資料變化、訊息或系統訊號做出反應。
透過使用資產(assets),如 Asset 感知排程 中所述,你可以配置 DAG 在特定外部事件發生時開始執行。資產提供了一種機制來建立外部事件和 DAG 執行的依賴關係,確保工作流能夠動態地響應外部環境的變化。
AssetWatcher 類在此機制中起著關鍵作用。它監控外部事件源(例如訊息佇列),並在相關事件發生時觸發資產更新。在 Asset 定義中的 watchers 引數允許你將多個 AssetWatcher 例項與一個資產關聯,使其能夠響應各種事件源。
請參閱 common.messaging provider 文件 瞭解更多資訊和示例。
事件驅動排程支援的 trigger¶
並非 Airflow 中的所有 trigger 都可用於事件驅動排程。與繼承自 BaseTrigger 的所有 trigger 不同,只有繼承自 BaseEventTrigger 的一部分 trigger 是相容的。此限制的原因是某些 trigger 並非為事件驅動排程而設計,使用它們來排程 DAG 可能會導致意外結果。
BaseEventTrigger 確保用於排程的 trigger 遵循事件驅動正規化,適當地響應外部事件變化而不會導致意外的 DAG 行為。
編寫與事件驅動相容的 trigger¶
要使 trigger 與事件驅動排程相容,它必須繼承自 BaseEventTrigger。在這種情況下,使用 trigger 主要有三種場景:
1. 建立新的事件驅動 trigger:如果你需要針對不受支援的事件源建立新的 trigger,則應建立繼承自 BaseEventTrigger 的新類並實現其邏輯。
2. 調整現有的相容 trigger:如果現有的 trigger(繼承自 BaseTrigger)已被證明與事件驅動排程相容,那麼你只需將其基類從 BaseTrigger 更改為 BaseEventTrigger。
3. 調整現有的不相容 trigger:如果現有的 trigger 似乎與事件驅動排程不相容,那麼必須建立一個新的 trigger。這個新的 trigger 必須繼承 BaseEventTrigger 並確保它能正確地與事件驅動排程配合使用。如果兩個 trigger 共享一些公共程式碼,它也可以繼承現有的 trigger。
避免無限排程¶
某些 trigger 與事件驅動排程不相容的原因是它們正在等待外部資源達到給定狀態。例如:
等待儲存服務中檔案存在
等待作業處於成功狀態
等待資料庫中存在行
在這種條件下進行排程可能導致無限次排程。這是因為一旦條件變為真,它很可能在很長時間內保持真。
例如,考慮一個 DAG,它被排程在特定作業達到“成功”狀態時執行。一旦作業成功,它通常會保持該狀態。因此,每當 triggerer 檢查條件時,該 DAG 都會被重複觸發。
另一個例子是 S3KeyTrigger,它檢查 S3 儲存桶中是否存在特定檔案。檔案建立後,由於條件“檔案 X 是否存在於儲存桶 Y 中”保持為真,trigger 在每次檢查時都會繼續成功。這導致每次 trigger 機制執行時,DAG 都會被無限期地觸發。
建立自定義 trigger 時,請謹慎使用一旦滿足就永久保持為真的條件。這可能會無意中導致無限次的 DAG 執行並使你的系統過載。
事件驅動 DAG 的使用案例¶
資料攝入管道:當新資料到達儲存系統時觸發 ETL 工作流。
機器學習工作流:在新資料集可用時啟動模型訓練。
物聯網 (IoT) 和即時分析:即時響應感測器資料、日誌或應用程式事件。
微服務和事件驅動架構:基於服務間訊息編排工作流。