Google Cloud 代管式 Apache Kafka 運算元¶
Google Cloud 代管式 Apache Kafka 可幫助您設定、保護、維護和擴充套件 Apache Kafka 叢集。
與 Apache Kafka 叢集互動¶
要建立 Apache Kafka 叢集,您可以使用 ManagedKafkaCreateClusterOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
create_cluster = ManagedKafkaCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster=CLUSTER_CONF,
cluster_id=CLUSTER_ID,
)
要刪除叢集,您可以使用 ManagedKafkaDeleteClusterOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
delete_cluster = ManagedKafkaDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
要獲取叢集,您可以使用 ManagedKafkaGetClusterOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
get_cluster = ManagedKafkaGetClusterOperator(
task_id="get_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
)
要獲取叢集列表,您可以使用 ManagedKafkaListClustersOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
list_clusters = ManagedKafkaListClustersOperator(
task_id="list_clusters",
project_id=PROJECT_ID,
location=LOCATION,
)
要更新叢集,您可以使用 ManagedKafkaUpdateClusterOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py
update_cluster = ManagedKafkaUpdateClusterOperator(
task_id="update_cluster",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
cluster=CLUSTER_TO_UPDATE,
update_mask=CLUSTER_UPDATE_MASK,
)
與 Apache Kafka 主題互動¶
要建立 Apache Kafka 主題,您可以使用 ManagedKafkaCreateTopicOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
create_topic = ManagedKafkaCreateTopicOperator(
task_id="create_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
topic=TOPIC_CONF,
)
要刪除主題,您可以使用 ManagedKafkaDeleteTopicOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
delete_topic = ManagedKafkaDeleteTopicOperator(
task_id="delete_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
要獲取主題,您可以使用 ManagedKafkaGetTopicOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
get_topic = ManagedKafkaGetTopicOperator(
task_id="get_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
)
要獲取主題列表,您可以使用 ManagedKafkaListTopicsOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
list_topics = ManagedKafkaListTopicsOperator(
task_id="list_topics",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
)
要更新主題,您可以使用 ManagedKafkaUpdateTopicOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py
update_topic = ManagedKafkaUpdateTopicOperator(
task_id="update_topic",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
topic_id=TOPIC_ID,
topic=TOPIC_TO_UPDATE,
update_mask=TOPIC_UPDATE_MASK,
)
與 Apache Kafka 消費者組互動¶
要刪除消費者組,您可以使用 ManagedKafkaDeleteConsumerGroupOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
delete_consumer_group = ManagedKafkaDeleteConsumerGroupOperator(
task_id="delete_consumer_group",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
consumer_group_id=CONSUMER_GROUP_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
要獲取消費者組,您可以使用 ManagedKafkaGetConsumerGroupOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
get_consumer_group = ManagedKafkaGetConsumerGroupOperator(
task_id="get_consumer_group",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
consumer_group_id=CONSUMER_GROUP_ID,
)
要獲取消費者組列表,您可以使用 ManagedKafkaListConsumerGroupsOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
list_consumer_groups = ManagedKafkaListConsumerGroupsOperator(
task_id="list_consumer_groups",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
)
要更新消費者組,您可以使用 ManagedKafkaUpdateConsumerGroupOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
update_consumer_group = ManagedKafkaUpdateConsumerGroupOperator(
task_id="update_consumer_group",
project_id=PROJECT_ID,
location=LOCATION,
cluster_id=CLUSTER_ID,
consumer_group_id=CONSUMER_GROUP_ID,
consumer_group={
"topics": {},
},
update_mask={"paths": ["topics"]},
)
將 Apache Kafka Provider 與 Google Cloud 代管式 Apache Kafka 一起使用¶
要向主題生產資料,您可以使用 ProduceToTopicOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
produce_to_topic = ProduceToTopicOperator(
task_id="produce_to_topic",
kafka_config_id=CONNECTION_ID,
topic=TOPIC_ID,
producer_function=producer,
poll_timeout=10,
)
要從主題消費資料,您可以使用 ConsumeFromTopicOperator。
tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py
consume_from_topic = ConsumeFromTopicOperator(
task_id="consume_from_topic",
kafka_config_id=CONNECTION_ID,
topics=[TOPIC_ID],
apply_function=consumer,
poll_timeout=20,
max_messages=20,
max_batch_size=20,
)
參考資料¶
更多資訊,請參閱