Apache Kafka 感測器¶
AwaitMessageSensor¶
一個感測器,它會推遲執行,直到特定的訊息被髮布到 Kafka 主題。該感測器將建立一個消費者來讀取 Kafka 主題中的訊息,直到找到滿足 apply_function 引數中定義的條件的訊息。如果 apply_function 返回任何資料,則會引發一個 TriggerEvent,並且 AwaitMessageSensor 成功完成。
有關引數定義,請參考 AwaitMessageSensor。
使用感測器¶
tests/system/apache/kafka/example_dag_hello_kafka.py
t5 = AwaitMessageSensor(
kafka_config_id="t5",
task_id="awaiting_message",
topics=["test_1"],
apply_function="example_dag_hello_kafka.await_function",
xcom_push_key="retrieved_message",
)
參考¶
更多資訊,請參閱 Apache Kafka Consumer 文件。
AwaitMessageTriggerFunctionSensor¶
與上面的 AwaitMessageSensor 類似,該感測器將推遲執行,直到從 Kafka 主題中消費到滿足其 apply_function 條件的訊息。一旦遇到一個積極事件,AwaitMessageTriggerFunctionSensor 將觸發提供給 event_triggered_function 的可呼叫物件。之後,感測器將再次推遲,繼續消費訊息。
有關引數定義,請參考 AwaitMessageTriggerFunctionSensor。
使用感測器¶
tests/system/apache/kafka/example_dag_event_listener.py
listen_for_message = AwaitMessageTriggerFunctionSensor(
kafka_config_id="fizz_buzz_2",
task_id="listen_for_message",
topics=["fizz_buzz"],
apply_function="example_dag_event_listener.await_function",
event_triggered_function=wait_for_event,
)
參考¶
更多資訊,請參閱 Apache Kafka Consumer 文件。