Apache Kafka 感測器

AwaitMessageSensor

一個感測器,它會推遲執行,直到特定的訊息被髮布到 Kafka 主題。該感測器將建立一個消費者來讀取 Kafka 主題中的訊息,直到找到滿足 apply_function 引數中定義的條件的訊息。如果 apply_function 返回任何資料,則會引發一個 TriggerEvent,並且 AwaitMessageSensor 成功完成。

有關引數定義,請參考 AwaitMessageSensor

使用感測器

tests/system/apache/kafka/example_dag_hello_kafka.py

t5 = AwaitMessageSensor(
    kafka_config_id="t5",
    task_id="awaiting_message",
    topics=["test_1"],
    apply_function="example_dag_hello_kafka.await_function",
    xcom_push_key="retrieved_message",
)

參考

更多資訊,請參閱 Apache Kafka Consumer 文件

AwaitMessageTriggerFunctionSensor

與上面的 AwaitMessageSensor 類似,該感測器將推遲執行,直到從 Kafka 主題中消費到滿足其 apply_function 條件的訊息。一旦遇到一個積極事件,AwaitMessageTriggerFunctionSensor 將觸發提供給 event_triggered_function 的可呼叫物件。之後,感測器將再次推遲,繼續消費訊息。

有關引數定義,請參考 AwaitMessageTriggerFunctionSensor

使用感測器

tests/system/apache/kafka/example_dag_event_listener.py

listen_for_message = AwaitMessageTriggerFunctionSensor(
    kafka_config_id="fizz_buzz_2",
    task_id="listen_for_message",
    topics=["fizz_buzz"],
    apply_function="example_dag_event_listener.await_function",
    event_triggered_function=wait_for_event,
)

參考

更多資訊,請參閱 Apache Kafka Consumer 文件

本條目有幫助嗎?