airflow.providers.google.cloud.hooks.gcs

此模組包含一個 Google Cloud Storage 鉤子。

屬性

RT

T

FParams

List

DEFAULT_TIMEOUT

PROVIDE_BUCKET

GCSHook

使用 Google Cloud 連線與 Google Cloud Storage 進行互動。

GCSAsyncHook

GCSAsyncHook 執行在觸發器 worker 上,繼承自 GoogleBaseAsyncHook。

函式

gcs_object_is_directory(bucket)

如果給定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是一個目錄或空 bucket,則返回 True。

parse_json_from_gcs(gcp_conn_id, file_uri[, ...])

從 Google Cloud Storage 下載並解析 json 檔案。

模組內容

airflow.providers.google.cloud.hooks.gcs.RT[source]
airflow.providers.google.cloud.hooks.gcs.T[source]
airflow.providers.google.cloud.hooks.gcs.FParams[source]
airflow.providers.google.cloud.hooks.gcs.List[source]
airflow.providers.google.cloud.hooks.gcs.DEFAULT_TIMEOUT = 60[source]
airflow.providers.google.cloud.hooks.gcs.PROVIDE_BUCKET: str = None[source]
class airflow.providers.google.cloud.hooks.gcs.GCSHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

繼承自: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

使用 Google Cloud 連線與 Google Cloud Storage 進行互動。

get_conn()[source]

返回一個 Google Cloud Storage 服務物件。

copy(source_bucket, source_object, destination_bucket=None, destination_object=None)[source]

將一個物件從一個 bucket 複製到另一個 bucket,如果請求則重新命名。

可以省略 destination_bucket 或 destination_object,在這種情況下使用 source bucket/object,但不能兩者都省略。

引數:
  • source_bucket (str) – 要複製的物件的源 bucket。

  • source_object (str) – 要複製的物件。

  • destination_bucket (str | None) – 要複製到的物件的目的地。可以省略;如果省略,則使用相同的 bucket。

  • destination_object (str | None) – 如果給出,則為物件的(重新命名後的)路徑。可以省略;如果省略,則使用相同的名稱。

rewrite(source_bucket, source_object, destination_bucket, destination_object=None)[source]

類似於 copy;支援超過 5 TB 的檔案,以及在位置和/或儲存類別之間複製。

可以省略 destination_object,在這種情況下使用 source_object。

引數:
  • source_bucket (str) – 要複製的物件的源 bucket。

  • source_object (str) – 要複製的物件。

  • destination_bucket (str) – 要複製到的物件的目的地。

  • destination_object (str | None) – 如果給出,則為物件的(重新命名後的)路徑。可以省略;如果省略,則使用相同的名稱。

download(bucket_name: str, object_name: str, filename: None = None, chunk_size: int | None = None, timeout: int | None = DEFAULT_TIMEOUT, num_max_attempts: int | None =1, user_project: str | None =None) bytes[source]
download(bucket_name: str, object_name: str, filename: str, chunk_size: int | None = None, timeout: int | None =DEFAULT_TIMEOUT, num_max_attempts: int | None =1, user_project: str | None =None) str

從 Google Cloud Storage 下載檔案。

如果未提供 filename,operator 將檔案載入到記憶體並返回其內容。如果提供了 filename,它將檔案寫入指定位置並返回該位置。對於超出可用記憶體的檔案大小,建議寫入檔案。

引數:
  • bucket_name – 要從中獲取的 bucket。

  • object_name – 要獲取的物件。

  • filename – 如果設定,則為檔案應該寫入的本地檔案路徑。

  • chunk_size – Blob 塊大小。

  • timeout – 請求超時(秒)。

  • num_max_attempts – 下載檔案的嘗試次數。

  • user_project – 要向其收取請求費用的 Google Cloud 專案識別符號。Requester Pays buckets 需要此項。

download_as_byte_array(bucket_name, object_name, chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1)[source]

從 Google Cloud Storage 下載檔案。

如果未提供 filename,operator 將檔案載入到記憶體並返回其內容。如果提供了 filename,它將檔案寫入指定位置並返回該位置。對於超出可用記憶體的檔案大小,建議寫入檔案。

引數:
  • bucket_name (str) – 要從中獲取的 bucket。

  • object_name (str) – 要獲取的物件。

  • chunk_size (int | None) – Blob 塊大小。

  • timeout (int | None) – 請求超時(秒)。

  • num_max_attempts (int | None) – 下載檔案的嘗試次數。

provide_file(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, dir=None, user_project=None)[source]

將檔案下載到臨時目錄並返回一個檔案控制代碼。

您可以透過傳遞 bucket_name 和 object_name 引數或僅傳遞 object_url 引數來使用此方法。

