Google Cloud Storage 運算子¶
Cloud Storage 允許隨時隨地儲存和檢索任意量的資料。您可以使用 Cloud Storage 進行一系列場景,包括提供網站內容、儲存資料用於存檔和災難恢復,或者透過直接下載向用戶分發大型資料物件。
有關往返於 Google Cloud Storage 的專用傳輸運算子列表,請參閱Google Transfer Operators。
前置任務¶
要使用這些運算子,您必須執行以下幾項操作:
使用Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算功能,如Google Cloud 文件中所述。
啟用 API,如Cloud Console 文件中所述。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關安裝的詳細資訊可在Installation 中找到。
運算子¶
GCSTimeSpanFileTransformOperator¶
使用 GCSTimeSpanFileTransformOperator 來轉換在特定時間跨度(資料間隔)內修改的檔案。時間跨度由其開始和結束時間戳定義。如果 DAG 沒有排程 *下一個* DAG 例項,則時間跨度的結束是無限的,這意味著該運算子處理所有早於 data_interval_start 的檔案。
tests/system/google/cloud/gcs/example_gcs_transform_timespan.py
gcs_timespan_transform_files_task = GCSTimeSpanFileTransformOperator(
task_id="gcs_timespan_transform_files",
source_bucket=BUCKET_NAME_SRC,
source_prefix=SOURCE_PREFIX,
source_gcp_conn_id=SOURCE_GCP_CONN_ID,
destination_bucket=BUCKET_NAME_DST,
destination_prefix=DESTINATION_PREFIX,
destination_gcp_conn_id=DESTINATION_GCP_CONN_ID,
transform_script=["python", TRANSFORM_SCRIPT_PATH],
)
GCSBucketCreateAclEntryOperator¶
在指定的儲存桶上建立一個新的 ACL 條目。
有關引數定義,請參閱 GCSBucketCreateAclEntryOperator
使用此運算子¶
tests/system/google/cloud/gcs/example_gcs_acl.py
gcs_bucket_create_acl_entry_task = GCSBucketCreateAclEntryOperator(
bucket=BUCKET_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_BUCKET_ROLE,
task_id="gcs_bucket_create_acl_entry_task",
)
模板化¶
template_fields: Sequence[str] = (
"bucket",
"entity",
"role",
"user_project",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud Storage 文件,瞭解如何在儲存桶中建立新的 ACL 條目:create a new ACL entry for a bucket。
GCSObjectCreateAclEntryOperator¶
在指定的物件上建立一個新的 ACL 條目。
有關引數定義,請參閱 GCSObjectCreateAclEntryOperator
使用此運算子¶
tests/system/google/cloud/gcs/example_gcs_acl.py
gcs_object_create_acl_entry_task = GCSObjectCreateAclEntryOperator(
bucket=BUCKET_NAME,
object_name=FILE_NAME,
entity=GCS_ACL_ENTITY,
role=GCS_ACL_OBJECT_ROLE,
task_id="gcs_object_create_acl_entry_task",
)
模板化¶
template_fields: Sequence[str] = (
"bucket",
"object_name",
"entity",
"generation",
"role",
"user_project",
"impersonation_chain",
)
更多資訊¶
請參閱 Google Cloud Storage insert 文件,瞭解如何為 ObjectAccess 建立 ACL 條目:create a ACL entry for ObjectAccess。
GCSListObjectsOperator¶
使用 GCSListObjectsOperator 列出 Google Cloud Storage 儲存桶中的物件。您可以選擇指定字首來僅列出名稱以此字首開頭的物件,並指定分隔符來模擬類似目錄的組織結構。
tests/system/google/cloud/gcs/example_gcs_copy_delete.py
list_buckets = GCSListObjectsOperator(task_id="list_buckets", bucket=BUCKET_NAME_SRC)
GCSDeleteObjectsOperator¶
使用 GCSDeleteObjectsOperator 從 Google Cloud Storage 儲存桶中刪除一個或多個物件。
tests/system/google/cloud/gcs/example_gcs_copy_delete.py
delete_files = GCSDeleteObjectsOperator(
task_id="delete_files", bucket_name=BUCKET_NAME_SRC, objects=[FILE_NAME]
)
刪除儲存桶¶
刪除儲存桶允許您從 Google Cloud Storage 中移除儲存桶。這是透過 GCSDeleteBucketOperator 運算子執行的。
tests/system/google/cloud/gcs/example_gcs_upload_download.py
delete_bucket = GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_NAME)
您可以將 Jinja 模板化用於 bucket_name, gcp_conn_id, impersonation_chain, user_project 等引數,從而動態地確定值。
參考¶
更多資訊請參閱:
感測器¶
GCSObjectExistenceSensor¶
使用 GCSObjectExistenceSensor 來等待(輪詢) Google Cloud Storage 中某個檔案的存在。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_exists = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_exists_task",
)
如果您希望在感測器執行時釋放 worker 槽,則還可以在此運算子中使用可延遲模式。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_exists_defered = GCSObjectExistenceSensor(
bucket=DESTINATION_BUCKET_NAME, object=FILE_NAME, task_id="gcs_object_exists_defered", deferrable=True
)
GCSObjectsWithPrefixExistenceSensor¶
使用 GCSObjectsWithPrefixExistenceSensor 來等待(輪詢) Google Cloud Storage 中具有指定字首的檔案的存在。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task",
)
如果您希望此感測器非同步執行,從而更有效地利用 Airflow 部署中的資源,可以將 deferrable 引數設定為 True。但是,此功能需要啟用觸發器元件才能工作。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_object_with_prefix_exists_async = GCSObjectsWithPrefixExistenceSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME[:5],
task_id="gcs_object_with_prefix_exists_task_async",
deferrable=True,
)
GCSUploadSessionCompleteSensor¶
使用 GCSUploadSessionCompleteSensor 來檢查 Google Cloud Storage 中具有指定字首的檔案數量是否發生變化。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_upload_session_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_complete_task",
)
如果您希望在感測器執行時釋放 worker 槽,可以將引數 deferrable 設定為 True。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_upload_session_async_complete = GCSUploadSessionCompleteSensor(
bucket=DESTINATION_BUCKET_NAME,
prefix=FILE_NAME,
inactivity_period=15,
min_objects=1,
allow_delete=True,
previous_objects=set(),
task_id="gcs_upload_session_async_complete",
deferrable=True,
)
GCSObjectUpdateSensor¶
使用 GCSObjectUpdateSensor 來檢查 Google Cloud Storage 中的物件是否已更新。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_update_object_exists = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task",
)
如果您希望此感測器非同步執行,從而有效地利用 Airflow 部署中的資源,可以將 deferrable 引數設定為 True。但是,此功能需要啟用觸發器元件才能工作。
tests/system/google/cloud/gcs/example_gcs_sensor.py
gcs_update_object_exists_async = GCSObjectUpdateSensor(
bucket=DESTINATION_BUCKET_NAME,
object=FILE_NAME,
task_id="gcs_object_update_sensor_task_async",
deferrable=True,
)
更多資訊¶
感測器具有不同的模式,用於確定任務執行期間資源的表現。有關使用感測器的最佳實踐,請參閱Airflow 感測器文件。
參考¶
更多資訊請參閱: