airflow.providers.google.cloud.sensors.gcs

此模組包含 Google Cloud Storage 感測器。

GCSObjectExistenceSensor

檢查 Google Cloud Storage 中是否存在檔案。

GCSObjectUpdateSensor

檢查 Google Cloud Storage 中的物件是否已更新。

GCSObjectsWithPrefixExistenceSensor

檢查給定字首是否存在 GCS 物件,並透過 XCom 傳遞匹配項。

GCSUploadSessionCompleteSensor

如果非活動期已過,且儲存桶中的物件數量未增加,則返回 True。

函式

ts_function(context)

作為 GoogleCloudStorageObjectUpdatedSensor 的預設回撥函式。

get_time()

作為 datetime.datetime.now 的包裝器,以便在單元測試中簡化模擬。

模組內容

class airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor(*, bucket, object, use_glob=False, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, retry=DEFAULT_RETRY, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類:airflow.sensors.base.BaseSensorOperator

檢查 Google Cloud Storage 中是否存在檔案。

引數:
  • bucket (str) – 物件所在的 Google Cloud Storage 儲存桶。

  • object (str) – 要在 Google Cloud Storage 儲存桶中檢查的物件的名稱。

  • use_glob (bool) – 設定為 True 時,將物件引數解釋為 glob

  • google_cloud_conn_id (str) – 連線到 Google Cloud Storage 時使用的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者鏈式賬號列表,獲取列表中最後一個賬號的 access_token,該賬號將在請求中被模擬。如果設定為字串,則原始賬號必須向該賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號向原始賬號授予此角色(模板化)。

  • retry (google.api_core.retry.Retry) – (可選) 如何重試 RPC

template_fields: collections.abc.Sequence[str] = ('bucket', 'object', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
object[source]
use_glob = False[source]
google_cloud_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
retry[source]
deferrable = True[source]
poke(context)[source]

在派生此類時重寫。

execute(context)[source]

Airflow 在 worker 上執行此方法,並使用觸發器推遲執行。

execute_complete(context, event)[source]

作為觸發器觸發時的回撥函式 - 立即返回。

依賴於觸發器丟擲異常,否則假定執行成功。

airflow.providers.google.cloud.sensors.gcs.ts_function(context)[source]

作為 GoogleCloudStorageObjectUpdatedSensor 的預設回撥函式。

預設行為是檢查物件在資料間隔結束之後是否已更新。

class airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor(bucket, object, ts_func=ts_function, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類:airflow.sensors.base.BaseSensorOperator

檢查 Google Cloud Storage 中的物件是否已更新。

引數:
  • bucket (str) – 物件所在的 Google Cloud Storage 儲存桶。

  • object (str) – 要在 Google Cloud Storage 儲存桶中下載的物件的名稱。

  • ts_func (Callable) – 用於定義更新條件的回撥函式。預設回撥函式返回 logical_date + schedule_interval。回撥函式接受 context 作為引數。

  • google_cloud_conn_id (str) – 連線到 Google Cloud Storage 時使用的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者鏈式賬號列表,獲取列表中最後一個賬號的 access_token,該賬號將在請求中被模擬。如果設定為字串,則原始賬號必須向該賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號向原始賬號授予此角色(模板化)。

  • deferrable (bool) – 在可推遲模式下執行感測器。

template_fields: collections.abc.Sequence[str] = ('bucket', 'object', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
object[source]
ts_func[source]
google_cloud_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
deferrable = True[source]
poke(context)[source]

在派生此類時重寫。

execute(context)[source]

Airflow 在 worker 上執行此方法,並使用觸發器推遲執行。

execute_complete(context, event=None)[source]

立即返回並依賴觸發器丟擲成功事件。觸發器的回撥函式。

class airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor(bucket, prefix, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類:airflow.sensors.base.BaseSensorOperator

檢查給定字首是否存在 GCS 物件,並透過 XCom 傳遞匹配項。

當找到匹配給定字首的檔案時,poke 方法的條件將得到滿足,匹配的物件將從操作器返回,並透過 XCom 傳遞給下游任務。

引數:
  • bucket (str) – 物件所在的 Google Cloud Storage 儲存桶。

  • prefix (str) – 要在 Google Cloud Storage 儲存桶中檢查的字首名稱。

  • google_cloud_conn_id (str) – 連線到 Google Cloud Storage 時使用的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者鏈式賬號列表,獲取列表中最後一個賬號的 access_token,該賬號將在請求中被模擬。如果設定為字串,則原始賬號必須向該賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號向原始賬號授予此角色(模板化)。

  • deferrable (bool) – 在可推遲模式下執行感測器。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
prefix[source]
google_cloud_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
deferrable = True[source]
poke(context)[source]

在派生此類時重寫。

execute(context)[source]

重寫以允許傳遞匹配項。

execute_complete(context, event)[source]

立即返回並依賴觸發器丟擲成功事件。觸發器的回撥函式。

airflow.providers.google.cloud.sensors.gcs.get_time()[source]

作為 datetime.datetime.now 的包裝器,以便在單元測試中簡化模擬。

class airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor(bucket, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, allow_delete=True, google_cloud_conn_id='google_cloud_default', impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類:airflow.sensors.base.BaseSensorOperator

如果非活動期已過,且儲存桶中的物件數量未增加,則返回 True。

檢查 Google Cloud Storage 儲存桶中字首下物件數量的變化,如果非活動期已過且物件數量沒有增加,則返回 True。請注意,此感測器在重新排程模式下表現不正確,因為 GCS 儲存桶中列出的物件狀態會在重新排程呼叫之間丟失。

引數:
  • bucket (str) – 預期物件所在的 Google Cloud Storage 儲存桶。

  • prefix (str) – 要在 Google Cloud Storage 儲存桶中檢查的字首名稱。

  • inactivity_period (float) – 用於確定上傳會話結束的總非活動秒數。請注意,此機制不是即時的,此操作器可能要等到此週期過去且沒有感知到其他物件後才會返回(延遲一個 poke_interval)。

  • min_objects (int) – 上傳會話被視為有效所需的最小物件數量。

  • previous_objects (set[str] | None) – 上次 poke 期間找到的物件 ID 集合。

  • allow_delete (bool) – 此感測器是否應將兩次 poke 之間物件被刪除視為有效行為。如果為 True,當發生這種情況時會記錄警告訊息。如果為 False,則會引發錯誤。

  • google_cloud_conn_id (str) – 連線到 Google Cloud Storage 時使用的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者鏈式賬號列表,獲取列表中最後一個賬號的 access_token,該賬號將在請求中被模擬。如果設定為字串,則原始賬號必須向該賬號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須向直接前一個身份授予 Service Account Token Creator IAM 角色,列表中的第一個賬號向原始賬號授予此角色(模板化)。

  • deferrable (bool) – 在可推遲模式下執行感測器。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'impersonation_chain')[source]
ui_color = '#f0eee4'[source]
bucket[source]
prefix[source]
inactivity_period = 3600[source]
min_objects = 1[source]
previous_objects[source]
inactivity_seconds = 0[source]
allow_delete = True[source]
google_cloud_conn_id = 'google_cloud_default'[source]
last_activity_time = None[source]
impersonation_chain = None[source]
hook: airflow.providers.google.cloud.hooks.gcs.GCSHook | None = None[source]
deferrable = True[source]
is_bucket_updated(current_objects)[source]

檢查是否添加了新物件且非活動週期已過,並更新狀態。

引數:

current_objects (set[str]) – 上次探測時桶中的物件ID集合。

poke(context)[source]

在派生此類時重寫。

execute(context)[source]

Airflow 在 worker 上執行此方法,並使用觸發器推遲執行。

execute_complete(context, event=None)[source]

依賴觸發器丟擲異常,否則認為執行成功。

觸發器觸發時的回撥 - 立即返回。

這條目有幫助嗎?