Apache Kafka Operator

ConsumeFromTopicOperator

一個用於消費一個或多個 Kafka 主題的訊息並對其進行處理的 Operator。該 Operator 建立一個 Kafka Consumer,它從叢集中讀取一批訊息,並使用使用者提供的可呼叫函式 apply_function 對其進行處理。Consumer 將持續以批次方式讀取,直到達到日誌末尾或達到最大讀取訊息數量 (max_messages) 為止。

有關引數定義,請參閱 ConsumeFromTopicOperator

使用該 Operator

tests/system/apache/kafka/example_dag_hello_kafka.py

t2 = ConsumeFromTopicOperator(
    kafka_config_id="t2",
    task_id="consume_from_topic",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.consumer_function",
    apply_function_kwargs={"prefix": "consumed:::"},
    commit_cadence="end_of_batch",
    max_messages=10,
    max_batch_size=2,
)

參考

更多資訊請參閱 Apache Kafka Consumer 文件

ProduceToTopicOperator

一個用於向 Kafka 主題生產訊息的 Operator。該 Operator 將生產由使用者提供的 producer_function 函式建立的鍵/值對訊息。

有關引數定義,請參閱 ProduceToTopicOperator

使用該 Operator

tests/system/apache/kafka/example_dag_hello_kafka.py

t1 = ProduceToTopicOperator(
    kafka_config_id="t1-3",
    task_id="produce_to_topic",
    topic="test_1",
    producer_function="example_dag_hello_kafka.producer_function",
)

參考

更多資訊請參閱 Apache Kafka Producer 文件

這篇文件有幫助嗎?