airflow.providers.apache.kafka.triggers.await_message

AwaitMessageTrigger

一個 Trigger,等待符合特定條件的 Kafka 訊息到達。

模組內容

class airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger(topics, apply_function, kafka_config_id='kafka_default', apply_function_args=None, apply_function_kwargs=None, poll_timeout=1, poll_interval=5)[source]

基類: airflow.triggers.base.BaseTrigger

一個 Trigger,等待符合特定條件的 Kafka 訊息到達。

此 Trigger 的消費者行為如下: - 輪詢 Kafka 主題以獲取訊息,如果未返回訊息則休眠 - 使用提供的可呼叫物件處理訊息並提交訊息偏移量

  • 如果可呼叫物件返回任何資料,則引發帶有返回資料的 TriggerEvent

  • 否則繼續處理下一條訊息

引數:
  • kafka_config_id (str) – 要使用的連線物件,預設為 “kafka_default”

  • topics (collections.abc.Sequence[str]) – 應該搜尋訊息的主題(或主題正則表示式)

  • apply_function (str) – 應用於訊息以確定匹配條件的函式位置。(以 Python 點符號表示為字串)

  • apply_function_args (collections.abc.Sequence[Any] | None) – 應用於可呼叫物件的一組引數,預設為 None

  • apply_function_kwargs (dict[Any, Any] | None) – 應用於可呼叫物件的一組關鍵字引數,預設為 None,預設為 None

  • poll_timeout (float) – Kafka 客戶端在從 Kafka 的 poll 請求返回之前應等待多久(秒),預設為 1

  • poll_interval (float) – Trigger 在到達 Kafka 日誌末尾後應休眠多久(秒),預設為 5

topics[source]
apply_function[source]
apply_function_args = ()[source]
apply_function_kwargs[source]
kafka_config_id = 'kafka_default'[source]
poll_timeout = 1[source]
poll_interval = 5[source]
serialize()[source]

返回重建此 Trigger 所需的資訊。

返回:

元組 (類路徑,重新例項化所需的關鍵字引數)。

返回型別:

tuple[str, dict[str, Any]]

async run()[source]

在非同步上下文中執行 Trigger。

Trigger 應在需要觸發事件時 yield 一個 Event,並在完成時返回 None。因此,單事件 Trigger 應該 yield 後立即返回。

如果它 yield,很可能會很快恢復,但也可能不會(例如,如果工作負載正在轉移到另一個 triggerer 程序,或者多事件 trigger 用於單事件任務延遲)。

在任何情況下,Trigger 類都應假定它們會被持久化,並在不再需要時依賴 cleanup() 方法被呼叫。

本條目是否有幫助?