引數:
  • bucket_name (str) – 要從中獲取的 bucket。

  • object_name (str | None) – 要獲取的物件。

  • object_url (str | None) – 檔案引用 URL。必須以 "gs: //" 開頭。

  • dir (str | None) – 下載檔案的臨時子目錄。(傳遞給 NamedTemporaryFile)

  • user_project (str | None) – 要向其收取請求費用的 Google Cloud 專案識別符號。Requester Pays buckets 需要此項。

返回:

檔案控制代碼

返回型別:

collections.abc.Generator[IO[bytes], None, None]

provide_file_and_upload(bucket_name=PROVIDE_BUCKET, object_name=None, object_url=None, user_project=None)[source]

建立臨時檔案,返回檔案控制代碼,並在關閉時上傳檔案內容。

您可以透過傳遞 bucket_name 和 object_name 引數或僅傳遞 object_url 引數來使用此方法。

引數:
  • bucket_name (str) – 要從中獲取的 bucket。

  • object_name (str | None) – 要獲取的物件。

  • object_url (str | None) – 檔案引用 URL。必須以 "gs: //" 開頭。

  • user_project (str | None) – 要向其收取請求費用的 Google Cloud 專案識別符號。Requester Pays buckets 需要此項。

返回:

檔案控制代碼

返回型別:

collections.abc.Generator[IO[bytes], None, None]

upload(bucket_name, object_name, filename=None, data=None, mime_type=None, gzip=False, encoding='utf-8', chunk_size=None, timeout=DEFAULT_TIMEOUT, num_max_attempts=1, metadata=None, cache_control=None, user_project=None)[source]

將本地檔案或檔案資料(字串或位元組)上傳到 Google Cloud Storage。

引數:
  • bucket_name (str) – 要上傳到的 bucket。

  • object_name (str) – 上傳檔案時要設定的物件名稱。

  • filename (str | None) – 要上傳檔案的本地檔案路徑。

  • data (str | bytes | None) – 要上傳的檔案資料(字串或位元組)。

  • mime_type (str | None) – 上傳檔案時設定的檔案 mime 型別。

  • gzip (bool) – 上傳時壓縮本地檔案或檔案資料的選項。

  • encoding (str) – 如果檔案資料以字串形式提供,則為位元組編碼。

  • chunk_size (int | None) – Blob 塊大小。

  • timeout (int | None) – 請求超時(秒)。

  • num_max_attempts (int) – 嘗試上傳檔案的次數。

  • metadata (dict | None) – 與檔案一起上傳的元資料。

  • cache_control (str | None) – Cache-Control 元資料欄位。

  • user_project (str | None) – 要向其收取請求費用的 Google Cloud 專案識別符號。Requester Pays buckets 需要此項。

exists(bucket_name, object_name, retry=DEFAULT_RETRY, user_project=None)[source]

檢查 Google Cloud Storage 中是否存在檔案。

引數:
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的 blob 名稱。

  • retry (google.api_core.retry.Retry) – (可選) 如何重試 RPC。

  • user_project (str | None) – 要向其收取請求費用的 Google Cloud 專案識別符號。Requester Pays buckets 需要此項。

get_blob_update_time(bucket_name, object_name)[source]

獲取 Google Cloud Storage 中檔案的更新時間。

引數:
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要從 Google Cloud Storage bucket 獲取更新時間的 blob 名稱。

is_updated_after(bucket_name, object_name, ts)[source]

檢查 Google Cloud Storage 中的 blob 是否已更新。

引數:
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • ts (datetime.datetime) – 要對照檢查的時間戳。

is_updated_between(bucket_name, object_name, min_ts, max_ts)[source]

檢查 Google Cloud Storage 中的 blob 是否已更新。

引數:
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • min_ts (datetime.datetime) – 要對照檢查的最小時間戳。

  • max_ts (datetime.datetime) – 要對照檢查的最大時間戳。

is_updated_before(bucket_name, object_name, ts)[source]

檢查 Google Cloud Storage 中的 blob 是否在給定時間之前已更新。

引數:
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • ts (datetime.datetime) – 要對照檢查的時間戳。

is_older_than(bucket_name, object_name, seconds)[source]

檢查物件是否比給定時間更舊。

引數:
  • bucket_name (str) – 物件所在的 Google Cloud Storage bucket。

  • object_name (str) – 要在 Google Cloud Storage bucket 中檢查的物件名稱。

  • seconds (int) – 要對照檢查的時間(秒)。

delete(bucket_name, object_name)[source]

從 bucket 中刪除物件。

引數:
  • bucket_name (str) – 物件所在的 bucket 名稱。

  • object_name (str) – 要刪除的物件名稱。

get_bucket(bucket_name)[source]

從 Google Cloud Storage 獲取 bucket 物件。

引數:

bucket_name (str) – bucket 名稱。

delete_bucket(bucket_name, force=False, user_project=None)[source]

從 Google Cloud Storage 刪除 bucket 物件。

引數:
  • bucket_name (str) – 將要刪除的 bucket 名稱。

  • force (bool) – false 不允許刪除非空 bucket,設定 force=True 允許刪除非空 bucket。

  • user_project (str | None) – 要向其收取請求費用的 Google Cloud 專案識別符號。Requester Pays buckets 需要此項。

list(bucket_name, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None, user_project=None)[source]

列出指定單個或多個字首的儲存桶中的所有物件。

引數:
  • bucket_name (str) – 儲存桶名稱

  • versions (bool | None) – 如果為 true,則列出物件的所有版本

  • max_results (int | None) – 單頁響應中返回的最大專案數

  • prefix (str | List[str] | None) – 字串或字串列表,用於過濾名稱以此字串/列表中的字串開頭的物件

  • delimiter (str | None) – (已棄用) 基於分隔符(例如 ‘.csv’)過濾物件

  • match_glob (str | None) – (可選) 基於字串給定的 glob 模式(例如 '**/*/.json')過濾物件。

  • user_project (str | None) – 要向其收取請求費用的 Google Cloud 專案識別符號。Requester Pays buckets 需要此項。

返回:

與過濾條件匹配的物件名稱流

list_by_timespan(bucket_name, timespan_start, timespan_end, versions=None, max_results=None, prefix=None, delimiter=None, match_glob=None)[source]

列出指定字首且在指定時間範圍內更新的儲存桶中的所有物件。

引數:
  • bucket_name (str) – 儲存桶名稱

  • timespan_start (datetime.datetime) – 將返回在此日期時間(UTC)或之後更新的物件

  • timespan_end (datetime.datetime) – 將返回在此日期時間(UTC)之前更新的物件

  • versions (bool | None) – 如果為 true,則列出物件的所有版本

  • max_results (int | None) – 單頁響應中返回的最大專案數

  • prefix (str | None) – 過濾名稱以此字首開頭的物件的字串字首

  • delimiter (str | None) – (已棄用) 基於分隔符(例如 ‘.csv’)過濾物件

  • match_glob (str | None) – (可選) 基於字串給定的 glob 模式(例如 '**/*/.json')過濾物件。

返回:

與過濾條件匹配的物件名稱流

返回型別:

List[str]

get_size(bucket_name, object_name)[source]

獲取 Google Cloud Storage 中檔案的大小。

引數:
  • bucket_name (str) – 儲存 blob_name 的 Google Cloud Storage 儲存桶。

  • object_name (str) – 要在 Google Cloud Storage bucket_name 中檢查的物件名稱。

get_crc32c(bucket_name, object_name)[source]

獲取 Google Cloud Storage 中物件的 CRC32c 校驗和。

引數:
  • bucket_name (str) – 儲存 blob_name 的 Google Cloud Storage 儲存桶。

  • object_name (str) – 要在 Google Cloud Storage bucket_name 中檢查的物件名稱。

get_md5hash(bucket_name, object_name)[source]

獲取 Google Cloud Storage 中物件的 MD5 雜湊。

引數:
  • bucket_name (str) – 儲存 blob_name 的 Google Cloud Storage 儲存桶。

  • object_name (str) – 要在 Google Cloud Storage bucket_name 中檢查的物件名稱。

get_metadata(bucket_name, object_name)[source]

獲取 Google Cloud Storage 中物件的元資料。

引數:
  • bucket_name (str) – 物件所在的 Google Cloud Storage 儲存桶的名稱。

  • object_name (str) – 包含所需元資料的物件的名稱

返回:

與物件關聯的元資料

返回型別:

dict | None

create_bucket(bucket_name, resource=None, storage_class='MULTI_REGIONAL', location='US', project_id=PROVIDE_PROJECT_ID, labels=None)[source]

建立一個新儲存桶。

Google Cloud Storage 使用扁平名稱空間,因此無法建立名稱已在使用中的儲存桶。

另請參閱

有關更多資訊,請參閱儲存桶命名指南:https://cloud.google.com/storage/docs/bucketnaming.html#requirements

引數:
  • bucket_name (str) – 儲存桶的名稱。

  • resource (dict | None) – 用於建立儲存桶的可選字典引數。有關可用引數的資訊,請參閱 Cloud Storage API 文件:https://cloud.google.com/storage/docs/json_api/v1/buckets/insert

  • storage_class (str) –

    這定義了儲存桶中物件的儲存方式,並決定了 SLA 和儲存成本。包括以下值:

    • MULTI_REGIONAL (多區域)

    • REGIONAL (區域)

    • STANDARD (標準)

    • NEARLINE (近線)

    • COLDLINE (冷線).

    如果在建立儲存桶時未指定此值,則預設為 STANDARD。

  • location (str) –

    儲存桶的位置。儲存桶中物件的資料物理儲存在該區域內。預設為 US。

  • project_id (str) – Google Cloud 專案的 ID。

  • labels (dict | None) – 使用者提供的標籤,以鍵/值對形式。

