AWS DataSync

AWS DataSync 是一項資料傳輸服務,可簡化、自動化並加速透過網際網路或 AWS Direct Connect 在本地儲存系統與 AWS 儲存服務之間移動和複製資料。

前置任務

要使用這些運算子,您必須執行以下幾項操作:

通用引數

aws_conn_id

引用 Amazon Web Services 連線 ID。如果此引數設定為 None,則使用預設的 boto3 行為,不進行連線查詢。否則,使用連線中儲存的憑證。預設值: aws_default

region_name

AWS 區域名稱。如果此引數設定為 None 或省略,則使用 AWS 連線額外引數 中的 region_name。否則,使用指定的值而不是連線值。預設值: None

verify

是否驗證 SSL 證書。

  • False - 不驗證 SSL 證書。

  • path/to/cert/bundle.pem - 要使用的 CA 證書包檔名。如果您想使用與 botocore 使用的 CA 證書包不同的證書包,可以指定此引數。

如果此引數設定為 None 或省略,則使用 AWS 連線額外引數 中的 verify。否則,使用指定的值而不是連線值。預設值: None

botocore_config

提供的字典用於構建 botocore.config.Config。此配置可用於配置 避免限流異常、超時等。

示例:有關引數的更多詳細資訊,請參閱 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此引數設定為 None 或省略,則使用 AWS 連線額外引數 中的 config_kwargs。否則,使用指定的值而不是連線值。預設值: None

注意

指定一個空字典 {} 將覆蓋 botocore.config.Config 的連線配置。

運算子

與 AWS DataSync 任務互動

您可以使用 DataSyncOperator 來查詢、建立、更新、執行和刪除 AWS DataSync 任務。

一旦 DataSyncOperator 確定了要執行的正確 TaskArn(無論是因為您指定了它,還是因為它已被找到),它就會被執行。每當執行一個 AWS DataSync 任務時,都會建立一個由 TaskExecutionArn 標識的 AWS DataSync TaskExecution。

TaskExecutionArn 將被監控直到完成(成功/失敗),其狀態將定期寫入 Airflow 任務日誌。

DataSyncOperator 支援向底層的 boto3.start_task_execution() API 可選地傳遞額外的 kwargs。這是透過 task_execution_kwargs 引數完成的。例如,這對於限制頻寬或過濾包含的檔案很有用,更多詳細資訊請參閱 boto3 Datasync 文件

執行任務

要執行特定任務,您可以將 task_arn 傳遞給運算子。

tests/system/amazon/aws/example_datasync.py

# Execute a specific task
execute_task_by_arn = DataSyncOperator(
    task_id="execute_task_by_arn",
    task_arn=created_task_arn,
)

搜尋並執行任務

要搜尋任務,您可以將 source_location_uridestination_location_uri 引數指定給運算子。如果找到一個任務,該任務將被執行。如果找到多個任務,運算子將引發一個 Exception。為避免這種情況,您可以將 allow_random_task_choice 設定為 True,以便從候選任務中隨機選擇一個。

tests/system/amazon/aws/example_datasync.py

# Search and execute a task
execute_task_by_locations = DataSyncOperator(
    task_id="execute_task_by_locations",
    source_location_uri=f"s3://{s3_bucket_source}/test",
    destination_location_uri=f"s3://{s3_bucket_destination}/test",
    # Only transfer files from /test/subdir folder
    task_execution_kwargs={
        "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
    },
)

建立並執行任務

搜尋任務時,如果未找到任何任務,您可以選擇在執行前建立一個。為此,您需要提供額外的引數 create_task_kwargscreate_source_location_kwargscreate_destination_location_kwargs

這些額外引數提供了一種方式,讓運算子在未找到合適的現有任務時自動建立任務和/或位置。如果這些引數保持預設值 (None),則不會嘗試建立。

此外,由於 delete_task_after_execution 設定為 True,任務在成功完成後將從 AWS DataSync 中刪除。

tests/system/amazon/aws/example_datasync.py

# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
    task_id="create_and_execute_task",
    source_location_uri=f"s3://{s3_bucket_source}/test_create",
    destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
    create_task_kwargs={"Name": "Created by Airflow"},
    create_source_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    create_destination_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    delete_task_after_execution=False,
)

建立任務時,DataSyncOperator 將嘗試查詢並使用現有的 LocationArns,而不是建立新的。如果多個 LocationArns 與指定的 URI 匹配,則我們需要選擇一個使用。在這種情況下,運算子的行為類似於從多個任務中選擇單個任務的方式

運算子將引發一個 Exception。為避免這種情況,您可以將 allow_random_location_choice 設定為 True,以便從候選位置中隨機選擇一個。

參考

此條目是否有幫助?