airflow.providers.google.cloud.sensors.pubsub

此模組包含一個 Google PubSub 感測器。

異常

PubSubMessageTransformException

當訊息轉換為 pubsub 接收格式失敗時丟擲。

PubSubPullSensor

從 PubSub 訂閱中拉取訊息,並透過 XCom 傳遞它們。

模組內容

exception airflow.providers.google.cloud.sensors.pubsub.PubSubMessageTransformException[源]

基類: airflow.exceptions.AirflowException

當訊息轉換為 pubsub 接收格式失敗時丟擲。

class airflow.providers.google.cloud.sensors.pubsub.PubSubPullSensor(*, project_id, subscription, max_messages=5, return_immediately=True, ack_messages=False, gcp_conn_id='google_cloud_default', messages_callback=None, impersonation_chain=None, poke_interval=10.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源]

基類: airflow.sensors.base.BaseSensorOperator

從 PubSub 訂閱中拉取訊息,並透過 XCom 傳遞它們。

始終等待從訂閱中返回至少一條訊息。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:從 PubSub 訂閱拉取訊息

另請參閱

如果您不想等待至少一條訊息到來,請改用運算子: PubSubPullOperator

此感測器運算子將從指定的 PubSub 訂閱中拉取最多 max_messages 條訊息。當訂閱返回訊息時,poke 方法的條件將被滿足,訊息將從運算子返回,並透過 XCom 傳遞給下游任務。

如果將 ack_messages 設定為 True,訊息將在返回前立即得到確認;否則,下游任務將負責確認它們。

如果您想要一個不等待訊息的非阻塞任務,請改用 PubSubPullOperator

project_idsubscription 是模板化的,因此您可以在其中使用變數。

引數:
  • project_id (str) – 訂閱的 Google Cloud 專案 ID(模板化)

  • subscription (str) – Pub/Sub 訂閱名稱。請勿包含完整的訂閱路徑。

  • max_messages (int) – 每次 PubSub 拉取請求檢索的最大訊息數

  • return_immediately (bool) – 如果此欄位設定為 true,即使在 Pull 響應中沒有可返回的訊息,系統也會立即響應。否則,系統可能會等待(有限的時間)直到至少有一條訊息可用,而不是返回零條訊息。警告:不建議將此欄位設定為 true,因為它會對 Pull 操作的效能產生不利影響。我們建議使用者不要設定此欄位。

  • ack_messages (bool) – 如果為 True,每條訊息將立即得到確認,而不是由任何下游任務確認

  • gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。

  • messages_callback (Callable[[list[google.cloud.pubsub_v1.types.ReceivedMessage], airflow.utils.context.Context], Any] | None) – (可選)用於處理接收到訊息的回撥函式。其返回值將儲存到 XCom 中。如果您正在拉取大量訊息,可能需要提供自定義回撥函式。如果未提供,預設實現將使用 google.protobuf.json_format.MessageToDict 函式將 ReceivedMessage 物件轉換為 JSON 可序列化的字典。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選)用於使用短期憑據模擬的服務賬號,或獲取列表中最後一個賬號的 access_token 所需的鏈式賬號列表,最後一個賬號將在請求中被模擬。如果設定為字串,該賬號必須授予源賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須將 Service Account Token Creator IAM 角色授予緊接在其前的身份,列表中的第一個賬號將此角色授予源賬號(模板化)。

  • deferrable (bool) – 在可延遲模式下執行 sensor

template_fields: collections.abc.Sequence[str] = ('project_id', 'subscription', 'impersonation_chain')[源]
ui_color = '#ff7f50'[源]
gcp_conn_id = 'google_cloud_default'[源]
project_id[源]
subscription[源]
max_messages = 5[源]
return_immediately = True[源]
ack_messages = False[源]
messages_callback = None[源]
impersonation_chain = None[源]
deferrable = True[源]
poke_interval = 10.0[源]
poke(context)[源]

在派生此類時重寫。

execute(context)[源]

如果 deferrable 為 True,Airflow 會在 worker 上執行此方法,並使用 trigger 進行延遲。

execute_complete(context, event)[源]

如果提供了 messages_callback,則執行它;否則,立即返回 trigger 事件訊息。

此條目是否有幫助?