airflow.providers.google.cloud.operators.pubsub¶
此模組包含 Google PubSub 運算子。
類¶
建立 PubSub 主題。 |
|
建立 PubSub 訂閱。 |
|
刪除 PubSub 主題。 |
|
刪除 PubSub 訂閱。 |
|
向 PubSub 主題釋出訊息。 |
|
從 PubSub 訂閱拉取訊息並透過 XCom 傳遞。 |
模組內容¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_exists=False, gcp_conn_id='google_cloud_default', labels=None, message_storage_policy=None, kms_key_name=None, schema_settings=None, message_retention_duration=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[原始碼]¶
基類:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator建立 PubSub 主題。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 建立 PubSub 主題
預設情況下,如果主題已存在,此運算子不會導致 DAG 失敗。
with DAG("successful DAG") as dag: create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic_again = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic >> create_topic_again
可以將此運算子配置為在主題已存在時失敗。
with DAG("failing DAG") as dag: create_topic = PubSubCreateTopicOperator(project_id="my-project", topic="my_new_topic") create_topic_again = PubSubCreateTopicOperator( project_id="my-project", topic="my_new_topic", fail_if_exists=True ) create_topic >> create_topic_again
project_id和topic均支援模板化,因此您可以在其值中使用 Jinja 模板。- 引數:
project_id (str) – 可選,將建立主題的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
topic (str) – 要建立的主題。不要包含完整的主題路徑。換句話說,不要使用
projects/{project}/topics/{topic},只需提供{topic}。(支援模板化)gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
labels (dict[str, str] | None) – 客戶端分配的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels
message_storage_policy (dict | google.cloud.pubsub_v1.types.MessageStoragePolicy) – 限制釋出到主題的訊息可以儲存在哪些 Google Cloud 區域的策略。如果不存在,則沒有約束生效。Union[dict, google.cloud.pubsub_v1.types.MessageStoragePolicy]
kms_key_name (str | None) – 用於保護對釋出到此主題的訊息的訪問的 Cloud KMS CryptoKey 的資源名稱。預期格式為
projects/*/locations/*/keyRings/*/cryptoKeys/*。retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,則不會重試請求。
timeout (float | None) – (可選)等待請求完成的時間(秒)。請注意,如果指定了 retry,則超時應用於每次單獨的嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選)要使用短期憑據模擬的可選服務賬號,或是獲取列表中最後一個賬號(將在請求中被模擬)的 access_token 所需的賬號鏈式列表。如果設定為字串,則該賬號必須向原始賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號則向原始賬號授予此角色。(支援模板化)
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[原始碼]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubCreateSubscriptionOperator(*, topic, project_id=PROVIDE_PROJECT_ID, subscription=None, subscription_project_id=None, ack_deadline_secs=10, fail_if_exists=False, gcp_conn_id='google_cloud_default', push_config=None, retain_acked_messages=None, message_retention_duration=None, labels=None, enable_message_ordering=False, expiration_policy=None, filter_=None, dead_letter_policy=None, retry_policy=None, retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[原始碼]¶
基類:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator建立 PubSub 訂閱。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 建立 PubSub 訂閱
預設情況下,訂閱將在
project_id中建立。如果指定了subscription_project_id並且 Google Cloud 憑據允許,則可以在與其主題不同的專案中建立訂閱。預設情況下,如果訂閱已存在,此運算子不會導致 DAG 失敗。但是,主題必須存在於專案中。
with DAG("successful DAG") as dag: create_subscription = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription_again = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription >> create_subscription_again
可以將此運算子配置為在訂閱已存在時失敗。
with DAG("failing DAG") as dag: create_subscription = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription" ) create_subscription_again = PubSubCreateSubscriptionOperator( project_id="my-project", topic="my-topic", subscription="my-subscription", fail_if_exists=True ) create_subscription >> create_subscription_again
最後,subscription 不是必需的。如果未傳遞,運算子將使用 uuid 模組為訂閱的名稱生成一個通用唯一識別符號。
with DAG("DAG") as dag: PubSubCreateSubscriptionOperator(project_id="my-project", topic="my-topic")
project_id、topic、subscription、subscription_project_id和impersonation_chain均支援模板化,因此您可以在其值中使用 Jinja 模板。- 引數:
project_id (str) – 可選,主題所在的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
topic (str) – 要建立的主題。不要包含完整的主題路徑。換句話說,不要使用
projects/{project}/topics/{topic},只需提供{topic}。(支援模板化)subscription (str | None) – Pub/Sub 訂閱名稱。如果為空,將使用 uuid 模組生成一個隨機名稱
subscription_project_id (str | None) – 將建立訂閱的 Google Cloud 專案 ID。如果為空,將使用
topic_project。ack_deadline_secs (int) – 訂閱者確認從訂閱拉取的每條訊息的時間(秒)
gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
push_config (dict | google.cloud.pubsub_v1.types.PushConfig | None) – 如果此訂閱使用推送交付(push delivery),則此欄位用於配置。空的
pushConfig表示訂閱者將使用 API 方法拉取和確認訊息。retain_acked_messages (bool | None) – 指示是否保留已確認的訊息。如果為 true,即使訊息已被確認,它們也不會從訂閱的積壓佇列中清除,直到它們超出
message_retention_duration視窗。如果您想 Seek 到某個時間戳,則此值必須為 true。message_retention_duration (dict | google.cloud.pubsub_v1.types.Duration | None) – 從訊息釋出時刻起,在訂閱的積壓佇列中保留未確認訊息的時間。如果
retain_acked_messages為 true,則這也配置了已確認訊息的保留時間,因此配置了可以回溯進行Seek的時間長度。預設為 7 天。不能超過 7 天或少於 10 分鐘。labels (dict[str, str] | None) – 客戶端分配的標籤;請參閱 https://cloud.google.com/pubsub/docs/labels
enable_message_ordering (bool) – 如果為 true,則使用相同 ordering_key 釋出在 PubsubMessage 中的訊息將按照 Pub/Sub 系統接收它們的順序傳遞給訂閱者。否則,它們可能以任何順序傳遞。
expiration_policy (dict | google.cloud.pubsub_v1.types.ExpirationPolicy | None) – 指定此訂閱過期條件的策略。只要任何連線的訂閱者成功使用訂閱中的訊息或對訂閱執行操作,訂閱就被視為活動狀態。如果未設定 expiration_policy,則將使用 ttl 為 31 天的預設策略。expiration_policy.ttl 允許的最小值是 1 天。
filter – 使用 Cloud Pub/Sub 過濾語言編寫的表示式。如果非空,則只有 attributes 欄位與過濾器匹配的 PubsubMessages 會傳遞到此訂閱。如果為空,則不過濾任何訊息。
dead_letter_policy (dict | google.cloud.pubsub_v1.types.DeadLetterPolicy | None) – 指定此訂閱中死信訊息條件的策略。如果未設定 dead_letter_policy,則停用死信功能。
retry_policy (dict | google.cloud.pubsub_v1.types.RetryPolicy | None) – 指定 Pub/Sub 如何為此訂閱重試訊息傳遞的策略。如果未設定,則應用預設重試策略。這通常意味著對於健康的訂閱者,訊息將盡快重試。RetryPolicy 將在給定訊息的 NACKs 或確認截止日期超時事件時觸發。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,則不會重試請求。
timeout (float | None) – (可選)等待請求完成的時間(秒)。請注意,如果指定了 retry,則超時應用於每次單獨的嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選)要使用短期憑據模擬的可選服務賬號,或是獲取列表中最後一個賬號(將在請求中被模擬)的 access_token 所需的賬號鏈式列表。如果設定為字串,則該賬號必須向原始賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號則向原始賬號授予此角色。(支援模板化)
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'subscription', 'subscription_project_id', 'impersonation_chain')[原始碼]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteTopicOperator(*, topic, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基類:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator刪除 PubSub 主題。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 刪除 Pub/Sub 主題
預設情況下,如果主題不存在,此運算子不會導致 DAG 失敗。
with DAG("successful DAG") as dag: PubSubDeleteTopicOperator(project_id="my-project", topic="non_existing_topic")
可以配置此運算子,使其在主題不存在時失敗。
with DAG("failing DAG") as dag: PubSubDeleteTopicOperator( project_id="my-project", topic="non_existing_topic", fail_if_not_exists=True, )
project_id和topic均支援模板化,因此您可以在其值中使用 Jinja 模板。- 引數:
project_id (str) – 可選,進行操作的 Google Cloud 專案 ID(支援模板化)。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
topic (str) – 要刪除的主題。不要包含完整的主題路徑。換句話說,不要使用
projects/{project}/topics/{topic},而只提供{topic}。(支援模板化)fail_if_not_exists (bool) – 如果為 True 且主題不存在,則任務將失敗
gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,則不會重試請求。
timeout (float | None) – (可選)等待請求完成的時間(秒)。請注意,如果指定了 retry,則超時應用於每次單獨的嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選)要使用短期憑據模擬的可選服務賬號,或是獲取列表中最後一個賬號(將在請求中被模擬)的 access_token 所需的賬號鏈式列表。如果設定為字串,則該賬號必須向原始賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號則向原始賬號授予此角色。(支援模板化)
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubDeleteSubscriptionOperator(*, subscription, project_id=PROVIDE_PROJECT_ID, fail_if_not_exists=False, gcp_conn_id='google_cloud_default', retry=DEFAULT, timeout=None, metadata=(), impersonation_chain=None, **kwargs)[source]¶
基類:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator刪除 PubSub 訂閱。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 刪除 Pub/Sub 訂閱
預設情況下,如果訂閱不存在,此運算子不會導致 DAG 失敗。
with DAG("successful DAG") as dag: PubSubDeleteSubscriptionOperator(project_id="my-project", subscription="non-existing")
可以將此運算子配置為在訂閱已存在時失敗。
with DAG("failing DAG") as dag: PubSubDeleteSubscriptionOperator( project_id="my-project", subscription="non-existing", fail_if_not_exists=True, )
project_id和subscription支援模板化,因此您可以在其值中使用 Jinja 模板。- 引數:
project_id (str) – 可選,進行操作的 Google Cloud 專案 ID(支援模板化)。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
subscription (str) – 要刪除的訂閱。不要包含完整的訂閱路徑。換句話說,不要使用
projects/{project}/subscription/{subscription},而只提供{subscription}。(支援模板化)fail_if_not_exists (bool) – 如果為 True 且訂閱不存在,則任務將失敗
gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – (可選)用於重試請求的重試物件。如果指定 None,則不會重試請求。
timeout (float | None) – (可選)等待請求完成的時間(秒)。請注意,如果指定了 retry,則超時應用於每次單獨的嘗試。
metadata (collections.abc.Sequence[tuple[str, str]]) – (可選)提供給方法的額外元資料。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選)要使用短期憑據模擬的可選服務賬號,或是獲取列表中最後一個賬號(將在請求中被模擬)的 access_token 所需的賬號鏈式列表。如果設定為字串,則該賬號必須向原始賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號則向原始賬號授予此角色。(支援模板化)
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubPublishMessageOperator(*, topic, messages, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', enable_message_ordering=False, impersonation_chain=None, **kwargs)[source]¶
基類:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator向 PubSub 主題釋出訊息。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 釋出 Pub/Sub 訊息
每個任務會將所有提供的訊息釋出到同一 Google Cloud 專案中的同一主題。如果主題不存在,此任務將失敗。
m1 = {"data": b"Hello, World!", "attributes": {"type": "greeting"}} m2 = {"data": b"Knock, knock"} m3 = {"attributes": {"foo": ""}} m4 = {"data": b"Who's there?", "attributes": {"ordering_key": "knock_knock"}} t1 = PubSubPublishMessageOperator( project_id="my-project", topic="my_topic", messages=[m1, m2, m3], create_topic=True, dag=dag, ) t2 = PubSubPublishMessageOperator( project_id="my-project", topic="my_topic", messages=[m4], create_topic=True, enable_message_ordering=True, dag=dag, )
project_id、topic和messages支援模板化,因此您可以在其值中使用 Jinja 模板。- 引數:
project_id (str) – 可選,進行操作的 Google Cloud 專案 ID(支援模板化)。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
topic (str) – 要釋出到的主題。不要包含完整的主題路徑。換句話說,不要使用
projects/{project}/topics/{topic},而只提供{topic}。(支援模板化)messages (list) – 要釋出到主題的訊息列表。每條訊息都是一個字典,包含以下一個或多個鍵值對映:* ‘data’: 一個位元組字串 (utf-8 編碼) * ‘attributes’: {‘key1’: ‘value1’, …} 每條訊息必須至少包含一個非空的 ‘data’ 值或一個包含至少一個鍵的屬性字典(支援模板化)。請參閱 https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
enable_message_ordering (bool) – 如果為 True,則使用相同的 ordering_key 在 PubsubMessage 中釋出的訊息將按照 Pub/Sub 系統接收到的順序傳遞給訂閱者。否則,它們可能以任何順序傳遞。預設為 False。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選)要使用短期憑據模擬的可選服務賬號,或是獲取列表中最後一個賬號(將在請求中被模擬)的 access_token 所需的賬號鏈式列表。如果設定為字串,則該賬號必須向原始賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號則向原始賬號授予此角色。(支援模板化)
- template_fields: collections.abc.Sequence[str] = ('project_id', 'topic', 'messages', 'enable_message_ordering', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.pubsub.PubSubPullOperator(*, project_id, subscription, max_messages=5, ack_messages=False, messages_callback=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, deferrable=False, poll_interval=300, **kwargs)[source]¶
基類:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator從 PubSub 訂閱拉取訊息並透過 XCom 傳遞。
如果佇列為空,返回空列表 - 從不等待訊息。如果您確實需要等待,請改用
airflow.providers.google.cloud.sensors.PubSubPullSensor。另請參閱
有關如何使用此運算子和 PubSubPullSensor 的更多資訊,請參閱指南: 從 Pub/Sub 訂閱拉取訊息
此運算子將從指定的 Pub/Sub 訂閱中拉取最多
max_messages條訊息。當訂閱返回訊息時,運算子將立即返回這些訊息,並透過 XCom 傳遞給下游任務。如果
ack_messages設定為 True,訊息將在返回之前立即被確認,否則下游任務將負責確認它們。project_id和subscription支援模板化,因此您可以在其值中使用 Jinja 模板。- 引數:
project_id (str) – 訂閱所在的 Google Cloud 專案 ID(支援模板化)
subscription (str) – Pub/Sub 訂閱名稱。不要包含完整的訂閱路徑。
max_messages (int) – 每次 Pub/Sub 拉取請求檢索的最大訊息數量
ack_messages (bool) – 如果為 True,每條訊息將立即被確認,而不是由任何下游任務確認
gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可選)用於處理接收到的訊息的回撥。其返回值將儲存到 XCom。如果您正在拉取大量訊息,您可能需要提供一個自定義回撥。如果未提供,預設實現將使用 google.protobuf.json_format.MessageToDict 函式將 ReceivedMessage 物件轉換為 JSON 可序列化的字典。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選)要使用短期憑據模擬的可選服務賬號,或是獲取列表中最後一個賬號(將在請求中被模擬)的 access_token 所需的賬號鏈式列表。如果設定為字串,則該賬號必須向原始賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號則向原始賬號授予此角色。(支援模板化)
deferrable (bool) – 如果為 True,在可推遲模式下執行任務。
poll_interval (int) – 等待檢查任務的兩次連續呼叫之間的時間(秒)。預設值為 300 秒。
- template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[source]¶