Google Cloud PubSub Operator(運算子)¶
Google Cloud PubSub 是一項全託管的即時訊息服務,允許您在獨立應用程式之間傳送和接收訊息。您可以利用 Cloud Pub/Sub 的靈活性來解耦託管在 Google Cloud 或網際網路其他地方的系統和元件。
釋出者應用程式可以將訊息傳送到某個主題,而其他應用程式可以訂閱該主題來接收訊息。透過解耦傳送方和接收方,Google Cloud PubSub 允許開發者在獨立編寫的應用程式之間進行通訊。
先決條件任務¶
要使用這些 Operator(運算子),您必須執行以下幾項操作
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算功能,詳情請參閱 Google Cloud 文件。
啟用 API,詳情請參閱 Cloud Console 文件。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關 安裝 的詳細資訊。
建立 PubSub 主題¶
PubSub 主題是釋出者傳送訊息的命名資源。PubSubCreateTopicOperator operator(運算子)用於建立主題。
tests/system/google/cloud/pubsub/example_pubsub.py
create_topic = PubSubCreateTopicOperator(
task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
)
建立 PubSub 訂閱¶
訂閱(Subscription)是一個命名資源,表示來自單個特定主題的訊息流,將傳遞給訂閱應用程式。PubSubCreateSubscriptionOperator operator(運算子)用於建立訂閱。
tests/system/google/cloud/pubsub/example_pubsub.py
subscribe_task = PubSubCreateSubscriptionOperator(
task_id="subscribe_task", project_id=PROJECT_ID, topic=TOPIC_ID
)
釋出 PubSub 訊息¶
訊息(Message)是釋出者傳送到主題的資料和(可選)屬性的組合,最終會傳遞給訂閱者。PubSubPublishMessageOperator operator(運算子)用於釋出訊息。
tests/system/google/cloud/pubsub/example_pubsub.py
publish_task = PubSubPublishMessageOperator(
task_id="publish_task",
project_id=PROJECT_ID,
topic=TOPIC_ID,
messages=[MESSAGE, MESSAGE],
)
從 PubSub 訂閱拉取(Pull)訊息¶
PubSubPullSensor sensor(感測器)從 PubSub 訂閱中拉取訊息,並透過 XCom 傳遞它們。
tests/system/google/cloud/pubsub/example_pubsub.py
subscription = subscribe_task.output
pull_messages = PubSubPullSensor(
task_id="pull_messages",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
此外,對於此操作,您可以在可推遲模式(deferrable mode)下使用 sensor(感測器)。
tests/system/google/cloud/pubsub/example_pubsub_deferrable.py
pull_messages_async = PubSubPullSensor(
task_id="pull_messages_async",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
deferrable=True,
)
tests/system/google/cloud/pubsub/example_pubsub.py
pull_messages_operator = PubSubPullOperator(
task_id="pull_messages_operator",
ack_messages=True,
project_id=PROJECT_ID,
subscription=subscription,
)
要從 XCom 拉取訊息,請使用 BashOperator。
tests/system/google/cloud/pubsub/example_pubsub.py
echo_cmd = """
{% for m in task_instance.xcom_pull('pull_messages') %}
echo "AckID: {{ m.get('ackId') }}, Base64-Encoded: {{ m.get('message') }}"
{% endfor %}
"""
tests/system/google/cloud/pubsub/example_pubsub.py
pull_messages_result = BashOperator(task_id="pull_messages_result", bash_command=echo_cmd)
刪除 PubSub 訂閱¶
PubSubDeleteSubscriptionOperator operator(運算子)用於刪除訂閱。
tests/system/google/cloud/pubsub/example_pubsub.py
unsubscribe_task = PubSubDeleteSubscriptionOperator(
task_id="unsubscribe_task",
project_id=PROJECT_ID,
subscription=subscription,
)
刪除 PubSub 主題¶
PubSubDeleteTopicOperator operator(運算子)用於刪除主題。
tests/system/google/cloud/pubsub/example_pubsub.py
delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)
參考資料¶
更多資訊,請參閱