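Google Cloud Transfer Service Operators
Prerequisite Tasks
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available in Installation.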
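CloudDataTransferServiceCreateJobOperator
Creates a transfer job.
The operator accepts dates in two formats:
consistent with the Google API, e.g. { "year": 2019, "month": 2, "day": 11 }
as a datetime object
The operator accepts times in two formats:
consistent with the Google API, e.g. { "hours": 12, "minutes": 30, "seconds": 0 }
as a time object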
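Both forms carry the same information. The helpers below (to_api_date and to_api_time, hypothetical names, not part of Airflow) sketch how a Python object maps onto the Google API dictionary form; they illustrate the equivalence and are not the operator's own conversion code.

```python
from datetime import date, datetime, time


def to_api_date(value):
    """Return a date in the Google API {"year", "month", "day"} form."""
    if isinstance(value, dict):
        return value  # already in Google API form, pass through unchanged
    # date and datetime both expose .year, .month and .day
    return {"year": value.year, "month": value.month, "day": value.day}


def to_api_time(value):
    """Return a time in the Google API {"hours", "minutes", "seconds"} form."""
    if isinstance(value, dict):
        return value
    return {"hours": value.hour, "minutes": value.minute, "seconds": value.second}


print(to_api_date(date(2019, 2, 11)))     # {'year': 2019, 'month': 2, 'day': 11}
print(to_api_date(datetime(2015, 1, 1)))  # {'year': 2015, 'month': 1, 'day': 1}
print(to_api_time(time(12, 30)))          # {'hours': 12, 'minutes': 30, 'seconds': 0}
```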
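If you want to create a transfer job that copies data from AWS S3, you must have a connection configured. Information about configuring AWS is available in Amazon Web Services Connection. The AWS connection to use is selected with the aws_conn_id parameter.
For parameter definition, take a look at CloudDataTransferServiceCreateJobOperator.
Using the operator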
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
gcs_to_gcs_transfer_body = {
    DESCRIPTION: "description",
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: PROJECT_ID_TRANSFER,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(seconds=120)).time(),
    },
    TRANSFER_SPEC: {
        GCS_DATA_SOURCE: {BUCKET_NAME: BUCKET_NAME_SRC},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_NAME_DST},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
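Assuming the constants used in this body (DESCRIPTION, SCHEDULE, TRANSFER_SPEC, GCS_DATA_SOURCE, ALREADY_EXISTING_IN_SINK, ...) resolve to the Storage Transfer REST field names, the same job can be spelled out with literal keys. The builder below is a hypothetical sketch for reading the structure; the project and bucket names are placeholders.

```python
from datetime import date, time


def _api_date(d):
    # Google API date form: {"year", "month", "day"}
    return {"year": d.year, "month": d.month, "day": d.day}


def build_gcs_to_gcs_body(project_id, src_bucket, dst_bucket, start, end, start_time):
    """Hypothetical helper: GCS-to-GCS transfer job body with REST field names."""
    return {
        "description": "description",
        "status": "ENABLED",
        "projectId": project_id,
        "schedule": {
            "scheduleStartDate": _api_date(start),
            "scheduleEndDate": _api_date(end),
            "startTimeOfDay": {
                "hours": start_time.hour,
                "minutes": start_time.minute,
                "seconds": start_time.second,
            },
        },
        "transferSpec": {
            "gcsDataSource": {"bucketName": src_bucket},
            "gcsDataSink": {"bucketName": dst_bucket},
            # Overwrite objects that already exist in the sink bucket.
            "transferOptions": {"overwriteObjectsAlreadyExistingInSink": True},
        },
    }


body = build_gcs_to_gcs_body(
    "my-project", "src-bucket", "dst-bucket", date(2015, 1, 1), date(2030, 1, 1), time(12, 30)
)
```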
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
aws_to_gcs_transfer_body = {
    DESCRIPTION: GCP_DESCRIPTION,
    STATUS: GcpTransferJobsStatus.ENABLED,
    PROJECT_ID: GCP_PROJECT_ID,
    JOB_NAME: GCP_TRANSFER_JOB_NAME,
    SCHEDULE: {
        SCHEDULE_START_DATE: datetime(2015, 1, 1).date(),
        SCHEDULE_END_DATE: datetime(2030, 1, 1).date(),
        START_TIME_OF_DAY: (datetime.now(tz=timezone.utc) + timedelta(minutes=1)).time(),
    },
    TRANSFER_SPEC: {
        AWS_S3_DATA_SOURCE: {BUCKET_NAME: BUCKET_SOURCE_AWS},
        GCS_DATA_SINK: {BUCKET_NAME: BUCKET_TARGET_GCS},
        TRANSFER_OPTIONS: {ALREADY_EXISTING_IN_SINK: True},
    },
}
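Both example bodies set exactly one source in TRANSFER_SPEC (GCS_DATA_SOURCE or AWS_S3_DATA_SOURCE) alongside one sink. To our understanding the data source of a REST transferSpec is a union field, so only one may be present. The helper below (data_source_of, hypothetical, with a deliberately incomplete list of source keys) sketches a client-side check of that rule.

```python
# Subset of the transferSpec data-source keys; the real API may define more.
DATA_SOURCE_KEYS = (
    "gcsDataSource",
    "awsS3DataSource",
    "httpDataSource",
    "azureBlobStorageDataSource",
)


def data_source_of(transfer_spec):
    """Return the single data-source key in a transferSpec dict, else raise ValueError."""
    found = [key for key in DATA_SOURCE_KEYS if key in transfer_spec]
    if len(found) != 1:
        raise ValueError(f"expected exactly one data source, found {found}")
    return found[0]


spec = {
    "awsS3DataSource": {"bucketName": "my-s3-bucket"},
    "gcsDataSink": {"bucketName": "my-gcs-bucket"},
}
print(data_source_of(spec))  # awsS3DataSource
```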
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
create_transfer_job_s3_to_gcs = CloudDataTransferServiceCreateJobOperator(
    task_id="create_transfer_job_s3_to_gcs", body=aws_to_gcs_transfer_body
)
Templating
template_fields: Sequence[str] = (
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferJobs.create.
CloudDataTransferServiceDeleteJobOperator
Deletes a transfer job.
For parameter definition, take a look at CloudDataTransferServiceDeleteJobOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
delete_transfer_job_s3_to_gcs = CloudDataTransferServiceDeleteJobOperator(
    task_id="delete_transfer_job_s3_to_gcs",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
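The reference for this operator points at the transferJobs Status values, which include DELETED. Assuming deletion is expressed as a transferJobs.patch call that sets the job status to DELETED (an assumption about the mechanism; projectId, transferJob and updateTransferJobFieldMask are the patch request fields), the request could be sketched as below. build_delete_patch is a hypothetical name and performs no network call.

```python
API_ROOT = "https://storagetransfer.googleapis.com/v1"


def build_delete_patch(job_name, project_id):
    """Hypothetical helper: describe a patch request that marks a job as DELETED."""
    if not job_name.startswith("transferJobs/"):
        raise ValueError("job_name must look like 'transferJobs/<id>'")
    return {
        "method": "PATCH",
        "url": f"{API_ROOT}/{job_name}",
        "body": {
            "projectId": project_id,
            "transferJob": {"status": "DELETED"},
            # Only the status field is touched by this update.
            "updateTransferJobFieldMask": "status",
        },
    }


request = build_delete_patch("transferJobs/123", "my-project")
```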
Templating
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - REST Resource: transferJobs - Status
CloudDataTransferServiceRunJobOperator
Runs a transfer job.
For parameter definition, take a look at CloudDataTransferServiceRunJobOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
run_transfer = CloudDataTransferServiceRunJobOperator(
    task_id="run_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    project_id=PROJECT_ID_TRANSFER,
)
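The job_name template resolves to the name returned by the create task, which the REST API formats as transferJobs/&lt;id&gt;. As a sketch of the underlying transferJobs.run call (build_run_request is hypothetical and only builds the URL and JSON body; it does not send anything):

```python
import json

API_ROOT = "https://storagetransfer.googleapis.com/v1"


def build_run_request(job_name, project_id):
    """Hypothetical helper: URL and JSON body for a transferJobs.run POST."""
    if not job_name.startswith("transferJobs/"):
        raise ValueError("job_name must look like 'transferJobs/<id>'")
    url = f"{API_ROOT}/{job_name}:run"
    body = json.dumps({"projectId": project_id})
    return url, body


url, body = build_run_request("transferJobs/123", "my-project")
print(url)  # https://storagetransfer.googleapis.com/v1/transferJobs/123:run
```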
Templating
template_fields: Sequence[str] = (
    "job_name",
    "project_id",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - REST Resource: transferJobs - Run
CloudDataTransferServiceUpdateJobOperator
Updates a transfer job.
For parameter definition, take a look at CloudDataTransferServiceUpdateJobOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
update_body = {
    PROJECT_ID: PROJECT_ID_TRANSFER,
    TRANSFER_JOB: {DESCRIPTION: "description_updated"},
    TRANSFER_JOB_FIELD_MASK: "description",
}
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcp.py
update_transfer = CloudDataTransferServiceUpdateJobOperator(
    task_id="update_transfer",
    job_name="{{task_instance.xcom_pull('create_transfer')['name']}}",
    body=update_body,
)
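update_body pairs a partial TRANSFER_JOB with TRANSFER_JOB_FIELD_MASK set to "description", so only the description is meant to change. The function below is a local sketch of field-mask semantics for top-level fields, following the common Google API convention (assumed here) that a masked field missing from the patch is cleared. apply_field_mask is hypothetical and makes no API call.

```python
import copy


def apply_field_mask(job, patch, field_mask):
    """Return a copy of job with only the comma-separated top-level fields updated."""
    updated = copy.deepcopy(job)
    for field in (name.strip() for name in field_mask.split(",")):
        if not field:
            continue
        if field in patch:
            updated[field] = copy.deepcopy(patch[field])
        else:
            # Masked but absent from the patch: cleared (assumed convention).
            updated.pop(field, None)
    return updated


job = {"description": "description", "status": "ENABLED"}
patch = {"description": "description_updated", "status": "DISABLED"}
print(apply_field_mask(job, patch, "description"))
# {'description': 'description_updated', 'status': 'ENABLED'}
```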
Templating
template_fields: Sequence[str] = (
    "job_name",
    "body",
    "gcp_conn_id",
    "aws_conn_id",
    "google_impersonation_chain",
)
More information
CloudDataTransferServiceCancelOperationOperator
Cancels a transfer operation.
For parameter definition, take a look at CloudDataTransferServiceCancelOperationOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
cancel_operation = CloudDataTransferServiceCancelOperationOperator(
    task_id="cancel_operation",
    operation_name="{{task_instance.xcom_pull("
    "'wait_for_operation_to_start_2', key='sensed_operations')[0]['name']}}",
)
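The operation_name template pulls the list that the sensor task pushed to XCom under the sensed_operations key and takes the name of its first element. The function below is a plain-Python equivalent of that lookup on a hypothetical list; it is an illustration, not Airflow code, and it adds an explicit error where the template would simply fail to render.

```python
def first_sensed_operation_name(sensed_operations):
    """Equivalent of the template's [0]['name'] lookup on the sensor's XCom value."""
    if not sensed_operations:
        raise ValueError("the sensor pushed no operations")
    return sensed_operations[0]["name"]


# Hypothetical XCom value pushed by the sensor.
sensed = [
    {"name": "transferOperations/transferJobs-123-456", "metadata": {"status": "IN_PROGRESS"}},
]
print(first_sensed_operation_name(sensed))  # transferOperations/transferJobs-123-456
```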
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.cancel
CloudDataTransferServiceGetOperationOperator
Gets a transfer operation. The result is returned to XCom.
For parameter definition, take a look at CloudDataTransferServiceGetOperationOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
get_operation = CloudDataTransferServiceGetOperationOperator(
    task_id="get_operation", operation_name="{{task_instance.xcom_pull('list_operations')[0]['name']}}"
)
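The operation returned to XCom is a long-running Operation resource; the example later reads its name. Assuming the transfer status sits under metadata.status (the TransferOperation metadata), a downstream task could summarize it as sketched below. operation_summary is a hypothetical helper, and the sample dict is made-up data in that assumed shape.

```python
def operation_summary(operation):
    """Pick the name, done flag and transfer status out of an operation dict."""
    metadata = operation.get("metadata", {})
    return {
        "name": operation["name"],
        "done": operation.get("done", False),
        "status": metadata.get("status", "STATUS_UNSPECIFIED"),
    }


operation = {
    "name": "transferOperations/transferJobs-123-456",
    "metadata": {"status": "IN_PROGRESS"},
}
print(operation_summary(operation))
# {'name': 'transferOperations/transferJobs-123-456', 'done': False, 'status': 'IN_PROGRESS'}
```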
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.get
CloudDataTransferServiceListOperationsOperator
Lists transfer operations. The result is returned to XCom.
For parameter definition, take a look at CloudDataTransferServiceListOperationsOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
list_operations = CloudDataTransferServiceListOperationsOperator(
    task_id="list_operations",
    request_filter={
        FILTER_PROJECT_ID: GCP_PROJECT_ID,
        FILTER_JOB_NAMES: ["{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}"],
    },
)
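At the REST level, transferOperations.list takes its filter as a JSON string in the filter query parameter, with keys such as projectId and jobNames. The operator's request_filter uses Airflow's FILTER_PROJECT_ID and FILTER_JOB_NAMES constants instead; we assume the hook translates them. The sketch below builds the raw URL only (build_list_url is hypothetical) and decodes it again to show the filter survives URL encoding.

```python
import json
from urllib.parse import parse_qs, urlencode, urlsplit

LIST_URL = "https://storagetransfer.googleapis.com/v1/transferOperations"


def build_list_url(project_id, job_names):
    """Hypothetical helper: transferOperations.list URL with a JSON filter parameter."""
    request_filter = {"projectId": project_id, "jobNames": list(job_names)}
    return f"{LIST_URL}?{urlencode({'filter': json.dumps(request_filter)})}"


url = build_list_url("my-project", ["transferJobs/123"])
# Decode the query string to check the round trip.
decoded = json.loads(parse_qs(urlsplit(url).query)["filter"][0])
print(decoded)  # {'projectId': 'my-project', 'jobNames': ['transferJobs/123']}
```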
Templating
template_fields: Sequence[str] = (
    "request_filter",
    "gcp_conn_id",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.list
CloudDataTransferServicePauseOperationOperator
Pauses a transfer operation.
For parameter definition, take a look at CloudDataTransferServicePauseOperationOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
pause_operation = CloudDataTransferServicePauseOperationOperator(
    task_id="pause_operation",
    operation_name="{{task_instance.xcom_pull('wait_for_operation_to_start', "
    "key='sensed_operations')[0]['name']}}",
)
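Pause, resume and cancel are all custom methods on a transfer operation, addressed by its name plus a verb suffix. As a sketch of the REST URLs involved (operation_action_url is a hypothetical helper; it builds strings only and sends nothing):

```python
from urllib.parse import quote

API_ROOT = "https://storagetransfer.googleapis.com/v1"
ACTIONS = {"pause", "resume", "cancel"}


def operation_action_url(operation_name, action):
    """Build the REST URL for transferOperations.{pause,resume,cancel}."""
    if action not in ACTIONS:
        raise ValueError(f"unsupported action: {action}")
    if not operation_name.startswith("transferOperations/"):
        raise ValueError("operation_name must start with 'transferOperations/'")
    # Keep '/' unescaped so the name stays a path segment pair.
    return f"{API_ROOT}/{quote(operation_name, safe='/')}:{action}"


print(operation_action_url("transferOperations/transferJobs-123-456", "pause"))
# https://storagetransfer.googleapis.com/v1/transferOperations/transferJobs-123-456:pause
```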
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.pause
CloudDataTransferServiceResumeOperationOperator
Resumes a transfer operation.
For parameter definition, take a look at CloudDataTransferServiceResumeOperationOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
resume_operation = CloudDataTransferServiceResumeOperationOperator(
    task_id="resume_operation", operation_name="{{task_instance.xcom_pull('get_operation')['name']}}"
)
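Resuming only makes sense for an operation that was paused, and pausing for one that is running. The guard below is a client-side sketch of those transitions; the ALLOWED_FROM table and check_transition are assumptions for illustration, and the service itself remains the authority on which requests it accepts.

```python
# Assumed client-side transition rules, keyed by action.
ALLOWED_FROM = {
    "pause": {"IN_PROGRESS"},
    "resume": {"PAUSED"},
}


def check_transition(operation, action):
    """Raise ValueError unless the operation's status allows the action; return the status."""
    status = operation.get("metadata", {}).get("status", "STATUS_UNSPECIFIED")
    if status not in ALLOWED_FROM[action]:
        raise ValueError(f"cannot {action} an operation in status {status}")
    return status


paused = {"name": "transferOperations/abc", "metadata": {"status": "PAUSED"}}
print(check_transition(paused, "resume"))  # PAUSED
```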
Templating
template_fields: Sequence[str] = (
    "operation_name",
    "gcp_conn_id",
    "api_version",
    "google_impersonation_chain",
)
More information
See Google Cloud Transfer Service - Method: transferOperations.resume
CloudDataTransferServiceJobStatusSensor
Waits until at least one operation belonging to the job reaches one of the expected statuses.
For parameter definition, take a look at CloudDataTransferServiceJobStatusSensor.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_aws.py
wait_for_operation_to_end = CloudDataTransferServiceJobStatusSensor(
    task_id="wait_for_operation_to_end",
    job_name="{{task_instance.xcom_pull('create_transfer_job_s3_to_gcs')['name']}}",
    project_id=GCP_PROJECT_ID,
    expected_statuses={GcpTransferOperationStatus.SUCCESS},
    poke_interval=WAIT_FOR_OPERATION_POKE_INTERVAL,
)
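The sensor's success condition, "at least one operation of the job is in an expected status", reduces to an any() over the job's operations. The function below sketches that check on plain dicts, assuming each operation carries its status under metadata.status; it is not the sensor's own implementation, which may also react to failed or aborted operations.

```python
def poke(operations, expected_statuses):
    """Return True if any operation's metadata.status is in expected_statuses."""
    expected = {status.upper() for status in expected_statuses}
    return any(
        op.get("metadata", {}).get("status", "").upper() in expected for op in operations
    )


ops = [
    {"name": "transferOperations/a", "metadata": {"status": "IN_PROGRESS"}},
    {"name": "transferOperations/b", "metadata": {"status": "SUCCESS"}},
]
print(poke(ops, {"SUCCESS"}))      # True
print(poke(ops[:1], {"SUCCESS"}))  # False
```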
Templating
template_fields: Sequence[str] = (
    "job_name",
    "impersonation_chain",
)
CloudDataTransferServiceGCSToGCSOperator
Copies data from one GCS bucket to another.
For parameter definition, take a look at CloudDataTransferServiceGCSToGCSOperator.
Using the operator
tests/system/google/cloud/storage_transfer/example_cloud_storage_transfer_service_gcs_to_gcs.py
transfer_gcs_to_gcs = CloudDataTransferServiceGCSToGCSOperator(
    task_id="transfer_gcs_to_gcs",
    source_bucket=BUCKET_NAME_SRC,
    source_path=FILE_URI,
    destination_bucket=BUCKET_NAME_DST,
    destination_path=FILE_URI,
    wait=True,
)
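With wait=True the example asks the operator to block until the transfer has finished. The loop below is a generic polling sketch of that idea; the get_status callable, the set of terminal statuses and the timing defaults are assumptions for illustration, not the operator's internals.

```python
import time

TERMINAL_STATUSES = {"SUCCESS", "FAILED", "ABORTED"}


def wait_for_transfer(get_status, poll_seconds=10.0, timeout_seconds=3600.0):
    """Poll get_status() until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"transfer still {status} after {timeout_seconds}s")
        time.sleep(poll_seconds)


# Usage with a stubbed status source standing in for the API.
statuses = iter(["QUEUED", "IN_PROGRESS", "SUCCESS"])
print(wait_for_transfer(lambda: next(statuses), poll_seconds=0))  # SUCCESS
```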
Templating
template_fields: Sequence[str] = (
    "gcp_conn_id",
    "source_bucket",
    "destination_bucket",
    "source_path",
    "destination_path",
    "description",
    "object_conditions",
    "google_impersonation_chain",
)
Reference
For further information, look at: