airflow.providers.google.cloud.operators.gcs¶
此模組包含一個 Google Cloud Storage 桶 operator。
類¶
建立一個新桶。 |
|
列出桶中所有物件,可按名稱字首、分隔符或 match_glob 進行過濾。 |
|
從 Google Cloud Storage 桶中刪除列表中的物件或所有匹配字首的物件。 |
|
在指定的桶上建立一個新的 ACL 條目。 |
|
在指定的物件上建立一個新的 ACL 條目。 |
|
將資料從源 GCS 位置複製到本地檔案系統上的臨時位置。 |
|
複製在指定時間範圍內修改的物件,執行轉換,並將結果上傳到桶。 |
|
從 Google Cloud Storage 中刪除桶。 |
|
同步 Google Cloud Services 中桶或桶目錄的內容。 |
模組內容¶
- class airflow.providers.google.cloud.operators.gcs.GCSCreateBucketOperator(*, bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator建立一個新桶。
Google Cloud Storage 使用扁平名稱空間,因此您不能建立一個已存在的桶名。
另請參閱
更多資訊,請參閱桶命名指南:https://cloud.google.com/storage/docs/bucketnaming.html#requirements
- 引數:
bucket_name (str) – 桶的名稱。(templated)
resource (dict | None) – 用於建立桶的可選字典引數。有關可用引數的資訊,請參閱 Cloud Storage API 文件:https://cloud.google.com/storage/docs/json_api/v1/buckets/insert
storage_class (str) –
這定義了桶中物件的儲存方式,並決定了 SLA 和儲存成本。(templated) 可用值包括
MULTI_REGIONALREGIONALSTANDARDNEARLINECOLDLINE.
如果在建立桶時未指定此值,則預設為 STANDARD。
location (str) –
桶的位置。(templated) 桶中物件的資料物理儲存在此區域內。預設為 US。
project_id (str) – Google Cloud 專案的 ID。(templated)
labels (dict | None) – 使用者提供的標籤,以鍵/值對形式。
gcp_conn_id (str) – (可選) 用於連線 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
以下 Operator 將在
EU區域建立一個名為test-bucket的新桶,儲存類別為MULTI_REGIONALCreateBucket = GCSCreateBucketOperator( task_id="CreateNewBucket", bucket_name="test-bucket", storage_class="MULTI_REGIONAL", location="EU", labels={"env": "dev", "team": "airflow"}, gcp_conn_id="airflow-conn-id", )
- template_fields: collections.abc.Sequence[str] = ('bucket_name', 'storage_class', 'location', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.gcs.GCSListObjectsOperator(*, bucket, prefix=None, delimiter=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, match_glob=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator列出桶中所有物件,可按名稱字首、分隔符或 match_glob 進行過濾。
此 operator 返回一個 Python 列表,其中包含物件名稱,可供下游任務中的 XCom 使用。
- 引數:
bucket (str) – 要查詢物件的 Google Cloud Storage 桶。(templated)
prefix (str | list[str] | None) – 字串或字串列表,用於過濾名稱以此開頭/開頭的物件。(templated)
delimiter (str | None) – (已棄用) 用於過濾物件的分隔符。(templated) 例如,要在 GCS 中列出目錄中的 CSV 檔案,可以使用 delimiter='.csv'。
gcp_conn_id (str) – (可選) 用於連線 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
match_glob (str | None) – (可選) 根據給定的 glob 模式字串過濾物件 (例如,
'**/*.json')
- 示例:
以下 Operator 將列出
data桶中sales/sales-2017資料夾中的所有 Avro 檔案。GCS_Files = GCSListOperator( task_id="GCS_Files", bucket="data", prefix="sales/sales-2017/", match_glob="**/*.avro", gcp_conn_id=google_cloud_conn_id, )
- template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.gcs.GCSDeleteObjectsOperator(*, bucket_name, objects=None, prefix=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator從 Google Cloud Storage 桶中刪除列表中的物件或所有匹配字首的物件。
- 引數:
bucket_name (str) – 要從中刪除的 GCS 桶
objects (list[str] | None) – 要刪除的物件列表。這些應該是桶中物件的名稱,不包含 gs://bucket/
prefix (str | list[str] | None) – 字串或字串列表,用於過濾名稱以此開頭/開頭的物件。(templated)
gcp_conn_id (str) – (可選) 用於連線 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
- template_fields: collections.abc.Sequence[str] = ('bucket_name', 'prefix', 'objects', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.gcs.GCSBucketCreateAclEntryOperator(*, bucket, entity, role, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator在指定的桶上建立一個新的 ACL 條目。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:GCSBucketCreateAclEntryOperator
- 引數:
bucket (str) – 桶的名稱。
entity (str) – 持有許可權的實體,以下形式之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers
role (str) – 實體的訪問許可權。可接受的值為:“OWNER”、“READER”、“WRITER”。
user_project (str | None) – (可選) 此請求的計費專案。對於 Requester Pays 桶是必需的。
gcp_conn_id (str) – (可選) 用於連線 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
- template_fields: collections.abc.Sequence[str] = ('bucket', 'entity', 'role', 'user_project', 'impersonation_chain')[source]¶
- 類 airflow.providers.google.cloud.operators.gcs.GCSObjectCreateAclEntryOperator(*, bucket, object_name, entity, role, generation=None, user_project=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator在指定的物件上建立一個新的 ACL 條目。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: GCSObjectCreateAclEntryOperator
- 引數:
bucket (str) – 桶的名稱。
object_name (str) – 物件的名稱。有關如何對物件名稱進行 URL 編碼以使其路徑安全的更多資訊,請參閱:https://cloud.google.com/storage/docs/json_api/#encoding
entity (str) – 持有許可權的實體,以下形式之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers
role (str) – 實體(entity)的訪問許可權。可接受的值為:“OWNER”、“READER”。
user_project (str | None) – (可選) 此請求的計費專案。對於 Requester Pays 桶是必需的。
gcp_conn_id (str) – (可選) 用於連線 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
- template_fields: collections.abc.Sequence[str] = ('bucket', 'object_name', 'entity', 'generation', 'role', 'user_project', 'impersonation_chain')[source]¶
- 類 airflow.providers.google.cloud.operators.gcs.GCSFileTransformOperator(*, source_bucket, source_object, transform_script, destination_bucket=None, destination_object=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator將資料從源 GCS 位置複製到本地檔案系統上的臨時位置。
根據指定的轉換指令碼對該檔案執行轉換,並將輸出上傳到目標儲存桶。如果未指定輸出儲存桶,則原始檔案將被覆蓋。
本地檔案系統中原始檔和目標檔案的位置作為轉換指令碼的第一個和第二個引數提供。轉換指令碼應從源讀取資料,進行轉換,並將輸出寫入本地目標檔案。
- 引數:
source_bucket (str) – 源物件的儲存桶。(模板化)
source_object (str) – 要從 GCS 中檢索的鍵(物件路徑)。(模板化)
destination_bucket (str | None) – 轉換後上傳鍵(物件路徑)的儲存桶。如果未提供,將使用 source_bucket。(模板化)
destination_object (str | None) – 要寫入 GCS 的鍵(物件路徑)。如果未提供,將使用 source_object。(模板化)
transform_script (str | list[str]) – 可執行轉換指令碼的位置或傳遞給子程序的引數列表,例如 [‘python’, ‘script.py’, 10]。(模板化)
gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
- template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_object', 'destination_bucket', 'destination_object',...[source]¶
- 類 airflow.providers.google.cloud.operators.gcs.GCSTimeSpanFileTransformOperator(*, source_bucket, source_prefix, source_gcp_conn_id, destination_bucket, destination_prefix, destination_gcp_conn_id, transform_script, source_impersonation_chain=None, destination_impersonation_chain=None, chunk_size=None, download_continue_on_fail=False, download_num_attempts=1, upload_continue_on_fail=False, upload_num_attempts=1, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator複製在指定時間範圍內修改的物件,執行轉換,並將結果上傳到桶。
確定在 GCS 源位置的特定時間段內新增或修改的物件列表,將它們複製到本地檔案系統的臨時位置,根據指定的轉換指令碼對檔案執行轉換,並將輸出上傳到目標儲存桶。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: GCSTimeSpanFileTransformOperator
本地檔案系統中原始檔和目標檔案的位置作為轉換指令碼的第一個和第二個引數提供。時間跨度作為 UTC ISO 8601 字串以第三個和第四個引數的形式傳遞給轉換指令碼。
轉換指令碼應從源讀取資料,進行轉換,並將輸出寫入本地目標檔案。
- 引數:
source_bucket (str) – 從中獲取資料的儲存桶。(模板化)
source_prefix (str) – 用於過濾物件名稱以此字首開頭的字串字首。可以插入邏輯日期和時間元件。(模板化)
source_gcp_conn_id (str) – 連線到 Google Cloud 以下載要處理的檔案時使用的連線 ID。
source_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,用於使用短期憑據進行模擬(以下載要處理的檔案),或為了獲取列表中最後一個帳號的 access_token 所需的帳號鏈列表,該帳號將在請求中被模擬。如果設定為字串,該帳號必須授予發起帳號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予直接前一個身份 Service Account Token Creator IAM 角色,並且列表中的第一個帳號將此角色授予發起帳號(模板化)。
destination_bucket (str) – 用於寫入資料的儲存桶。(模板化)
destination_prefix (str) – 上傳位置的字串字首。可以插入邏輯日期和時間元件。(模板化)
destination_gcp_conn_id (str) – 連線到 Google Cloud 以上傳處理後的檔案時使用的連線 ID。
destination_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,用於使用短期憑據進行模擬(以上傳處理後的檔案),或為了獲取列表中最後一個帳號的 access_token 所需的帳號鏈列表,該帳號將在請求中被模擬。如果設定為字串,該帳號必須授予發起帳號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予直接前一個身份 Service Account Token Creator IAM 角色,並且列表中的第一個帳號將此角色授予發起帳號(模板化)。
transform_script (str | list[str]) – 可執行轉換指令碼的位置或傳遞給子程序的引數列表,例如 [‘python’, ‘script.py’, 10]。(模板化)
chunk_size (int | None) – 下載或上傳時的資料塊大小(以位元組為單位)。根據 Google Cloud Storage API 規範,此值必須是 256 KB 的倍數。
download_continue_on_fail (bool | None) – 如果設定為 true,則下載失敗時任務不會出錯,但會繼續執行。
upload_chunk_size – 上傳時的資料塊大小(以位元組為單位)。根據 Google Cloud Storage API 規範,此值必須是 256 KB 的倍數。
upload_continue_on_fail (bool | None) – 如果設定為 true,則上傳失敗時任務不會出錯,但會繼續執行。
upload_num_attempts (int) – 嘗試上傳單個檔案的次數。
- template_fields: collections.abc.Sequence[str] = ('source_bucket', 'source_prefix', 'destination_bucket', 'destination_prefix',...[source]¶
- 靜態 interpolate_prefix(prefix, dt)[source]¶
將字首與日期時間進行插值。
- 引數:
prefix (str) – 要進行插值的字首
dt (datetime.datetime) – 要進行插值的日期時間
- class airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator(*, bucket_name, force=True, gcp_conn_id='google_cloud_default', impersonation_chain=None, user_project=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator從 Google Cloud Storage 中刪除桶。
另請參閱
有關如何使用此操作器的更多資訊,請參閱指南:刪除儲存桶
- 引數:
bucket_name (str) – 將要刪除的儲存桶名稱
force (bool) – 如果為 false,則不允許刪除非空儲存桶,設定為 force=True 允許刪除非空儲存桶
gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
user_project (str | None) – (可選) 此請求的計費專案識別符號。對於請求方付費儲存桶是必需的。
- template_fields: collections.abc.Sequence[str] = ('bucket_name', 'gcp_conn_id', 'impersonation_chain', 'user_project')[source]¶
- class airflow.providers.google.cloud.operators.gcs.GCSSynchronizeBucketsOperator(*, source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, delete_extra_files=False, allow_overwrite=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
繼承自:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator同步 Google Cloud Services 中的儲存桶或儲存桶目錄內容。
引數
source_object和destination_object描述根同步目錄。如果未傳遞它們,將同步整個儲存桶。它們應指向目錄。注意
不支援同步單個檔案。只能同步整個目錄。
另請參閱
有關如何使用此操作器的更多資訊,請參閱指南:GCSSynchronizeBucketsOperator
- 引數:
source_bucket (str) – 包含源物件的儲存桶名稱。
destination_bucket (str) – 包含目標物件的儲存桶名稱。
source_object (str | None) – 源儲存桶中的根同步目錄。
destination_object (str | None) – 目標儲存桶中的根同步目錄。
recursive (bool) – 如果為 True,將考慮子目錄
allow_overwrite (bool) – 如果為 True,當找到不匹配的檔案時將覆蓋檔案。預設情況下不允許覆蓋檔案
delete_extra_files (bool) –
如果為 True,刪除源中存在但在目標中不存在的其他檔案。預設情況下不刪除其他檔案。
注意
如果指定錯誤的源/目標組合,此選項會快速刪除資料。
gcp_conn_id (str) – (可選) 用於連線 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence[str] | None) – (可選) 要使用短期憑證模擬的服務帳號,或者為了獲取列表中最後一個帳號的 access_token 而必需的鏈式帳號列表,該 access_token 將用於請求中的模擬。如果設定為字串,則該帳號必須授予原始帳號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須授予直接前置身份 Service Account Token Creator IAM 角色,列表中第一個帳號將此角色授予原始帳號。(templated)
- template_fields: collections.abc.Sequence[str] = ('source_bucket', 'destination_bucket', 'source_object', 'destination_object', 'recursive',...[source]¶