airflow.providers.google.cloud.transfers.s3_to_gcs

S3ToGCSOperator

將 S3 鍵(可能是一個字首)與 Google Cloud Storage 目標路徑同步。

模組內容

class airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator(*, bucket, prefix='', apply_gcs_prefix=False, delimiter='', aws_conn_id='aws_default', verify=None, gcp_conn_id='google_cloud_default', dest_gcs=None, replace=False, gzip=False, google_impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=10, **kwargs)[原始碼]

Bases: airflow.providers.amazon.aws.operators.s3.S3ListOperator

將 S3 鍵(可能是一個字首)與 Google Cloud Storage 目標路徑同步。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: 從 Amazon S3 傳輸資料到 Google Cloud Storage

引數:
  • bucket – 查詢物件的 S3 儲存桶。(模板化的)

  • prefix – 用於過濾物件名稱以此字首開頭的字串。(模板化的)

  • apply_gcs_prefix

    (可選)是否將源物件的路徑替換為給定的 GCS 目標路徑。如果 apply_gcs_prefix 為 False(預設),則 S3 中的物件將複製到 GCS 儲存桶的給定 GCS 路徑中,並且源路徑將保留在其中。例如: =>

    如果 apply_gcs_prefix 為 True,則 S3 中的物件將複製到 GCS 儲存桶的給定 GCS 路徑中,並且源路徑將被省略。例如: =>

  • delimiter – 分隔符,標記鍵的層級。(模板化的)

  • aws_conn_id – 源 S3 連線

  • verify

    是否驗證 S3 連線的 SSL 證書。預設情況下會驗證 SSL 證書。您可以提供以下值:

    • False:不驗證 SSL 證書。SSL 仍會使用

      (除非 use_ssl 為 False),但 SSL 證書不會被驗證。

    • path/to/cert/bundle.pem:要使用的 CA 證書捆綁包的檔名。

      如果要使用與 botocore 使用的不同的 CA 證書捆綁包,可以指定此引數。

  • gcp_conn_id – (可選) 用於連線到 Google Cloud 的連線 ID。

  • dest_gcs – 目標 Google Cloud Storage 儲存桶和字首,用於儲存檔案。(模板化的)

  • replace – 是否要替換現有的目標檔案。

  • gzip – 上傳時壓縮檔案的選項。在 deferrable 模式下忽略此引數。

  • google_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的 Google 服務賬號,用於使用短期憑據模擬身份,或者鏈式賬號列表,用於獲取列表中最後一個賬號的 access_token,該 access_token 將用於請求中的身份模擬。如果設定為字串,該賬號必須授予發起賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予直接前一個身份 Service Account Token Creator IAM 角色,列表中的第一個賬號將此角色授予發起賬號。(模板化的)

  • deferrable – 是否在 deferrable 模式下執行運算子

  • poll_interval (int) – 輪詢作業完成之間的時間間隔(秒)。僅在 deferrable 模式下執行時考慮此值。必須大於 0。

示例:

s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="data/customers-201804",
    gcp_conn_id="google_cloud_default",
    dest_gcs="gs://my.gcs.bucket/some/customers/",
    replace=False,
    gzip=True,
    dag=my_dag,
)

請注意,bucketprefixdelimiterdest_gcs 是模板化的,因此您可以在其中使用變數。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'dest_gcs', 'google_impersonation_chain')[原始碼]
ui_color = '#e09411'[原始碼]
transfer_job_max_files_number = 1000[原始碼]
apply_gcs_prefix = False[原始碼]
gcp_conn_id = 'google_cloud_default'[原始碼]
dest_gcs = None[原始碼]
replace = False[原始碼]
verify = None[原始碼]
gzip = False[原始碼]
google_impersonation_chain = None[原始碼]
deferrable = True[原始碼]
poll_interval = 10[原始碼]
execute(context)[原始碼]

在建立運算子時派生。

上下文是與渲染 jinja 模板時使用的相同的字典。

有關更多上下文資訊,請參閱 get_template_context。

exclude_existing_objects(s3_objects, gcs_hook)[原始碼]

從列表中排除已存在於 GCS 儲存桶中的物件。

s3_to_gcs_object(s3_object)[原始碼]

根據運算子的邏輯將 S3 路徑轉換為 GCS 路徑。

如果 apply_gcs_prefix == True 則 <s3_prefix><content> => <gcs_prefix><content> 如果 apply_gcs_prefix == False 則 <s3_prefix><content> => <gcs_prefix><s3_prefix><content>

gcs_to_s3_object(gcs_object)[原始碼]

根據運算子的邏輯將 GCS 路徑轉換為 S3 路徑。

如果 apply_gcs_prefix == True 則 <gcs_prefix><content> => <s3_prefix><content> 如果 apply_gcs_prefix == False 則 <gcs_prefix><s3_prefix><content> => <s3_prefix><content>

transfer_files(s3_objects, gcs_hook, s3_hook)[原始碼]
transfer_files_async(files, gcs_hook, s3_hook)[原始碼]

提交 Google Cloud Storage Transfer Service 作業,將檔案從 AWS S3 複製到 GCS。

submit_transfer_jobs(files, gcs_hook, s3_hook)[原始碼]
execute_complete(context, event)[原始碼]

立即返回並依賴觸發器丟擲成功事件。觸發器的回撥函式。

依賴觸發器丟擲異常,否則假定執行成功。

get_transfer_hook()[原始碼]
get_openlineage_facets_on_start()[原始碼]

此條目是否有幫助?