Google Cloud Stackdriver 運算子

先決條件任務

要使用這些運算子,您必須完成以下幾項操作:

StackdriverListAlertPoliciesOperator

使用 StackdriverListAlertPoliciesOperator 來獲取由給定過濾器標識的所有提醒政策。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

list_alert_policies = StackdriverListAlertPoliciesOperator(
    task_id="list-alert-policies",
)

StackdriverEnableAlertPoliciesOperator

使用 StackdriverEnableAlertPoliciesOperator 來啟用由給定過濾器標識的提醒政策。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

enable_alert_policy = StackdriverEnableAlertPoliciesOperator(
    task_id="enable-alert-policies",
    filter_=f'(displayName="{ALERT_1_NAME}" OR displayName="{ALERT_2_NAME}")',
)

StackdriverDisableAlertPoliciesOperator

使用 StackdriverDisableAlertPoliciesOperator 來停用由給定過濾器標識的提醒政策。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

disable_alert_policy = StackdriverDisableAlertPoliciesOperator(
    task_id="disable-alert-policies",
    filter_=f'displayName="{ALERT_1_NAME}"',
)

StackdriverUpsertAlertOperator

使用 StackdriverUpsertAlertOperator 來 upsert (更新或插入) 由給定 JSON 字串過濾器標識的提醒政策。如果具有給定名稱的提醒已存在,則該運算子會更新現有政策;否則,會建立一個新的政策。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

create_alert_policy = StackdriverUpsertAlertOperator(
    task_id="create-alert-policies",
    alerts=json.dumps({"policies": [TEST_ALERT_POLICY_1, TEST_ALERT_POLICY_2]}),
)

StackdriverDeleteAlertOperator

使用 StackdriverDeleteAlertOperator 來刪除由給定名稱標識的提醒政策。

使用運算子

要刪除的提醒的名稱應採用以下格式提供:projects/<PROJECT_NAME>/alertPolicies/<ALERT_NAME>

tests/system/google/cloud/stackdriver/example_stackdriver.py

delete_alert_policy = StackdriverDeleteAlertOperator(
    task_id="delete-alert-policy",
    name="{{ task_instance.xcom_pull('list-alert-policies')[0]['name'] }}",
)

StackdriverListNotificationChannelsOperator

使用 StackdriverListNotificationChannelsOperator 來獲取由給定過濾器標識的所有 Notification Channels。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

list_notification_channel = StackdriverListNotificationChannelsOperator(
    task_id="list-notification-channel", filter_='type="pubsub"'
)

StackdriverEnableNotificationChannelsOperator

使用 StackdriverEnableNotificationChannelsOperator 來啟用由給定過濾器標識的 Notification Channels。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

enable_notification_channel = StackdriverEnableNotificationChannelsOperator(
    task_id="enable-notification-channel", filter_='type="pubsub"'
)

StackdriverDisableNotificationChannelsOperator

使用 StackdriverDisableNotificationChannelsOperator 來停用由給定過濾器標識的 Notification Channels。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverUpsertNotificationChannelOperator

使用 StackdriverUpsertNotificationChannelOperator 來 upsert (更新或插入) 由給定頻道 JSON 字串標識的 Notification Channels。如果具有給定名稱的頻道已存在,則該運算子會更新現有頻道;否則,會建立一個新的頻道。

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

disable_notification_channel = StackdriverDisableNotificationChannelsOperator(
    task_id="disable-notification-channel", filter_=f'displayName="{CHANNEL_1_NAME}"'
)

StackdriverDeleteNotificationChannelOperator

要刪除的頻道的名稱應採用以下格式提供:projects/<PROJECT_NAME>/notificationChannels/<CHANNEL_NAME>

使用運算子

您可以使用此運算子(帶或不帶專案 ID)來獲取所有提醒政策。如果缺少專案 ID,則將從使用的 Google Cloud 連線中檢索。

tests/system/google/cloud/stackdriver/example_stackdriver.py

delete_notification_channel = StackdriverDeleteNotificationChannelOperator(
    task_id="delete-notification-channel",
    name="{{ task_instance.xcom_pull('list-notification-channel')[0]['name'] }}",
)

本條目有幫助嗎?