Google Cloud Managed Service for Apache Kafka Operators

Google Cloud Managed Service for Apache Kafka helps you set up, secure, maintain, and scale Apache Kafka clusters.

Interacting with Apache Kafka Clusters

To create an Apache Kafka cluster, use ManagedKafkaCreateClusterOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

create_cluster = ManagedKafkaCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster=CLUSTER_CONF,
    cluster_id=CLUSTER_ID,
)
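The snippet above passes CLUSTER_CONF without showing its contents. The following is a minimal sketch of what such a dictionary might look like, assuming the field names of the Managed Kafka API Cluster resource (gcp_config, capacity_config). The project, region, and subnet values and the helper build_cluster_conf are hypothetical placeholders for illustration, not taken from the example file.

```python
# Hypothetical placeholder values; replace them with your own.
PROJECT_ID = "my-project"
LOCATION = "us-central1"

GIB = 1024**3


def build_cluster_conf(project: str, region: str, subnet: str = "default") -> dict:
    """Build a cluster definition dict (illustrative helper, not part of the provider)."""
    return {
        "gcp_config": {
            "access_config": {
                # Assumed subnet path format: projects/<p>/regions/<r>/subnetworks/<s>
                "network_configs": [
                    {"subnet": f"projects/{project}/regions/{region}/subnetworks/{subnet}"},
                ],
            },
        },
        "capacity_config": {
            "vcpu_count": 3,
            "memory_bytes": 3 * GIB,
        },
    }


CLUSTER_CONF = build_cluster_conf(PROJECT_ID, LOCATION)
```

The resulting dictionary can then be passed as the cluster argument of the create operator.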

To delete a cluster, use ManagedKafkaDeleteClusterOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

delete_cluster = ManagedKafkaDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a cluster, use ManagedKafkaGetClusterOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

get_cluster = ManagedKafkaGetClusterOperator(
    task_id="get_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

To get a list of clusters, use ManagedKafkaListClustersOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

list_clusters = ManagedKafkaListClustersOperator(
    task_id="list_clusters",
    project_id=PROJECT_ID,
    location=LOCATION,
)

To update a cluster, use ManagedKafkaUpdateClusterOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_cluster.py

update_cluster = ManagedKafkaUpdateClusterOperator(
    task_id="update_cluster",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    cluster=CLUSTER_TO_UPDATE,
    update_mask=CLUSTER_UPDATE_MASK,
)
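The update call takes both the new cluster values and an update mask that names which fields to change. As a rough sketch, assuming the mask uses dotted field paths in a {"paths": [...]} dictionary as in the snippets on this page, the values below are illustrative, and path_exists is a hypothetical helper that checks every masked path is actually present in the update payload before the DAG runs.

```python
def path_exists(resource: dict, dotted_path: str) -> bool:
    """Return True if every segment of dotted_path is a nested key in resource."""
    node = resource
    for part in dotted_path.split("."):
        if not isinstance(node, dict) or part not in node:
            return False
        node = node[part]
    return True


# Illustrative values (assumptions): raise memory to 4 GiB, keep the vCPU count.
CLUSTER_TO_UPDATE = {
    "capacity_config": {
        "vcpu_count": 3,
        "memory_bytes": 4 * 1024**3,
    },
}
CLUSTER_UPDATE_MASK = {"paths": ["capacity_config.memory_bytes"]}

# Fail early if the mask names a field the payload does not contain.
missing = [p for p in CLUSTER_UPDATE_MASK["paths"] if not path_exists(CLUSTER_TO_UPDATE, p)]
if missing:
    raise ValueError(f"update_mask paths missing from payload: {missing}")
```

Checking the mask at DAG-parse time is a design choice of this sketch; it catches a typo in a field path before any API call is made.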

Interacting with Apache Kafka Topics

To create an Apache Kafka topic, use ManagedKafkaCreateTopicOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

create_topic = ManagedKafkaCreateTopicOperator(
    task_id="create_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic=TOPIC_CONF,
)
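TOPIC_CONF is not shown in the snippet above. A minimal sketch follows, assuming the partition_count and replication_factor fields of the Managed Kafka API Topic resource; the values are illustrative, and check_topic_conf is a hypothetical helper rather than anything provided by Airflow.

```python
# Illustrative topic definition (values are assumptions).
TOPIC_CONF = {
    "partition_count": 3,
    "replication_factor": 3,
}


def check_topic_conf(conf: dict) -> dict:
    """Validate that both sizing fields are positive integers, then return conf."""
    for field in ("partition_count", "replication_factor"):
        value = conf.get(field)
        if not isinstance(value, int) or value < 1:
            raise ValueError(f"{field} must be a positive integer, got {value!r}")
    return conf


check_topic_conf(TOPIC_CONF)
```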

To delete a topic, use ManagedKafkaDeleteTopicOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

delete_topic = ManagedKafkaDeleteTopicOperator(
    task_id="delete_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a topic, use ManagedKafkaGetTopicOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

get_topic = ManagedKafkaGetTopicOperator(
    task_id="get_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
)

To get a list of topics, use ManagedKafkaListTopicsOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

list_topics = ManagedKafkaListTopicsOperator(
    task_id="list_topics",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

To update a topic, use ManagedKafkaUpdateTopicOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_topic.py

update_topic = ManagedKafkaUpdateTopicOperator(
    task_id="update_topic",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    topic_id=TOPIC_ID,
    topic=TOPIC_TO_UPDATE,
    update_mask=TOPIC_UPDATE_MASK,
)
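One way to keep TOPIC_TO_UPDATE and TOPIC_UPDATE_MASK consistent is to derive the mask from the difference between the current and desired settings. The sketch below assumes top-level fields only and illustrative values; CURRENT_TOPIC and changed_fields are hypothetical names introduced here. Note that Kafka lets a topic's partition count grow but not shrink, so the desired value is larger than the current one.

```python
# Illustrative current and desired topic settings (assumptions).
CURRENT_TOPIC = {"partition_count": 3, "replication_factor": 3}
TOPIC_TO_UPDATE = {"partition_count": 30, "replication_factor": 3}


def changed_fields(current: dict, desired: dict) -> list[str]:
    """Return the sorted top-level keys whose desired value differs from the current one."""
    return sorted(key for key, value in desired.items() if current.get(key) != value)


# Only the fields that actually differ end up in the mask.
TOPIC_UPDATE_MASK = {"paths": changed_fields(CURRENT_TOPIC, TOPIC_TO_UPDATE)}
```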

Interacting with Apache Kafka Consumer Groups

To delete a consumer group, use ManagedKafkaDeleteConsumerGroupOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

delete_consumer_group = ManagedKafkaDeleteConsumerGroupOperator(
    task_id="delete_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

To get a consumer group, use ManagedKafkaGetConsumerGroupOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

get_consumer_group = ManagedKafkaGetConsumerGroupOperator(
    task_id="get_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
)

To get a list of consumer groups, use ManagedKafkaListConsumerGroupsOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

list_consumer_groups = ManagedKafkaListConsumerGroupsOperator(
    task_id="list_consumer_groups",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
)

To update a consumer group, use ManagedKafkaUpdateConsumerGroupOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

update_consumer_group = ManagedKafkaUpdateConsumerGroupOperator(
    task_id="update_consumer_group",
    project_id=PROJECT_ID,
    location=LOCATION,
    cluster_id=CLUSTER_ID,
    consumer_group_id=CONSUMER_GROUP_ID,
    consumer_group={
        "topics": {},
    },
    update_mask={"paths": ["topics"]},
)

Using the Apache Kafka Provider with Google Cloud Managed Service for Apache Kafka

To produce data to a topic, use ProduceToTopicOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

produce_to_topic = ProduceToTopicOperator(
    task_id="produce_to_topic",
    kafka_config_id=CONNECTION_ID,
    topic=TOPIC_ID,
    producer_function=producer,
    poll_timeout=10,
)
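The producer callable referenced above is not shown. A minimal sketch follows, assuming the operator expects producer_function to yield (key, value) pairs; the message contents are purely illustrative.

```python
import json


def producer():
    """Yield ten (key, value) pairs, each JSON-encoded as a string."""
    for i in range(10):
        # Key is the counter, value is the counter plus one (illustrative payload).
        yield (json.dumps(i), json.dumps(i + 1))
```

Because the function is a plain generator, it can be exercised locally with list(producer()) before it is wired into a DAG.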

To consume data from a topic, use ConsumeFromTopicOperator.

tests/system/google/cloud/managed_kafka/example_managed_kafka_consumer_group.py

consume_from_topic = ConsumeFromTopicOperator(
    task_id="consume_from_topic",
    kafka_config_id=CONNECTION_ID,
    topics=[TOPIC_ID],
    apply_function=consumer,
    poll_timeout=20,
    max_messages=20,
    max_batch_size=20,
)
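The consumer callable passed as apply_function is likewise not shown. The sketch below assumes each message exposes key(), value(), topic(), and offset() accessors, as confluent-kafka Message objects do, and that the payloads are JSON. The decode helper and the logger name are hypothetical choices for illustration.

```python
import json
import logging

consumer_logger = logging.getLogger("airflow")


def decode(message):
    """Return the JSON-decoded (key, value) pair of a Kafka message."""
    return json.loads(message.key()), json.loads(message.value())


def consumer(message):
    """Log the topic, offset, and decoded contents of one consumed message."""
    key, value = decode(message)
    consumer_logger.info(
        "%s @ %s; %s : %s", message.topic(), message.offset(), key, value
    )
```

Keeping the decoding in its own function makes it easy to test with a small stand-in object, without a running Kafka cluster.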

Reference

For more information, see:
