Apache Kafka Operator¶
ConsumeFromTopicOperator¶
一個用於消費一個或多個 Kafka 主題的訊息並對其進行處理的 Operator。該 Operator 建立一個 Kafka Consumer,它從叢集中讀取一批訊息,並使用使用者提供的可呼叫函式 apply_function 對其進行處理。Consumer 將持續以批次方式讀取,直到達到日誌末尾或達到最大讀取訊息數量 (max_messages) 為止。
有關引數定義,請參閱 ConsumeFromTopicOperator。
使用該 Operator¶
tests/system/apache/kafka/example_dag_hello_kafka.py
t2 = ConsumeFromTopicOperator(
kafka_config_id="t2",
task_id="consume_from_topic",
topics=["test_1"],
apply_function="example_dag_hello_kafka.consumer_function",
apply_function_kwargs={"prefix": "consumed:::"},
commit_cadence="end_of_batch",
max_messages=10,
max_batch_size=2,
)
參考¶
更多資訊請參閱 Apache Kafka Consumer 文件。
ProduceToTopicOperator¶
一個用於向 Kafka 主題生產訊息的 Operator。該 Operator 將生產由使用者提供的 producer_function 函式建立的鍵/值對訊息。
有關引數定義,請參閱 ProduceToTopicOperator。
使用該 Operator¶
tests/system/apache/kafka/example_dag_hello_kafka.py
t1 = ProduceToTopicOperator(
kafka_config_id="t1-3",
task_id="produce_to_topic",
topic="test_1",
producer_function="example_dag_hello_kafka.producer_function",
)
參考¶
更多資訊請參閱 Apache Kafka Producer 文件。