airflow.providers.amazon.aws.sensors.s3

S3KeySensor

等待一個或多個 Key(S3 上的檔案狀例項)存在於 S3 儲存桶中。

S3KeysUnchangedSensor

如果在 inactivity_period 過去後匹配字首的物件數量沒有增加,則返回 True。

模組內容

airflow.providers.amazon.aws.sensors.s3.S3KeySensor(*, bucket_key, bucket_name=None, wildcard_match=False, check_fn=None, verify=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), use_regex=False, metadata_keys=None, **kwargs)[原始碼]

基類: airflow.providers.amazon.aws.sensors.base_aws.AwsBaseSensor[airflow.providers.amazon.aws.hooks.s3.S3Hook]

等待一個或多個 Key(S3 上的檔案狀例項)存在於 S3 儲存桶中。

路徑只是給定 S3 路徑資源的鍵/值指標。注意:S3 不直接支援資料夾,只提供鍵/值對。

另請參閱

有關如何使用此感測器的更多資訊,請參閱指南:等待 Amazon S3 鍵

引數:
  • bucket_key (str | list[str]) – 等待的 Key(s)。支援完整的 s3:// 風格 URL 或根級別的相對路徑。當指定為完整的 s3:// URL 時,請將 bucket_name 留空 None

  • bucket_name (str | None) – S3 儲存桶的名稱。僅當 bucket_key 未提供為完整的 s3:// URL 時需要。指定時,傳遞給 bucket_key 的所有鍵都引用此儲存桶

  • wildcard_match (bool) – bucket_key 是否應被解釋為 Unix 萬用字元模式

  • check_fn (Callable[Ellipsis, bool] | None) –

    接收 S3 物件列表以及上下文值的函式,並返回一個布林值: - True: 滿足條件 - False: 不滿足條件 示例:等待任何 S3 物件大小大於 1 兆位元組

    def check_fn(files: List, **kwargs) -> bool:
        return any(f.get('Size', 0) > 1048576 for f in files)
    

  • deferrable (bool) – 以可延期模式執行運算子

  • use_regex (bool) – 是否使用正則表示式檢查儲存桶

  • metadata_keys (list[str] | None) – 要收集併發送到 check_fn 的 head_object 屬性列表。可接受的值:s3.head_object 返回的任何頂級屬性。指定 * 以返回所有可用屬性。預設值:“Size”。如果請求的屬性未找到,則仍包含該鍵,值為 None。

  • aws_conn_id – 用於 AWS 憑據的 Airflow 連線。如果此項為 None 或為空,則使用預設的 boto3 行為。如果以分散式方式執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個 Worker 節點上維護)。

  • region_name – AWS region_name。如果未指定,則使用預設的 boto3 行為。

  • verify (str | bool | None) – 是否驗證 SSL 證書。參見: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

template_fields: collections.abc.Sequence[str][原始碼]
aws_hook_class[原始碼]
bucket_name = None[原始碼]
bucket_key[原始碼]
wildcard_match = False[原始碼]
check_fn = None[原始碼]
verify = None[原始碼]
deferrable = True[原始碼]
use_regex = False[原始碼]
metadata_keys = ['Size'][原始碼]
poke(context)[原始碼]

派生此類時重寫。

execute(context)[原始碼]

Airflow 在 Worker 上執行此方法並使用觸發器進行延期。

execute_complete(context, event)[原始碼]

當觸發器觸發時執行 - 立即返回。

依賴觸發器丟擲異常,否則假定執行成功。

airflow.providers.amazon.aws.sensors.s3.S3KeysUnchangedSensor(*, bucket_name, prefix, verify=None, inactivity_period=60 * 60, min_objects=1, previous_objects=None, allow_delete=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[原始碼]

基類: airflow.providers.amazon.aws.sensors.base_aws.AwsBaseSensor[airflow.providers.amazon.aws.hooks.s3.S3Hook]

如果在 inactivity_period 過去後匹配字首的物件數量沒有增加,則返回 True。

注意,此感測器在 reschedule 模式下無法正常工作,因為 S3 儲存桶中列出的物件狀態將在 reschedule 呼叫之間丟失。

另請參閱

有關如何使用此感測器的更多資訊,請參閱指南:等待 Amazon S3 字首更改

引數:
  • bucket_name (str) – S3 儲存桶的名稱

  • prefix (str) – 正在等待的字首。從儲存桶根級別開始的相對路徑。https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • inactivity_period (float) – 用於指定鍵未更改的總不活動秒數。注意,此機制不是即時的,此運算子可能在此週期過去且沒有檢測到附加物件之前的 poke_interval 後才會返回。

  • min_objects (int) – 鍵未更改感測器被視為有效所需的最小物件數。

  • previous_objects (set[str] | None) – 上次 poke 期間找到的物件 ID 集合。

  • allow_delete (bool) – 此感測器是否應將兩次 poke 之間刪除的物件視為有效行為。如果為 True,發生這種情況時將記錄警告訊息。如果為 False,將引發錯誤。

  • deferrable (bool) – 以可延期模式執行感測器

  • aws_conn_id – 用於 AWS 憑據的 Airflow 連線。如果此項為 None 或為空,則使用預設的 boto3 行為。如果以分散式方式執行 Airflow 且 aws_conn_id 為 None 或為空,則將使用預設的 boto3 配置(並且必須在每個 Worker 節點上維護)。

  • region_name – AWS region_name。如果未指定,則使用預設的 boto3 行為。

  • verify (bool | str | None) – 是否驗證 SSL 證書。參見: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

template_fields: collections.abc.Sequence[str][原始碼]
aws_hook_class[原始碼]
bucket_name[原始碼]
prefix[原始碼]
inactivity_period = 3600[原始碼]
min_objects = 1[原始碼]
previous_objects[原始碼]
inactivity_seconds = 0[原始碼]
allow_delete = True[原始碼]
deferrable = True[原始碼]
verify = None[原始碼]
last_activity_time: datetime.datetime | None = None[原始碼]
is_keys_unchanged(current_objects)[原始碼]

檢查 inactivity_period 後是否有新物件,並相應地更新感測器狀態。

引數:

current_objects (set[str]) – 上次 poke 期間儲存桶中的物件 ID 集合。

poke(context)[原始碼]

派生此類時重寫。

execute(context)[原始碼]

如果 deferrable 為 True,Airflow 在 Worker 上執行此方法並使用觸發器進行延期。

execute_complete(context, event=None)[原始碼]

當觸發器觸發時執行 - 立即返回。

依賴觸發器丟擲異常,否則假定執行成功。

此條目有幫助嗎?