Amazon S3¶
Amazon Simple Storage Service (Amazon S3) 是網際網路儲存服務。您可以使用 Amazon S3 在任何時間、從任何位置儲存和檢索任意數量的資料。
前置任務¶
要使用這些運算子,您必須完成以下幾項工作:
透過 pip 安裝 API 庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 安裝
設定連線.
運算子¶
建立 Amazon S3 儲存桶¶
要建立 Amazon S3 儲存桶,您可以使用 S3CreateBucketOperator。
tests/system/amazon/aws/example_s3.py
create_bucket = S3CreateBucketOperator(
task_id="create_bucket",
bucket_name=bucket_name,
)
刪除 Amazon S3 儲存桶¶
要刪除 Amazon S3 儲存桶,您可以使用 S3DeleteBucketOperator。
tests/system/amazon/aws/example_s3.py
delete_bucket = S3DeleteBucketOperator(
task_id="delete_bucket",
bucket_name=bucket_name,
force_delete=True,
)
獲取 Amazon S3 儲存桶的標籤¶
要獲取與 Amazon S3 儲存桶關聯的標籤集,您可以使用 S3GetBucketTaggingOperator。
tests/system/amazon/aws/example_s3.py
get_tagging = S3GetBucketTaggingOperator(
task_id="get_tagging",
bucket_name=bucket_name,
)
建立 Amazon S3 物件¶
要建立新(或替換現有)Amazon S3 物件,您可以使用 S3CreateObjectOperator。
tests/system/amazon/aws/example_s3.py
create_object = S3CreateObjectOperator(
task_id="create_object",
s3_bucket=bucket_name,
s3_key=key,
data=DATA,
replace=True,
)
複製 Amazon S3 物件¶
要將 Amazon S3 物件從一個儲存桶複製到另一個儲存桶,您可以使用 S3CopyObjectOperator。這裡使用的 Amazon S3 連線需要同時擁有源儲存桶/鍵和目標儲存桶/鍵的訪問許可權。
tests/system/amazon/aws/example_s3.py
copy_object = S3CopyObjectOperator(
task_id="copy_object",
source_bucket_name=bucket_name,
dest_bucket_name=bucket_name_2,
source_bucket_key=key,
dest_bucket_key=key_2,
)
刪除 Amazon S3 物件¶
要刪除一個或多個 Amazon S3 物件,您可以使用 S3DeleteObjectsOperator。
tests/system/amazon/aws/example_s3.py
delete_objects = S3DeleteObjectsOperator(
task_id="delete_objects",
bucket=bucket_name_2,
keys=key_2,
)
轉換 Amazon S3 物件¶
要轉換一個 Amazon S3 物件的資料並將其儲存到另一個物件,您可以使用 S3FileTransformOperator。您還可以應用可選的 Amazon S3 Select 表示式,使用 select_expression 從 source_s3_key 中選擇您想要檢索的資料。
tests/system/amazon/aws/example_s3.py
file_transform = S3FileTransformOperator(
task_id="file_transform",
source_s3_key=f"s3://{bucket_name}/{key}",
dest_s3_key=f"s3://{bucket_name_2}/{key_2}",
# Use `cp` command as transform script as an example
transform_script="cp",
replace=True,
)
列出 Amazon S3 字首¶
要列出 Amazon S3 儲存桶中的所有 Amazon S3 字首,您可以使用 S3ListPrefixesOperator。有關 Amazon S3 字首的更多資訊,請參閱 此處。
tests/system/amazon/aws/example_s3.py
list_prefixes = S3ListPrefixesOperator(
task_id="list_prefixes",
bucket=bucket_name,
prefix=PREFIX,
delimiter=DELIMITER,
)
列出 Amazon S3 物件¶
要列出 Amazon S3 儲存桶中的所有 Amazon S3 物件,您可以使用 S3ListOperator。您可以指定一個 prefix 來過濾名稱以該字首開頭的物件。
tests/system/amazon/aws/example_s3.py
list_keys = S3ListOperator(
task_id="list_keys",
bucket=bucket_name,
prefix=PREFIX,
)
感測器¶
等待 Amazon S3 鍵¶
要等待一個或多個鍵出現在 Amazon S3 儲存桶中,您可以使用 S3KeySensor。對於每個鍵,它都會呼叫 head_object API(如果 wildcard_match 為 True,則呼叫 list_objects_v2 API)來檢查它是否存在。請注意,特別是當用於檢查大量鍵時,每個鍵會進行一次 API 呼叫。
檢查單個檔案
tests/system/amazon/aws/example_s3.py
# Check if a file exists
sensor_one_key = S3KeySensor(
task_id="sensor_one_key",
bucket_name=bucket_name,
bucket_key=key,
)
檢查多個檔案
tests/system/amazon/aws/example_s3.py
# Check if both files exist
sensor_two_keys = S3KeySensor(
task_id="sensor_two_keys",
bucket_name=bucket_name,
bucket_key=[key, key_2],
)
使用正則表示式檢查檔案
tests/system/amazon/aws/example_s3.py
# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex = S3KeySensor(
task_id="sensor_key_with_regex", bucket_name=bucket_name, bucket_key=key_regex_pattern, use_regex=True
)
要使用額外的自定義檢查,您可以定義一個函式,該函式接收匹配的 S3 物件屬性列表並返回布林值
True: 滿足特定條件False: 不滿足條件
對於作為 bucket_key 引數傳入的每個鍵,都會呼叫此函式。該函式引數是物件列表的原因是,當 wildcard_match 為 True 時,多個檔案可能匹配一個鍵。匹配的 S3 物件屬性列表僅包含大小,格式如下:
[{"Size": int}]
tests/system/amazon/aws/example_s3.py
def check_fn(files: list, **kwargs) -> bool:
"""
Example of custom check: check if all files are bigger than ``20 bytes``
:param files: List of S3 object attributes.
:return: true if the criteria is met
"""
return all(f.get("Size", 0) > 20 for f in files)
tests/system/amazon/aws/example_s3.py
# Check if a file exists and match a certain pattern defined in check_fn
sensor_key_with_function = S3KeySensor(
task_id="sensor_key_with_function",
bucket_name=bucket_name,
bucket_key=key,
check_fn=check_fn,
)
您還可以透過將 deferrable 引數設定為 True,在可延遲模式下執行此運算子。這將提高 Airflow worker 的利用效率,因為作業狀態的輪詢在 triggerer 上非同步進行。請注意,這需要在您的 Airflow 部署中提供 triggerer。
檢查單個檔案
tests/system/amazon/aws/example_s3.py
# Check if a file exists
sensor_one_key_deferrable = S3KeySensor(
task_id="sensor_one_key_deferrable",
bucket_name=bucket_name,
bucket_key=key,
deferrable=True,
)
檢查多個檔案
tests/system/amazon/aws/example_s3.py
# Check if both files exist
sensor_two_keys_deferrable = S3KeySensor(
task_id="sensor_two_keys_deferrable",
bucket_name=bucket_name,
bucket_key=[key, key_2],
deferrable=True,
)
使用正則表示式檢查檔案
tests/system/amazon/aws/example_s3.py
# Check if a file exists and match a certain regular expression pattern
sensor_key_with_regex_deferrable = S3KeySensor(
task_id="sensor_key_with_regex_deferrable",
bucket_name=bucket_name,
bucket_key=key_regex_pattern,
use_regex=True,
deferrable=True,
)
等待 Amazon S3 字首更改¶
要檢查 Amazon S3 儲存桶中特定字首處物件數量的變化,並等待直到不活動期過去且物件數量沒有增加為止,您可以使用 S3KeysUnchangedSensor。請注意,此感測器在重新排程模式下無法正常工作,因為 Amazon S3 儲存桶中列出的物件狀態將在重新排程的呼叫之間丟失。
tests/system/amazon/aws/example_s3.py
sensor_keys_unchanged = S3KeysUnchangedSensor(
task_id="sensor_keys_unchanged",
bucket_name=bucket_name_2,
prefix=PREFIX,
inactivity_period=10, # inactivity_period in seconds
)
您還可以透過將 deferrable 引數設定為 True,在可延遲模式下執行此感測器。這將提高 Airflow worker 的利用效率,因為作業狀態的輪詢在 triggerer 上非同步進行。請注意,這需要在您的 Airflow 部署中提供 triggerer。