airflow.providers.google.cloud.hooks.pubsub

此模組包含一個 Google Pub/Sub Hook。

異常

PubSubException

Exception 的別名。

PubSubHook

用於訪問 Google Pub/Sub 的 Hook。

PubSubAsyncHook

獲取 Google Cloud PubSub 非同步 Hook 的類。

模組內容

exception airflow.providers.google.cloud.hooks.pubsub.PubSubException[source]

基類: Exception

Exception 的別名。

class airflow.providers.google.cloud.hooks.pubsub.PubSubHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, enable_message_ordering=False, **kwargs)[source]

基類: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

用於訪問 Google Pub/Sub 的 Hook。

操作所應用的 Google Cloud 專案由 gcp_conn_id 引用的連線中嵌入的專案確定。

enable_message_ordering = False[source]
get_conn()[source]

檢索 Google Cloud Pub/Sub 的連線。

返回:

Google Cloud Pub/Sub 客戶端物件。

返回型別:

google.cloud.pubsub_v1.PublisherClient

property subscriber_client: google.cloud.pubsub_v1.SubscriberClient[source]

建立 SubscriberClient。

返回:

Google Cloud Pub/Sub 客戶端物件。

返回型別:

google.cloud.pubsub_v1.SubscriberClient

publish(topic, messages, project_id=PROVIDE_PROJECT_ID)[source]

釋出訊息到 Pub/Sub 主題。

引數:
  • topic (str) – 要釋出到的 Pub/Sub 主題;不要包含 projects/{project}/topics/ 字首。

  • messages (list[dict]) – 要釋出的訊息;如果訊息中的 data 欄位已設定,它應為位元組串(utf-8 編碼)https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage

  • project_id (str) – 可選,要在其中釋出的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

create_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=())[source]

建立一個 Pub/Sub 主題,如果它尚不存在。

