AWS DataSync¶
AWS DataSync 是一項資料傳輸服務,可簡化、自動化並加速透過網際網路或 AWS Direct Connect 在本地儲存系統與 AWS 儲存服務之間移動和複製資料。
前置任務¶
要使用這些運算子,您必須執行以下幾項操作:
透過 pip 安裝 API 庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 安裝 Airflow®
設定連線.
通用引數¶
- aws_conn_id
引用 Amazon Web Services 連線 ID。如果此引數設定為
None,則使用預設的 boto3 行為,不進行連線查詢。否則,使用連線中儲存的憑證。預設值:aws_default- region_name
AWS 區域名稱。如果此引數設定為
None或省略,則使用 AWS 連線額外引數 中的 region_name。否則,使用指定的值而不是連線值。預設值:None- verify
是否驗證 SSL 證書。
False- 不驗證 SSL 證書。path/to/cert/bundle.pem - 要使用的 CA 證書包檔名。如果您想使用與 botocore 使用的 CA 證書包不同的證書包,可以指定此引數。
如果此引數設定為
None或省略,則使用 AWS 連線額外引數 中的 verify。否則,使用指定的值而不是連線值。預設值:None- botocore_config
提供的字典用於構建 botocore.config.Config。此配置可用於配置 避免限流異常、超時等。
示例:有關引數的更多詳細資訊,請參閱 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此引數設定為
None或省略,則使用 AWS 連線額外引數 中的 config_kwargs。否則,使用指定的值而不是連線值。預設值:None注意
指定一個空字典
{}將覆蓋 botocore.config.Config 的連線配置。
運算子¶
與 AWS DataSync 任務互動¶
您可以使用 DataSyncOperator 來查詢、建立、更新、執行和刪除 AWS DataSync 任務。
一旦 DataSyncOperator 確定了要執行的正確 TaskArn(無論是因為您指定了它,還是因為它已被找到),它就會被執行。每當執行一個 AWS DataSync 任務時,都會建立一個由 TaskExecutionArn 標識的 AWS DataSync TaskExecution。
TaskExecutionArn 將被監控直到完成(成功/失敗),其狀態將定期寫入 Airflow 任務日誌。
DataSyncOperator 支援向底層的 boto3.start_task_execution() API 可選地傳遞額外的 kwargs。這是透過 task_execution_kwargs 引數完成的。例如,這對於限制頻寬或過濾包含的檔案很有用,更多詳細資訊請參閱 boto3 Datasync 文件。
執行任務¶
要執行特定任務,您可以將 task_arn 傳遞給運算子。
tests/system/amazon/aws/example_datasync.py
# Execute a specific task
execute_task_by_arn = DataSyncOperator(
task_id="execute_task_by_arn",
task_arn=created_task_arn,
)
搜尋並執行任務¶
要搜尋任務,您可以將 source_location_uri 和 destination_location_uri 引數指定給運算子。如果找到一個任務,該任務將被執行。如果找到多個任務,運算子將引發一個 Exception。為避免這種情況,您可以將 allow_random_task_choice 設定為 True,以便從候選任務中隨機選擇一個。
tests/system/amazon/aws/example_datasync.py
# Search and execute a task
execute_task_by_locations = DataSyncOperator(
task_id="execute_task_by_locations",
source_location_uri=f"s3://{s3_bucket_source}/test",
destination_location_uri=f"s3://{s3_bucket_destination}/test",
# Only transfer files from /test/subdir folder
task_execution_kwargs={
"Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
},
)
建立並執行任務¶
搜尋任務時,如果未找到任何任務,您可以選擇在執行前建立一個。為此,您需要提供額外的引數 create_task_kwargs、create_source_location_kwargs 和 create_destination_location_kwargs。
這些額外引數提供了一種方式,讓運算子在未找到合適的現有任務時自動建立任務和/或位置。如果這些引數保持預設值 (None),則不會嘗試建立。
此外,由於 delete_task_after_execution 設定為 True,任務在成功完成後將從 AWS DataSync 中刪除。
tests/system/amazon/aws/example_datasync.py
# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
task_id="create_and_execute_task",
source_location_uri=f"s3://{s3_bucket_source}/test_create",
destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
create_task_kwargs={"Name": "Created by Airflow"},
create_source_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
create_destination_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
delete_task_after_execution=False,
)
建立任務時,DataSyncOperator 將嘗試查詢並使用現有的 LocationArns,而不是建立新的。如果多個 LocationArns 與指定的 URI 匹配,則我們需要選擇一個使用。在這種情況下,運算子的行為類似於從多個任務中選擇單個任務的方式
運算子將引發一個 Exception。為避免這種情況,您可以將 allow_random_location_choice 設定為 True,以便從候選位置中隨機選擇一個。