在 Google Cloud Storage 中傳輸資料¶
Google Cloud Storage (GCS) 用於儲存來自各種應用程式的大型資料。請注意,在 GCS 術語中,檔案被稱為物件,因此在本指南中,“物件”和“檔案”這兩個術語可以互換使用。有幾個操作器用於作為 Google Cloud 服務的一部分複製資料。本頁面展示瞭如何使用這些操作器。另請參閱 Google Cloud Storage 操作器,瞭解用於管理 Google Cloud Storage 儲存桶的操作器。
Cloud Storage Transfer Service¶
有許多操作器用於管理 Google Cloud Data Transfer 服務。如果您想建立新的資料傳輸任務,請使用操作器 CloudDataTransferServiceCreateJobOperator。您也可以使用該服務的先前操作器 - CloudDataTransferServiceGCSToGCSOperator。
這些操作器不會在本地控制複製過程,而是使用 Google 資源,這使得它們能夠更快、更經濟地執行此任務。當 Airflow 未託管在 Google Cloud 中時,經濟效益尤為突出,因為這些操作器可以減少出站流量。
如果啟用指定物件傳輸到目標後應從源刪除的選項,這些操作器將修改源物件。
當您使用 Google Cloud Data Transfer 服務時,您可以指定是否允許覆蓋目標中已存在的物件,是否應刪除僅存在於目標的,或者物件傳輸到目標後是否應從源刪除。
可以使用包含和排除字首以及基於檔案修改日期來指定源物件。
如果您需要有關如何使用的資訊,請參閱指南:Google Cloud Transfer Service 操作器
適用於 Google Cloud Storage 的專用傳輸操作器¶
請參閱 Google 傳輸操作器,瞭解往返 Google Cloud Storage 的專用傳輸操作器列表。
本地傳輸¶
有兩個操作器用於複製資料,整個過程都在本地控制。
下一節將介紹它們。
先決條件任務¶
要使用這些操作器,您必須完成一些準備工作
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
按照 Google Cloud 文件中的說明,為您的專案啟用結算功能。
按照 Cloud Console 文件中的說明,啟用 API。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'安裝的詳細資訊請參見 安裝。
操作器¶
GCSToGCSOperator¶
GCSToGCSOperator 允許您在 GCS 中複製一個或多個檔案。檔案可以在兩個不同的儲存桶之間或同一儲存桶內複製。複製始終進行,不考慮目標儲存桶的初始狀態。
僅當檔案移動選項處於活動狀態時,此操作器才會刪除源儲存桶中的物件。在兩個不同儲存桶之間複製檔案時,此操作器從不刪除目標儲存桶中的資料。
當您使用此操作器時,您可以指定物件傳輸到目標後是否應從源刪除。可以使用單個萬用字元以及基於檔案修改日期來指定源物件。
可以根據物件的路徑使用 match_glob 欄位進行過濾。您應避免在源物件的路徑中使用 delimiter 欄位或萬用字元,因為這兩種做法都已棄用。此外,還可以基於檔案的建立日期 (is_older_than) 或修改日期 (last_modified_time 和 maximum_modified_time) 進行過濾。
此操作器預設的工作方式可以與 cp 命令進行比較。當檔案移動選項處於活動狀態時,此操作器功能類似於 mv 命令。
下面是使用 GCSToGCSOperator 複製單個檔案、使用萬用字元複製多個檔案、複製多個檔案、移動單個檔案和移動多個檔案的示例。
複製單個檔案¶
以下示例會將單個檔案 OBJECT_1 從 BUCKET_1_SRC GCS 儲存桶複製到 BUCKET_1_DST 儲存桶。注意,如果標誌 exact_match=False,則 source_object 將被視為在 BUCKET_1_SRC GCS 儲存桶中搜索物件的**字首**。因此,如果找到任何匹配項,它們也將被複制。為防止這種情況發生,請使用 exact_match=False。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_single_file = GCSToGCSOperator(
task_id="copy_single_gcs_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST, # If not supplied the source_bucket value will be used
destination_object="backup_" + OBJECT_1, # If not supplied the source_object value will be used
exact_match=True,
)
複製多個檔案¶
複製多個檔案有幾種方法,下面展示了各種示例。
如前所述,delimiter 欄位以及在源物件中使用萬用字元 (*) 都已棄用。因此,不建議使用它們,而是使用 match_glob,如下所示:
以下示例將複製與 data/ 資料夾中的 glob 模式匹配的檔案,從 BUCKET_1_SRC GCS 儲存桶複製到 BUCKET_1_DST 儲存桶的 backup/ 資料夾中。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_files_with_match_glob = GCSToGCSOperator(
task_id="copy_files_with_match_glob",
source_bucket=BUCKET_NAME_SRC,
source_object="data/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
match_glob="**/*.txt",
)
以下示例將複製 subdir/ 資料夾中的所有檔案(即 subdir/a.csv、subdir/b.csv、subdir/c.csv),從 BUCKET_1_SRC GCS 儲存桶複製到 BUCKET_1_DST 儲存桶的 backup/ 資料夾中(即 backup/a.csv、backup/b.csv、backup/c.csv)。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_files = GCSToGCSOperator(
task_id="copy_files",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + "subdir/",
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
copy_files_with_list = GCSToGCSOperator(
task_id="copy_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[
PREFIX + OBJECT_1,
PREFIX + OBJECT_2,
], # Instead of files each element could be a wildcard expression
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
最後,可以透過省略 source_object 引數而向 source_objects 引數提供列表來複制檔案。在此示例中,OBJECT_1 和 OBJECT_2 將從 BUCKET_1_SRC 複製到 BUCKET_1_DST。提供大小為 1 的列表功能與向 source_object 引數提供值相同。
移動單個檔案¶
將 True 提供給 move 引數會使操作器在複製完成後刪除 source_object。注意,如果標誌 exact_match=False,則 source_object 將被視為在 BUCKET_1_SRC GCS 儲存桶中搜索物件的**字首**。因此,如果找到任何匹配項,它們也將被複制。為防止這種情況發生,請使用 exact_match=False。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
move_single_file = GCSToGCSOperator(
task_id="move_single_file",
source_bucket=BUCKET_NAME_SRC,
source_object=PREFIX + OBJECT_1,
destination_bucket=BUCKET_NAME_DST,
destination_object="backup_" + OBJECT_1,
exact_match=True,
move_object=True,
)
移動多個檔案¶
透過向 move 引數提供 True 可以移動多個檔案。有關萬用字元和 delimiter 引數的相同規則也適用於移動和複製。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
move_files_with_list = GCSToGCSOperator(
task_id="move_files_with_list",
source_bucket=BUCKET_NAME_SRC,
source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
destination_bucket=BUCKET_NAME_DST,
destination_object="backup/",
)
GCSSynchronizeBucketsOperator¶
GCSSynchronizeBucketsOperator 操作器檢查目標儲存桶的初始狀態,然後將其與源儲存桶進行比較。基於此,它會建立一個操作計劃,描述應從目標儲存桶刪除哪些物件、應覆蓋哪些物件以及應複製哪些物件。
此操作器從不修改源儲存桶中的資料。
當您使用此操作器時,您可以指定是否允許覆蓋目標中已存在的物件,是否應刪除僅存在於目標的,是否處理子目錄或應處理哪個子目錄。
此操作器的工作方式可以與 rsync 命令進行比較。
基本同步¶
以下示例將確保 BUCKET_1_SRC 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則不會覆蓋它們。不會刪除 BUCKET_1_DST 中不存在於 BUCKET_1_SRC 的任何檔案。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)
完整儲存桶同步¶
此示例將確保 BUCKET_1_SRC 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則會覆蓋它們。會刪除 BUCKET_1_DST 中不存在於 BUCKET_1_SRC 的任何檔案。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_full_bucket = GCSSynchronizeBucketsOperator(
task_id="sync_full_bucket",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
delete_extra_files=True,
allow_overwrite=True,
)
同步到子目錄¶
以下示例將確保 BUCKET_1_SRC 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 儲存桶的 subdir 資料夾中。如果 BUCKET_1_DST/subdir 中已存在同名檔案,則不會覆蓋它們,也不會刪除 BUCKET_1_DST/subdir 中不存在於 BUCKET_1_SRC 的任何檔案。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_to_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_to_subdirectory",
source_bucket=BUCKET_NAME_SRC,
destination_bucket=BUCKET_NAME_DST,
destination_object="subdir/",
)
從子目錄同步¶
此示例將確保 BUCKET_1_SRC/subdir 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則不會覆蓋它們,也不會刪除 BUCKET_1_DST 中不存在於 BUCKET_1_SRC/subdir 的任何檔案。
tests/system/google/cloud/gcs/example_gcs_to_gcs.py
sync_from_subdirectory = GCSSynchronizeBucketsOperator(
task_id="sync_from_subdirectory",
source_bucket=BUCKET_NAME_SRC,
source_object="subdir/",
destination_bucket=BUCKET_NAME_DST,
)
參考¶
更多資訊,請參閱