引數:
  • topic (str) – 要建立的 Pub/Sub 主題名稱;不要包含 projects/{project}/topics/ 字首。

  • project_id (str) – 可選,要在其中建立主題的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • fail_if_exists (bool) – 如果設定,則在主題已存在時引發異常

  • labels (dict[str, str] | None) – 客戶端分配的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels

  • message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 限制釋出到主題的訊息可能儲存在哪些 Google Cloud 區域的策略。如果不存在,則沒有限制生效。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]

  • kms_key_name (str | None) – 用於保護對此主題上釋出的訊息訪問的 Cloud KMS CryptoKey 的資源名稱。預期格式為 projects/*/locations/*/keyRings/*/cryptoKeys/*

  • schema_settings (dict | google.cloud.pubsub_v1.types.SchemaSettings) – (可選)用於針對現有模式驗證釋出的訊息的設定。預期格式為 projects/*/schemas/*

  • message_retention_duration (str | None) – (可選)指示訊息釋出到主題後保留的最短時長。預期格式是秒數,最多包含九位小數,以“s”結尾。示例:“3.5s”。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,請求將不會重試。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

delete_topic(topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[source]

刪除一個 Pub/Sub 主題,如果它存在。

引數:
  • topic (str) – 要刪除的 Pub/Sub 主題名稱;不要包含 projects/{project}/topics/ 字首。

  • project_id (str) – 可選,要在其中刪除主題的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • fail_if_not_exists (bool) – 如果設定,則在主題不存在時引發異常

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,請求將不會重試。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

create_subscription(topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=())[source]

建立一個 Pub/Sub 訂閱,如果它尚不存在。

引數:
  • topic (str) – 訂閱將繫結的 Pub/Sub 主題名稱;不要包含 projects/{project}/subscriptions/ 字首。

  • project_id (str) – 可選,訂閱將繫結的主題所在的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • subscription (str | None) – Pub/Sub 訂閱名稱。如果為空,將使用 uuid 模組生成一個隨機名稱

  • subscription_project_id (str | None) – 將在其中建立訂閱的 Google Cloud 專案 ID。如果未指定,將使用 project_id

  • ack_deadline_secs (int) – 訂閱者確認從訂閱中拉取的每條訊息所需的秒數。

  • fail_if_exists (bool) – 如果設定,則在主題已存在時引發異常

  • push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此訂閱使用推送交付,此欄位用於配置它。空的 pushConfig 表示訂閱者將使用 API 方法拉取和確認訊息。

  • retain_acked_messages (bool | None) – 指示是否保留已確認的訊息。如果為 true,則訊息不會從訂閱的積壓中刪除,即使已確認,直到它們超出 message_retention_duration 視窗。如果您想 Seek 到時間戳,則此值必須為 true。

  • message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 從訊息釋出之時起,未確認訊息在訂閱積壓中保留多長時間。如果 retain_acked_messages 為 true,則這也配置了已確認訊息的保留時間,從而配置了可以回溯到多久之前的 Seek 操作。預設為 7 天。不能超過 7 天或少於 10 分鐘。

  • labels (dict[str, str] | None) – 客戶端分配的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels

  • enable_message_ordering (bool) – 如果為 true,則在 PubsubMessage 中使用相同 ordering_key 釋出的訊息將按照 Pub/Sub 系統接收到的順序傳遞給訂閱者。否則,它們可能以任何順序傳遞。

  • expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 指定此訂閱過期條件的策略。只要任何已連線的訂閱者成功消費來自訂閱的訊息或正在對訂閱執行操作,訂閱就被視為處於活動狀態。如果未設定 expiration_policy,將使用預設的 ttl 為 31 天的策略。expiration_policy.ttl 的最小允許值為 1 天。

  • filter – 使用 Cloud Pub/Sub 過濾語言編寫的表示式。如果非空,則只有其 attributes 欄位與過濾器匹配的 PubsubMessages 才會在此訂閱上交付。如果為空,則不過濾任何訊息。

  • dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 指定此訂閱中死信訊息條件的策略。如果未設定 dead_letter_policy,則停用死信功能。

  • retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 指定 Pub/Sub 如何為此訂閱重試訊息交付的策略。如果未設定,則應用預設重試策略。這通常意味著對於健康的訂閱者,訊息將盡快重試。RetryPolicy 將在對給定訊息進行 NACK 或超出確認截止時間事件時觸發。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,請求將不會重試。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

返回:

訂閱名稱,如果未提供 subscription 引數,則為系統生成的值

返回型別:

str

delete_subscription(subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, retry=DEFAULT, timeout=None, metadata=())[source]

刪除一個 Pub/Sub 訂閱,如果它存在。

引數:
  • subscription (str) – 要刪除的 Pub/Sub 訂閱名稱;不要包含 projects/{project}/subscriptions/ 字首。

  • project_id (str) – 可選,訂閱所在的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • fail_if_not_exists (bool) – 如果設定,則在主題不存在時引發異常

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,請求將不會重試。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[source]

從 Pub/Sub 訂閱中最多拉取 max_messages 條訊息。

引數:
  • subscription (str) – 要從中拉取的 Pub/Sub 訂閱名稱;不要包含 'projects/{project}/topics/' 字首。

  • max_messages (int) – 從 Pub/Sub API 返回的最大訊息數量。

  • project_id (str) – 可選,訂閱所在的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • return_immediately (bool) – 如果設定,Pub/Sub API 將在沒有可用訊息時立即返回。否則,請求將阻塞一段未公開但有限的時間。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,請求將不會重試。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

返回:

一個 Pub/Sub ReceivedMessage 物件列表,每個物件包含一個 ackId 屬性和一個 message 屬性,message 屬性包含 base64 編碼的訊息內容。請參閱 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage

返回型別:

list[google.cloud.pubsub_v1.types.ReceivedMessage]

acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[source]

確認與 Pub/Sub 訂閱中的 ack_ids 關聯的訊息。

引數:
  • subscription (str) – 要刪除的 Pub/Sub 訂閱名稱;不要包含 'projects/{project}/topics/' 字首。

  • ack_ids (list[str] | None) – 來自先前拉取響應的 ReceivedMessage ackIds 列表。與 messages 引數互斥。

  • messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要確認的 ReceivedMessage 物件列表。與 ack_ids 引數互斥。

  • project_id (str) – 可選,要在其中建立主題的 Google Cloud 專案名稱或 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,請求將不會重試。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

class airflow.providers.google.cloud.hooks.pubsub.PubSubAsyncHook(project_id=PROVIDE_PROJECT_ID, **kwargs)[source]

基類: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

獲取 Google Cloud PubSub 非同步 Hook 的類。

sync_hook_class[source]
project_id = None[source]
async acknowledge(subscription, project_id, ack_ids=None, messages=None, retry=DEFAULT, timeout=None, metadata=())[source]

確認與 Pub/Sub 訂閱中的 ack_ids 關聯的訊息。

引數:
  • subscription (str) – 要刪除的 Pub/Sub 訂閱名稱;不要包含 'projects/{project}/topics/' 字首。

  • ack_ids (list[str] | None) – 來自先前拉取響應的 ReceivedMessage ackIds 列表。與 messages 引數互斥。

  • messages (list[google.cloud.pubsub_v1.types.ReceivedMessage] | None) – 要確認的 ReceivedMessage 物件列表。與 ack_ids 引數互斥。

  • project_id (str) – 可選,要在其中建立主題的 Google Cloud 專案名稱或 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (可選) 用於重試請求的重試物件。如果指定為 None,則不會重試請求。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

async pull(subscription, max_messages, project_id=PROVIDE_PROJECT_ID, return_immediately=False, retry=DEFAULT, timeout=None, metadata=())[source]

從 Pub/Sub 訂閱中最多拉取 max_messages 條訊息。

引數:
  • subscription (str) – 要從中拉取的 Pub/Sub 訂閱名稱;不要包含 'projects/{project}/topics/' 字首。

  • max_messages (int) – 從 Pub/Sub API 返回的最大訊息數量。

  • project_id (str) – 可選,訂閱所在的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • return_immediately (bool) – 如果設定,Pub/Sub API 將在沒有可用訊息時立即返回。否則,請求將阻塞一段未公開但有限的時間。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – (可選) 用於重試請求的重試物件。如果指定為 None,則不會重試請求。

  • timeout (float | None) – (可選)等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則 timeout 適用於每次嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。

返回:

一個 Pub/Sub ReceivedMessage 物件列表,每個物件包含一個 ackId 屬性和一個 message 屬性,message 屬性包含 base64 編碼的訊息內容。請參閱 https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#google.pubsub.v1.ReceivedMessage

返回型別:

list[google.cloud.pubsub_v1.types.ReceivedMessage]

此條目有幫助嗎?