Amazon S3

Amazon Simple Storage Service (Amazon S3) 是網際網路儲存服務。您可以使用 Amazon S3 在任何時間、從任何位置儲存和檢索任意數量的資料。

前置任務

要使用這些運算子,您必須完成以下幾項工作:

運算子

建立 Amazon S3 儲存桶

要建立 Amazon S3 儲存桶,您可以使用 S3CreateBucketOperator

tests/system/amazon/aws/example_s3.py

create_bucket = S3CreateBucketOperator(
    task_id="create_bucket",
    bucket_name=bucket_name,
)

刪除 Amazon S3 儲存桶

要刪除 Amazon S3 儲存桶,您可以使用 S3DeleteBucketOperator

tests/system/amazon/aws/example_s3.py

delete_bucket = S3DeleteBucketOperator(
    task_id="delete_bucket",
    bucket_name=bucket_name,
    force_delete=True,
)

設定 Amazon S3 儲存桶的標籤

要設定 Amazon S3 儲存桶的標籤,您可以使用 S3PutBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

put_tagging = S3PutBucketTaggingOperator(
    task_id="put_tagging",
    bucket_name=bucket_name,
    key=TAG_KEY,
    value=TAG_VALUE,
)

獲取 Amazon S3 儲存桶的標籤

要獲取與 Amazon S3 儲存桶關聯的標籤集,您可以使用 S3GetBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

get_tagging = S3GetBucketTaggingOperator(
    task_id="get_tagging",
    bucket_name=bucket_name,
)

刪除 Amazon S3 儲存桶的標籤

要刪除 Amazon S3 儲存桶的標籤,您可以使用 S3DeleteBucketTaggingOperator

tests/system/amazon/aws/example_s3.py

delete_tagging = S3DeleteBucketTaggingOperator(
    task_id="delete_tagging",
    bucket_name=bucket_name,
)

建立 Amazon S3 物件

要建立新(或替換現有)Amazon S3 物件,您可以使用 S3CreateObjectOperator

tests/system/amazon/aws/example_s3.py

create_object = S3CreateObjectOperator(
    task_id="create_object",
    s3_bucket=bucket_name,
    s3_key=key,
    data=DATA,
    replace=True,
)

複製 Amazon S3 物件

要將 Amazon S3 物件從一個儲存桶複製到另一個儲存桶,您可以使用 S3CopyObjectOperator。這裡使用的 Amazon S3 連線需要同時擁有源儲存桶/鍵和目標儲存桶/鍵的訪問許可權。

tests/system/amazon/aws/example_s3.py

copy_object = S3CopyObjectOperator(
    task_id="copy_object",
    source_bucket_name=bucket_name,
    dest_bucket_name=bucket_name_2,
    source_bucket_key=key,
    dest_bucket_key=key_2,
)

刪除 Amazon S3 物件

要刪除一個或多個 Amazon S3 物件,您可以使用 S3DeleteObjectsOperator

tests/system/amazon/aws/example_s3.py

delete_objects = S3DeleteObjectsOperator(
    task_id="delete_objects",
    bucket=bucket_name_2,
    keys=key_2,
)

轉換 Amazon S3 物件

要轉換一個 Amazon S3 物件的資料並將其儲存到另一個物件,您可以使用 S3FileTransformOperator。您還可以應用可選的 Amazon S3 Select 表示式,使用 select_expressionsource_s3_key 中選擇您想要檢索的資料。

tests/system/amazon/aws/example_s3.py

file_transform = S3FileTransformOperator(
    task_id="file_transform",
    source_s3_key=f"s3://{bucket_name}/{key}",
    dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
    # Use `cp` command as transform script as an example
    transform_script="cp",
    replace=True,
)

列出 Amazon S3 字首

要列出 Amazon S3 儲存桶中的所有 Amazon S3 字首,您可以使用 S3ListPrefixesOperator。有關 Amazon S3 字首的更多資訊,請參閱 此處

tests/system/amazon/aws/example_s3.py

