airflow.providers.apache.kafka.operators.consume

屬性

VALID_COMMIT_CADENCE

ConsumeFromTopicOperator

一個 operator,用於從 Kafka 主題(或多個主題)消費訊息並處理它們。

模組內容

airflow.providers.apache.kafka.operators.consume.VALID_COMMIT_CADENCE[原始碼]
class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[原始碼]

基類: airflow.models.BaseOperator

一個 operator,用於從 Kafka 主題(或多個主題)消費訊息並處理它們。

此 operator 建立一個 Kafka 消費者,該消費者從叢集讀取一批訊息,並使用使用者提供的可呼叫函式處理這些訊息。消費者將繼續批次讀取,直到到達日誌末尾或達到最大訊息數為止。

引數:
  • kafka_config_id (str) – 要使用的連線物件,預設為 "kafka_default"

  • topics (str | collections.abc.Sequence[str]) – 消費者應訂閱的主題列表或正則表示式模式。

  • apply_function (Callable[Ellipsis, Any] | str | None) – 應該應用於每次獲取一個訊息的函式。字串格式為執行該函式的 dag 檔名和函式名,用 . 分隔。

  • apply_function_batch (Callable[Ellipsis, Any] | str | None) – 應該應用於獲取的訊息批次的函式。不能與 apply_function 一起使用。旨在用於事務性工作負載,其中可能在對訊息進行操作之前或之後呼叫耗時任務。

  • apply_function_args (collections.abc.Sequence[Any] | None) – 應該應用於可呼叫函式的額外位置引數,預設為 None

  • apply_function_kwargs (dict[Any, Any] | None) – 應該應用於可呼叫函式的額外關鍵字引數,預設為 None

  • commit_cadence (str | None) – 消費者何時應提交 offset ("never", "end_of_batch","end_of_operator"),預設為 "end_of_operator";如果為 end_of_operator,則根據 max_messages 引數呼叫 commit()。Operator 在處理完 operator 中的最大訊息數量(透過 apply_function 方法處理)後進行提交。如果為 end_of_batch,則根據 max_batch_size 引數呼叫 commit()。Operator 在處理完每個批次的所有訊息(透過 apply_function 方法處理)後進行提交。如果為 never,則呼叫 close() 而不呼叫 commit() 方法。

  • max_messages (int | None) – operator 應從 Kafka 讀取的最大總訊息數,預設為 None 表示讀取到主題末尾。

  • max_batch_size (int) – 消費者 polling 時應讀取的最大訊息數,預設為 1000

  • poll_timeout (float) – Kafka 消費者在判斷沒有更多可用訊息之前應等待的時間,預設為 60

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:ConsumeFromTopicOperator

BLUE = '#ffefeb'[原始碼]
ui_color = '#ffefeb'[原始碼]
template_fields = ('topics', 'apply_function_args', 'apply_function_kwargs', 'kafka_config_id')[原始碼]
topics[原始碼]
apply_function = None[原始碼]
apply_function_batch = None[原始碼]
apply_function_args = ()[原始碼]
apply_function_kwargs[原始碼]
kafka_config_id = 'kafka_default'[原始碼]
commit_cadence = 'end_of_operator'[原始碼]
max_messages = True[原始碼]
max_batch_size = 1000[原始碼]
poll_timeout = 60[原始碼]
execute(context)[原始碼]

建立 operator 時需要派生此方法。

context 是用於渲染 jinja 模板的同一個字典。

請參考 get_template_context 獲取更多 context 資訊。

此條目是否有幫助?