airflow.providers.amazon.aws.hooks.datasync

使用 AWS 的 boto3 庫與 AWS DataSync 互動。

DataSyncHook

與 AWS DataSync 互動。

模組內容

class airflow.providers.amazon.aws.hooks.datasync.DataSyncHook(wait_interval_seconds=30, *args, **kwargs)[source]

基類: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

與 AWS DataSync 互動。

提供對 boto3.client("datasync") 的厚封裝。

可以指定額外的引數(例如 aws_conn_id),這些引數將傳遞給底層的 AwsBaseHook。

引數:

wait_interval_seconds (int) – 檢查 TaskExecution 狀態的兩次連續呼叫之間的等待時間。預設為 30 秒。

引發:

ValueError – 如果 wait_interval_seconds 不在 0 到 15*60 秒之間。

TASK_EXECUTION_INTERMEDIATE_STATES = ('INITIALIZING', 'QUEUED', 'LAUNCHING', 'PREPARING', 'TRANSFERRING', 'VERIFYING')[source]
TASK_EXECUTION_FAILURE_STATES = ('ERROR',)[source]
TASK_EXECUTION_SUCCESS_STATES = ('SUCCESS',)[source]
locations: list = [][source]
tasks: list = [][source]
create_location(location_uri, **create_location_kwargs)[source]

建立新位置。

引數:
  • location_uri (str) – 用於確定位置型別(S3、SMB、NFS、EFS)的位置 URI。

  • create_location_kwargs – 傳遞給 DataSync.Client.create_location_* 方法。

返回:

建立位置的 LocationArn。

引發:

AirflowException – 如果位置型別(從 location_uri 獲取的字首)無效。

返回型別:

str

get_location_arns(location_uri, case_sensitive=False, ignore_trailing_slash=True)[source]

返回匹配 LocationUri 的所有 LocationArn。

引數:
  • location_uri (str) – 要搜尋的位置 URI,例如 s3://mybucket/mypath

  • case_sensitive (bool) – 對位置 URI 進行大小寫敏感搜尋。

  • ignore_trailing_slash (bool) – 匹配時忽略 URI 末尾的 /。

返回:

LocationArn 列表。

引發:

AirflowBadRequest – 如果 location_uri 為空

返回型別:

list[str]

create_task(source_location_arn, destination_location_arn, **create_task_kwargs)[source]

在指定的源 LocationArn 和目標 LocationArn 之間建立任務。

引數:
  • source_location_arn (str) – 源 LocationArn。必須已存在。

  • destination_location_arn (str) – 目標 LocationArn。必須已存在。

  • create_task_kwargs – 傳遞給 boto.create_task()。請參閱 AWS boto3 DataSync 文件。

返回:

建立的任務的 TaskArn

返回型別:

str

update_task(task_arn, **update_task_kwargs)[source]

更新任務。

引數:
  • task_arn (str) – 要更新的 TaskArn。

  • update_task_kwargs – 傳遞給 boto.update_task(),請參閱 AWS boto3 DataSync 文件。

delete_task(task_arn)[source]

刪除任務。

引數:

task_arn (str) – 要刪除的 TaskArn。

get_task_arns_for_location_arns(source_location_arns, destination_location_arns)[source]

返回同時使用指定源和目標 LocationArn 的 TaskArn 列表。

引數:
  • source_location_arns (list) – 源 LocationArn 列表。

  • destination_location_arns (list) – 目標 LocationArn 列表。

引發:

AirflowBadRequest – 如果 source_location_arnsdestination_location_arns 為空。

start_task_execution(task_arn, **kwargs)[source]

為指定的 task_arn 啟動 TaskExecution。

每個任務最多隻能有一個 TaskExecution。額外的關鍵字引數會發送給 start_task_execution boto3 方法。

引數:

task_arn (str) – TaskArn

返回:

TaskExecutionArn

引發:
  • ClientError – 如果此 task_arn 已有正在執行的 TaskExecution。

  • AirflowBadRequest – 如果 task_arn 為空。

返回型別:

str

cancel_task_execution(task_execution_arn)[source]

取消指定 task_execution_arn 的 TaskExecution。

引數:

task_execution_arn (str) – TaskExecutionArn。

引發:

AirflowBadRequest – 如果 task_execution_arn 為空。

get_task_description(task_arn)[source]

獲取指定 task_arn 的描述。

引數:

task_arn (str) – TaskArn

返回:

關於任務的 AWS 元資料。

引發:

AirflowBadRequest – 如果 task_arn 為空。

返回型別:

dict

describe_task_execution(task_execution_arn)[source]

獲取指定 task_execution_arn 的描述。

引數:

task_execution_arn (str) – TaskExecutionArn

返回:

關於任務執行的 AWS 元資料。

引發:

AirflowBadRequest – 如果 task_execution_arn 為空。

返回型別:

dict

get_current_task_execution_arn(task_arn)[source]

獲取指定 task_arn 的當前 TaskExecutionArn(如果存在)。

引數:

task_arn (str) – TaskArn

返回:

task_arn 的當前 TaskExecutionArn 或 None。

引發:

AirflowBadRequest – 如果 task_arn 為空。

返回型別:

str | None

wait_for_task_execution(task_execution_arn, max_iterations=60)[source]

等待任務執行狀態完成 (SUCCESS/ERROR)。

必須存在 task_execution_arn,否則將引發 boto3 ClientError。

引數:
  • task_execution_arn (str) – TaskExecutionArn

  • max_iterations (int) – 超時前的最大迭代次數。

返回:

任務執行結果。

引發:
  • AirflowTaskTimeout – 如果超過最大迭代次數。

  • AirflowBadRequest – 如果 task_execution_arn 為空。

返回型別:

bool

此條目是否有幫助?