list_prefixes = S3ListPrefixesOperator(
    task_id="list_prefixes",
    bucket=bucket_name,
    prefix=PREFIX,
    delimiter=DELIMITER,
)

列出 Amazon S3 物件

要列出 Amazon S3 儲存桶中的所有 Amazon S3 物件,您可以使用 S3ListOperator。您可以指定一個 prefix 來過濾名稱以該字首開頭的物件。

tests/system/amazon/aws/example_s3.py

list_keys = S3ListOperator(
    task_id="list_keys",
    bucket=bucket_name,
    prefix=PREFIX,
)

感測器

等待 Amazon S3 鍵

要等待一個或多個鍵出現在 Amazon S3 儲存桶中,您可以使用 S3KeySensor。對於每個鍵,它都會呼叫 head_object API(如果 wildcard_matchTrue,則呼叫 list_objects_v2 API)來檢查它是否存在。請注意,特別是當用於檢查大量鍵時,每個鍵會進行一次 API 呼叫。

檢查單個檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key = S3KeySensor(
    task_id="sensor_one_key",
    bucket_name=bucket_name,
    bucket_key=key,
)

檢查多個檔案

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys = S3KeySensor(
    task_id="sensor_two_keys",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
)

使用正則表示式檢查檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
    task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)

要使用額外的自定義檢查,您可以定義一個函式,該函式接收匹配的 S3 物件屬性列表並返回布林值

  • True: 滿足特定條件

  • False: 不滿足條件

對於作為 bucket_key 引數傳入的每個鍵,都會呼叫此函式。該函式引數是物件列表的原因是,當 wildcard_matchTrue 時,多個檔案可能匹配一個鍵。匹配的 S3 物件屬性列表僅包含大小,格式如下:

[{"Size": int}]

tests/system/amazon/aws/example_s3.py

def check_fn(files: list, **kwargs) -> bool:
    """
    Example of custom check: check if all files are bigger than ``20 bytes``

    :param files: List of S3 object attributes.
    :return: true if the criteria is met
    """
    return all(f.get("Size", 0) > 20 for f in files)

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
    task_id="sensor_key_with_function",
    bucket_name=bucket_name,
    bucket_key=key,
    check_fn=check_fn,
)

您還可以透過將 deferrable 引數設定為 True,在可延遲模式下執行此運算子。這將提高 Airflow worker 的利用效率,因為作業狀態的輪詢在 triggerer 上非同步進行。請注意,這需要在您的 Airflow 部署中提供 triggerer。

檢查單個檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
    task_id="sensor_one_key_deferrable",
    bucket_name=bucket_name,
    bucket_key=key,
    deferrable=True,
)

檢查多個檔案

tests/system/amazon/aws/example_s3.py

# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
    task_id="sensor_two_keys_deferrable",
    bucket_name=bucket_name,
    bucket_key=[key, key_2],
    deferrable=True,
)

使用正則表示式檢查檔案

tests/system/amazon/aws/example_s3.py

# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
    task_id="sensor_key_with_regex_deferrable",
    bucket_name=bucket_name,
    bucket_key=key_regex_pattern,
    use_regex=True,
    deferrable=True,
)

等待 Amazon S3 字首更改

要檢查 Amazon S3 儲存桶中特定字首處物件數量的變化,並等待直到不活動期過去且物件數量沒有增加為止,您可以使用 S3KeysUnchangedSensor。請注意,此感測器在重新排程模式下無法正常工作,因為 Amazon S3 儲存桶中列出的物件狀態將在重新排程的呼叫之間丟失。

tests/system/amazon/aws/example_s3.py

sensor_keys_unchanged = S3KeysUnchangedSensor(
    task_id="sensor_keys_unchanged",
    bucket_name=bucket_name_2,
    prefix=PREFIX,
    inactivity_period=10,  # inactivity_period in seconds
)

您還可以透過將 deferrable 引數設定為 True,在可延遲模式下執行此感測器。這將提高 Airflow worker 的利用效率,因為作業狀態的輪詢在 triggerer 上非同步進行。請注意,這需要在您的 Airflow 部署中提供 triggerer。

參考

此條目有幫助嗎?