在 Google Cloud Storage 中傳輸資料

Google Cloud Storage (GCS) 用於儲存來自各種應用程式的大型資料。請注意,在 GCS 術語中,檔案被稱為物件,因此在本指南中,“物件”和“檔案”這兩個術語可以互換使用。有幾個操作器用於作為 Google Cloud 服務的一部分複製資料。本頁面展示瞭如何使用這些操作器。另請參閱 Google Cloud Storage 操作器,瞭解用於管理 Google Cloud Storage 儲存桶的操作器。

Cloud Storage Transfer Service

有許多操作器用於管理 Google Cloud Data Transfer 服務。如果您想建立新的資料傳輸任務,請使用操作器 CloudDataTransferServiceCreateJobOperator。您也可以使用該服務的先前操作器 - CloudDataTransferServiceGCSToGCSOperator

這些操作器不會在本地控制複製過程,而是使用 Google 資源,這使得它們能夠更快、更經濟地執行此任務。當 Airflow 未託管在 Google Cloud 中時,經濟效益尤為突出,因為這些操作器可以減少出站流量。

如果啟用指定物件傳輸到目標後應從源刪除的選項,這些操作器將修改源物件。

當您使用 Google Cloud Data Transfer 服務時,您可以指定是否允許覆蓋目標中已存在的物件,是否應刪除僅存在於目標的,或者物件傳輸到目標後是否應從源刪除。

可以使用包含和排除字首以及基於檔案修改日期來指定源物件。

如果您需要有關如何使用的資訊,請參閱指南:Google Cloud Transfer Service 操作器

適用於 Google Cloud Storage 的專用傳輸操作器

請參閱 Google 傳輸操作器,瞭解往返 Google Cloud Storage 的專用傳輸操作器列表。

本地傳輸

有兩個操作器用於複製資料,整個過程都在本地控制。

下一節將介紹它們。

先決條件任務

要使用這些操作器,您必須完成一些準備工作

操作器

GCSToGCSOperator

GCSToGCSOperator 允許您在 GCS 中複製一個或多個檔案。檔案可以在兩個不同的儲存桶之間或同一儲存桶內複製。複製始終進行,不考慮目標儲存桶的初始狀態。

僅當檔案移動選項處於活動狀態時,此操作器才會刪除源儲存桶中的物件。在兩個不同儲存桶之間複製檔案時,此操作器從不刪除目標儲存桶中的資料。

當您使用此操作器時,您可以指定物件傳輸到目標後是否應從源刪除。可以使用單個萬用字元以及基於檔案修改日期來指定源物件。

可以根據物件的路徑使用 match_glob 欄位進行過濾。您應避免在源物件的路徑中使用 delimiter 欄位或萬用字元,因為這兩種做法都已棄用。此外,還可以基於檔案的建立日期 (is_older_than) 或修改日期 (last_modified_timemaximum_modified_time) 進行過濾。

此操作器預設的工作方式可以與 cp 命令進行比較。當檔案移動選項處於活動狀態時,此操作器功能類似於 mv 命令。

下面是使用 GCSToGCSOperator 複製單個檔案、使用萬用字元複製多個檔案、複製多個檔案、移動單個檔案和移動多個檔案的示例。

複製單個檔案

以下示例會將單個檔案 OBJECT_1BUCKET_1_SRC GCS 儲存桶複製到 BUCKET_1_DST 儲存桶。注意,如果標誌 exact_match=False,則 source_object 將被視為在 BUCKET_1_SRC GCS 儲存桶中搜索物件的**字首**。因此,如果找到任何匹配項,它們也將被複制。為防止這種情況發生,請使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_single_file = GCSToGCSOperator(
    task_id="copy_single_gcs_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,  # If not supplied the source_bucket value will be used
    destination_object="backup_" + OBJECT_1,  # If not supplied the source_object value will be used
    exact_match=True,
)

複製多個檔案

複製多個檔案有幾種方法,下面展示了各種示例。

如前所述,delimiter 欄位以及在源物件中使用萬用字元 (*) 都已棄用。因此,不建議使用它們,而是使用 match_glob,如下所示:

以下示例將複製與 data/ 資料夾中的 glob 模式匹配的檔案,從 BUCKET_1_SRC GCS 儲存桶複製到 BUCKET_1_DST 儲存桶的 backup/ 資料夾中。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_files_with_match_glob = GCSToGCSOperator(
    task_id="copy_files_with_match_glob",
    source_bucket=BUCKET_NAME_SRC,
    source_object="data/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
    match_glob="**/*.txt",
)