返回:

如果成功,返回儲存桶的 id

返回型別:

str

insert_bucket_acl(bucket_name, entity, role, user_project=None)[source]

在指定的 bucket_name 上建立一個新的 ACL 條目。

參閱:https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert

引數:
  • bucket_name (str) – 儲存桶名稱。

  • entity (str) – 擁有許可權的實體,格式如下之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers。參閱:https://cloud.google.com/storage/docs/access-control/lists#scopes

  • role (str) – 實體的訪問許可權。可接受的值有:“OWNER”、“READER”、“WRITER”。

  • user_project (str | None) – (可選) 為此請求付費的專案。對於請求者付費儲存桶是必需的。

insert_object_acl(bucket_name, object_name, entity, role, generation=None, user_project=None)[source]

在指定的物件上建立一個新的 ACL 條目。

參閱:https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert

引數:
  • bucket_name (str) – 儲存桶名稱。

  • object_name (str) – 物件的名稱。有關如何對物件名稱進行 URL 編碼以確保路徑安全的資訊,請參閱:https://cloud.google.com/storage/docs/json_api/#encoding

  • entity (str) – 擁有許可權的實體,格式如下之一:user-userId, user-email, group-groupId, group-email, domain-domain, project-team-projectId, allUsers, allAuthenticatedUsers 參閱:https://cloud.google.com/storage/docs/access-control/lists#scopes

  • role (str) – 實體的訪問許可權。可接受的值有:“OWNER”、“READER”。

  • generation (int | None) – 可選。如果存在,選擇此物件的特定修訂版本。

  • user_project (str | None) – (可選) 為此請求付費的專案。對於請求者付費儲存桶是必需的。

compose(bucket_name, source_objects, destination_object)[source]

將現有物件列表組合到同一儲存桶中的一個新物件中。

目前僅支援在單個操作中連線最多 32 個物件

https://cloud.google.com/storage/docs/json_api/v1/objects/compose

引數:
  • bucket_name (str) – 包含源物件的儲存桶的名稱。這也是儲存組合後的目標物件的儲存桶。

  • source_objects (List[str]) – 將組合成單個物件的源物件列表。

  • destination_object (str) – 如果給定,則為物件的路徑。

sync(source_bucket, destination_bucket, source_object=None, destination_object=None, recursive=True, allow_overwrite=False, delete_extra_files=False)[source]

同步儲存桶的內容。

引數 source_objectdestination_object 描述了根同步目錄。如果未傳遞,則同步整個儲存桶。如果傳遞,則應指向目錄。

注意

不支援單個檔案的同步。只能同步整個目錄。

引數:
  • source_bucket (str) – 包含源物件的儲存桶名稱。

  • destination_bucket (str) – 包含目標物件的儲存桶名稱。

  • source_object (str | None) – 源儲存桶中的根同步目錄。

  • destination_object (str | None) – 目標儲存桶中的根同步目錄。

  • recursive (bool) – 如果為 True,將考慮子目錄

  • recursive – 如果為 True,將考慮子目錄

  • allow_overwrite (bool) – 如果為 True,在找到不匹配檔案時將覆蓋檔案。預設情況下,不允許覆蓋檔案

  • delete_extra_files (bool) –

    如果為 True,刪除源中存在而目標中不存在的附加檔案。預設情況下,不會刪除附加檔案。

    注意

    如果您指定了錯誤的源/目標組合,此選項可能會快速刪除資料。

返回:

none

返回型別:

None

airflow.providers.google.cloud.hooks.gcs.gcs_object_is_directory(bucket)[source]

如果給定的 Google Cloud Storage URL (gs://<bucket>/<blob>) 是一個目錄或空 bucket,則返回 True。

airflow.providers.google.cloud.hooks.gcs.parse_json_from_gcs(gcp_conn_id, file_uri, impersonation_chain=None)[source]

從 Google Cloud Storage 下載並解析 json 檔案。

引數:
  • gcp_conn_id (str) – Airflow Google Cloud 連線 ID。

  • file_uri (str) – json 檔案的完整路徑,例如:gs://test-bucket/dir1/dir2/file

class airflow.providers.google.cloud.hooks.gcs.GCSAsyncHook(**kwargs)[source]

基類:airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

GCSAsyncHook 執行在觸發器 worker 上,繼承自 GoogleBaseAsyncHook。

sync_hook_class[source]
async get_storage_client(session)[source]

返回一個 Google Cloud Storage 服務物件。

此條目是否有幫助?