Google Cloud PubSub Operator(運算子)

Google Cloud PubSub 是一項全託管的即時訊息服務,允許您在獨立應用程式之間傳送和接收訊息。您可以利用 Cloud Pub/Sub 的靈活性來解耦託管在 Google Cloud 或網際網路其他地方的系統和元件。

釋出者應用程式可以將訊息傳送到某個主題,而其他應用程式可以訂閱該主題來接收訊息。透過解耦傳送方和接收方,Google Cloud PubSub 允許開發者在獨立編寫的應用程式之間進行通訊。

先決條件任務

要使用這些 Operator(運算子),您必須執行以下幾項操作

建立 PubSub 主題

PubSub 主題是釋出者傳送訊息的命名資源。PubSubCreateTopicOperator operator(運算子)用於建立主題。

tests/system/google/cloud/pubsub/example_pubsub.py

    create_topic = PubSubCreateTopicOperator(
        task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
    )

建立 PubSub 訂閱

訂閱(Subscription)是一個命名資源,表示來自單個特定主題的訊息流,將傳遞給訂閱應用程式。PubSubCreateSubscriptionOperator operator(運算子)用於建立訂閱。

tests/system/google/cloud/pubsub/example_pubsub.py

    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
    )

釋出 PubSub 訊息

訊息(Message)是釋出者傳送到主題的資料和(可選)屬性的組合,最終會傳遞給訂閱者。PubSubPublishMessageOperator operator(運算子)用於釋出訊息。

tests/system/google/cloud/pubsub/example_pubsub.py

    publish_task = PubSubPublishMessageOperator(
        task_id="publish_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        messages=[MESSAGE, MESSAGE],
    )

從 PubSub 訂閱拉取(Pull)訊息

PubSubPullSensor sensor(感測器)從 PubSub 訂閱中拉取訊息,並透過 XCom 傳遞它們。

tests/system/google/cloud/pubsub/example_pubsub.py

    subscription = subscribe_task.output

    pull_messages = PubSubPullSensor(
        task_id="pull_messages",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

此外,對於此操作,您可以在可推遲模式(deferrable mode)下使用 sensor(感測器)。

tests/system/google/cloud/pubsub/example_pubsub_deferrable.py

pull_messages_async = PubSubPullSensor(
    task_id="pull_messages_async",
    ack_messages=True,
    project_id=PROJECT_ID,
    subscription=subscription,
    deferrable=True,
)

tests/system/google/cloud/pubsub/example_pubsub.py


    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        ack_messages=True,
        project_id=PROJECT_ID,
        subscription=subscription,
    )

要從 XCom 拉取訊息,請使用 BashOperator

tests/system/google/cloud/pubsub/example_pubsub.py

echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
    echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""

tests/system/google/cloud/pubsub/example_pubsub.py

    pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)

刪除 PubSub 訂閱

PubSubDeleteSubscriptionOperator operator(運算子)用於刪除訂閱。

tests/system/google/cloud/pubsub/example_pubsub.py

    unsubscribe_task = PubSubDeleteSubscriptionOperator(
        task_id="unsubscribe_task",
        project_id=PROJECT_ID,
        subscription=subscription,
    )

刪除 PubSub 主題

PubSubDeleteTopicOperator operator(運算子)用於刪除主題。

tests/system/google/cloud/pubsub/example_pubsub.py

    delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)

參考資料

更多資訊,請參閱

此條目有幫助嗎?