以下示例將複製 subdir/ 資料夾中的所有檔案(即 subdir/a.csv、subdir/b.csv、subdir/c.csv),從 BUCKET_1_SRC GCS 儲存桶複製到 BUCKET_1_DST 儲存桶的 backup/ 資料夾中(即 backup/a.csv、backup/b.csv、backup/c.csv)。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_files = GCSToGCSOperator(
    task_id="copy_files",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + "subdir/",
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

copy_files_with_list = GCSToGCSOperator(
    task_id="copy_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[
        PREFIX + OBJECT_1,
        PREFIX + OBJECT_2,
    ],  # Instead of files each element could be a wildcard expression
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

最後,可以透過省略 source_object 引數而向 source_objects 引數提供列表來複制檔案。在此示例中,OBJECT_1OBJECT_2 將從 BUCKET_1_SRC 複製到 BUCKET_1_DST。提供大小為 1 的列表功能與向 source_object 引數提供值相同。

移動單個檔案

True 提供給 move 引數會使操作器在複製完成後刪除 source_object。注意,如果標誌 exact_match=False,則 source_object 將被視為在 BUCKET_1_SRC GCS 儲存桶中搜索物件的**字首**。因此,如果找到任何匹配項,它們也將被複制。為防止這種情況發生,請使用 exact_match=False

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

move_single_file = GCSToGCSOperator(
    task_id="move_single_file",
    source_bucket=BUCKET_NAME_SRC,
    source_object=PREFIX + OBJECT_1,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup_" + OBJECT_1,
    exact_match=True,
    move_object=True,
)

移動多個檔案

透過向 move 引數提供 True 可以移動多個檔案。有關萬用字元和 delimiter 引數的相同規則也適用於移動和複製。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

move_files_with_list = GCSToGCSOperator(
    task_id="move_files_with_list",
    source_bucket=BUCKET_NAME_SRC,
    source_objects=[PREFIX + OBJECT_1, PREFIX + OBJECT_2],
    destination_bucket=BUCKET_NAME_DST,
    destination_object="backup/",
)

GCSSynchronizeBucketsOperator

GCSSynchronizeBucketsOperator 操作器檢查目標儲存桶的初始狀態,然後將其與源儲存桶進行比較。基於此,它會建立一個操作計劃,描述應從目標儲存桶刪除哪些物件、應覆蓋哪些物件以及應複製哪些物件。

此操作器從不修改源儲存桶中的資料。

當您使用此操作器時,您可以指定是否允許覆蓋目標中已存在的物件,是否應刪除僅存在於目標的,是否處理子目錄或應處理哪個子目錄。

此操作器的工作方式可以與 rsync 命令進行比較。

基本同步

以下示例將確保 BUCKET_1_SRC 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則不會覆蓋它們。不會刪除 BUCKET_1_DST 中不存在於 BUCKET_1_SRC 的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_bucket", source_bucket=BUCKET_NAME_SRC, destination_bucket=BUCKET_NAME_DST
)

完整儲存桶同步

此示例將確保 BUCKET_1_SRC 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則會覆蓋它們。會刪除 BUCKET_1_DST 中不存在於 BUCKET_1_SRC 的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_full_bucket = GCSSynchronizeBucketsOperator(
    task_id="sync_full_bucket",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    delete_extra_files=True,
    allow_overwrite=True,
)

同步到子目錄

以下示例將確保 BUCKET_1_SRC 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 儲存桶的 subdir 資料夾中。如果 BUCKET_1_DST/subdir 中已存在同名檔案,則不會覆蓋它們,也不會刪除 BUCKET_1_DST/subdir 中不存在於 BUCKET_1_SRC 的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_to_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_to_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    destination_bucket=BUCKET_NAME_DST,
    destination_object="subdir/",
)

從子目錄同步

此示例將確保 BUCKET_1_SRC/subdir 中的所有檔案(包括子目錄中的檔案)也存在於 BUCKET_1_DST 中。如果 BUCKET_1_DST 中已存在同名檔案,則不會覆蓋它們,也不會刪除 BUCKET_1_DST 中不存在於 BUCKET_1_SRC/subdir 的任何檔案。

tests/system/google/cloud/gcs/example_gcs_to_gcs.py

sync_from_subdirectory = GCSSynchronizeBucketsOperator(
    task_id="sync_from_subdirectory",
    source_bucket=BUCKET_NAME_SRC,
    source_object="subdir/",
    destination_bucket=BUCKET_NAME_DST,
)

參考

更多資訊,請參閱

此條目有幫助嗎?