airflow.providers.apache.kafka.operators.consume¶
屬性¶
類¶
一個 operator,用於從 Kafka 主題(或多個主題)消費訊息並處理它們。 |
模組內容¶
- class airflow.providers.apache.kafka.operators.consume.ConsumeFromTopicOperator(topics, kafka_config_id='kafka_default', apply_function=None, apply_function_batch=None, apply_function_args=None, apply_function_kwargs=None, commit_cadence='end_of_operator', max_messages=None, max_batch_size=1000, poll_timeout=60, **kwargs)[原始碼]¶
基類:
airflow.models.BaseOperator一個 operator,用於從 Kafka 主題(或多個主題)消費訊息並處理它們。
此 operator 建立一個 Kafka 消費者,該消費者從叢集讀取一批訊息,並使用使用者提供的可呼叫函式處理這些訊息。消費者將繼續批次讀取,直到到達日誌末尾或達到最大訊息數為止。
- 引數:
kafka_config_id (str) – 要使用的連線物件,預設為 "kafka_default"
topics (str | collections.abc.Sequence[str]) – 消費者應訂閱的主題列表或正則表示式模式。
apply_function (Callable[Ellipsis, Any] | str | None) – 應該應用於每次獲取一個訊息的函式。字串格式為執行該函式的 dag 檔名和函式名,用 . 分隔。
apply_function_batch (Callable[Ellipsis, Any] | str | None) – 應該應用於獲取的訊息批次的函式。不能與 apply_function 一起使用。旨在用於事務性工作負載,其中可能在對訊息進行操作之前或之後呼叫耗時任務。
apply_function_args (collections.abc.Sequence[Any] | None) – 應該應用於可呼叫函式的額外位置引數,預設為 None
apply_function_kwargs (dict[Any, Any] | None) – 應該應用於可呼叫函式的額外關鍵字引數,預設為 None
commit_cadence (str | None) – 消費者何時應提交 offset ("never", "end_of_batch","end_of_operator"),預設為 "end_of_operator";如果為 end_of_operator,則根據 max_messages 引數呼叫 commit()。Operator 在處理完 operator 中的最大訊息數量(透過 apply_function 方法處理)後進行提交。如果為 end_of_batch,則根據 max_batch_size 引數呼叫 commit()。Operator 在處理完每個批次的所有訊息(透過 apply_function 方法處理)後進行提交。如果為 never,則呼叫 close() 而不呼叫 commit() 方法。
max_messages (int | None) – operator 應從 Kafka 讀取的最大總訊息數,預設為 None 表示讀取到主題末尾。
max_batch_size (int) – 消費者 polling 時應讀取的最大訊息數,預設為 1000
poll_timeout (float) – Kafka 消費者在判斷沒有更多可用訊息之前應等待的時間,預設為 60
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:ConsumeFromTopicOperator