airflow.providers.amazon.aws.hooks.s3

使用 boto3 庫與 AWS S3 互動。

屬性

logger

NO_ACL

S3Hook

與 Amazon Simple Storage Service (S3) 互動。

函式

provide_bucket_name(func)

如果函式未傳入儲存桶名稱,則從連線中獲取儲存桶名稱。

unify_bucket_name_and_key(func)

在未傳入儲存桶名稱但至少傳入了一個 key 的情況下,統一儲存桶名稱和 key。

模組內容

airflow.providers.amazon.aws.hooks.s3.logger[source]
airflow.providers.amazon.aws.hooks.s3.NO_ACL = 'no-acl'[source]
airflow.providers.amazon.aws.hooks.s3.provide_bucket_name(func)[source]

如果函式未傳入儲存桶名稱,則從連線中獲取儲存桶名稱。

airflow.providers.amazon.aws.hooks.s3.unify_bucket_name_and_key(func)[source]

在未傳入儲存桶名稱但至少傳入了一個 key 的情況下,統一儲存桶名稱和 key。

class airflow.providers.amazon.aws.hooks.s3.S3Hook(aws_conn_id=AwsBaseHook.default_conn_name, transfer_config_args=None, extra_args=None, *args, **kwargs)[source]

基類: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

與 Amazon Simple Storage Service (S3) 互動。

boto3.client("s3")boto3.resource("s3") 提供厚封裝。

引數:
  • transfer_config_args (dict | None) – 用於託管 S3 傳輸的配置物件。

  • extra_args (dict | None) – 可傳遞給下載/上傳操作的額外引數。

另請參閱

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/customizations/s3.html#s3-transfers

  • 有關允許的上傳額外引數,請參閱 boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS

  • 有關允許的下載額外引數,請參閱 boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS

可以指定附加引數(例如 aws_conn_id),這些引數將傳遞給底層的 AwsBaseHook。

transfer_config[source]
property resource[source]
property extra_args[source]

返回 hook 的額外引數(不可變)。

static parse_s3_url(s3url)[source]

將 S3 URL 解析為儲存桶名稱和 key。

有關有效的 URL 格式,請參閱 https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html

引數:

s3url (str) – 要解析的 S3 URL。

返回值:

解析後的儲存桶名稱和 key

返回型別:

tuple[str, str]

static get_s3_bucket_key(bucket, key, bucket_param_name, key_param_name)[source]

獲取 S3 儲存桶名稱和 key。

來源可以是: - 儲存桶名稱和 key。檢查 key 是相對路徑後返回資訊。 - key。必須是完整的 s3:// URL。

引數:
  • bucket (str | None) – S3 儲存桶名稱

  • key (str) – S3 key

  • bucket_param_name (str) – 包含儲存桶名稱的引數名稱

  • key_param_name (str) – 包含 key 名稱的引數名稱

返回值:

解析後的儲存桶名稱和 key

返回型別:

tuple[str, str]

check_for_bucket(bucket_name=None)[source]

檢查 bucket_name 是否存在。

引數:

bucket_name (str | None) – 儲存桶名稱

返回值:

如果存在則為 True,否則為 False。

返回型別:

bool

get_bucket(bucket_name=None)[source]

返回一個 S3.Bucket 物件。

引數:

bucket_name (str | None) – 儲存桶名稱

返回值:

與儲存桶名稱對應的儲存桶物件。

返回型別:

mypy_boto3_s3.service_resource.Bucket

create_bucket(bucket_name=None, region_name=None)[source]

建立一個 Amazon S3 儲存桶。

引數:
  • bucket_name (str | None) – 儲存桶名稱

  • region_name (str | None) – 建立儲存桶所在的 AWS 區域名稱。

check_for_prefix(prefix, delimiter, bucket_name=None)[source]

檢查儲存桶中是否存在某個字首。

引數:
  • bucket_name (str | None) – 儲存桶名稱

  • prefix (str) – key 字首

  • delimiter (str) – 用於標記 key 層級結構的分隔符。

返回值:

如果字首在儲存桶中不存在則為 False,如果存在則為 True。

返回型別:

bool

list_prefixes(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]

列出儲存桶中在指定字首下的字首。

引數:
  • bucket_name (str | None) – 儲存桶名稱

  • prefix (str | None) – key 字首

  • delimiter (str | None) – 用於標記 key 層級結構的分隔符。

  • page_size (int | None) – 分頁大小

  • max_items (int | None) – 最大返回條目數

返回值:

匹配的字首列表

返回型別:

list

async get_head_object_async(client, key, bucket_name=None)[source]

檢索物件的元資料。

引數:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客戶端

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

  • key (str) – 指向檔案的 S3 key

async list_prefixes_async(client, bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]

列出儲存桶中在指定字首下的字首。

引數:
  • client (aiobotocore.client.AioBaseClient) – ClientCreatorContext

  • bucket_name (str | None) – 儲存桶名稱

  • prefix (str | None) – key 字首

  • delimiter (str | None) – 用於標記 key 層級結構的分隔符。

  • page_size (int | None) – 分頁大小

  • max_items (int | None) – 最大返回條目數

返回值:

匹配的字首列表

返回型別:

list[Any]

async get_file_metadata_async(client, bucket_name, key=None)[source]

非同步獲取與萬用字元表示式匹配的 key 存在於儲存桶中的檔案列表。

引數:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客戶端

  • bucket_name (str) – 儲存桶名稱

  • key (str | None) – key 的路徑

async check_key_async(client, bucket, bucket_keys, wildcard_match, use_regex=False)[source]

獲取與萬用字元表示式匹配的 key 存在的檔案列表或獲取 head 物件。

如果 wildcard_match 為 True,則非同步獲取與萬用字元表示式匹配的 key 存在於儲存桶中的檔案列表並返回布林值。如果 wildcard_match 為 False,則從儲存桶獲取 head 物件並返回布林值。

引數:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客戶端

  • bucket (str) – 儲存桶名稱

  • bucket_keys (str | list[str]) – 指向檔案的 S3 key

  • wildcard_match (bool) – key 的路徑

  • use_regex (bool) – 是否使用正則表示式檢查儲存桶

async check_for_prefix_async(client, prefix, delimiter, bucket_name=None)[source]

檢查儲存桶中是否存在某個字首。

引數:
  • bucket_name (str | None) – 儲存桶名稱

  • prefix (str) – key 字首

  • delimiter (str) – 用於標記 key 層級結構的分隔符。

返回值:

如果字首在儲存桶中不存在則為 False,如果存在則為 True。

返回型別:

bool

async get_files_async(client, bucket, bucket_keys, wildcard_match, delimiter='/')[source]

獲取儲存桶中的檔案列表。

async is_keys_unchanged_async(client, bucket_name, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, inactivity_seconds=0, allow_delete=True, last_activity_time=None)[source]

檢查是否上傳了新物件以及指定週期是否已過;據此更新 sensor 狀態。

引數:
  • client (aiobotocore.client.AioBaseClient) – aiobotocore 客戶端

  • bucket_name (str) – 儲存桶名稱

  • prefix (str) – key 字首

  • inactivity_period (float) – 指定 key 未更改的總不活動秒數。注意,此機制不是即時的,此 operator 可能直到此週期過去且沒有檢測到其他物件後經過一個 poke_interval 才會返回。

  • min_objects (int) – 對於 key 未更改 sensor,視為有效所需的最小物件數。

  • previous_objects (set[str] | None) – 在上次 poke 期間找到的物件 ID 集合。

  • inactivity_seconds (int) – 不活動秒數

  • allow_delete (bool) – 此 sensor 是否應將兩次 poke 之間刪除物件視為有效行為。如果為 true,發生此情況時將記錄警告訊息。如果為 false,則將引發錯誤。

  • last_activity_time (datetime.datetime | None) – 最後活動時間。

list_keys(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None, start_after_key=None, from_datetime=None, to_datetime=None, object_filter=None, apply_wildcard=False)[source]

列出儲存桶中在指定字首下且不包含分隔符的 key。

引數:
  • bucket_name (str | None) – 儲存桶名稱

  • prefix (str | None) – key 字首

  • delimiter (str | None) – 用於標記 key 層級結構的分隔符。

  • page_size (int | None) – 分頁大小

  • max_items (int | None) – 最大返回條目數

  • start_after_key (str | None) – 僅返回大於此 key 的 key

  • from_datetime (datetime.datetime | None) – 僅返回 LastModified 屬性大於或等於此 from_datetime 的 key

  • to_datetime (datetime.datetime | None) – 僅返回 LastModified 屬性小於此 to_datetime 的 key

  • object_filter (Callable[Ellipsis, list] | None) – 接收 S3 物件列表、from_datetime 和 to_datetime 並返回匹配 key 列表的函式。

  • apply_wildcard (bool) – 是否將字首中的 ‘*’ 視為萬用字元或普通符號。

示例:返回 LastModified 屬性大於 from_datetime 的 S3 物件列表

且小於 to_datetime

def object_filter(
    keys: list,
    from_datetime: datetime | None = None,
    to_datetime: datetime | None = None,
) -> list:
    def _is_in_period(input_date: datetime) -> bool:
        if from_datetime is not None and input_date < from_datetime:
            return False

        if to_datetime is not None and input_date > to_datetime:
            return False
        return True

    return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
返回值:

匹配的 key 列表

返回型別:

list

get_file_metadata(prefix, bucket_name=None, page_size=None, max_items=None)[source]

列出儲存桶中在指定字首下的元資料物件。

引數:
  • prefix (str) – key 字首

  • bucket_name (str | None) – 儲存桶名稱

  • page_size (int | None) – 分頁大小

  • max_items (int | None) – 最大返回條目數

返回值:

物件元資料列表

返回型別:

list

head_object(key, bucket_name=None)[source]

檢索物件的元資料。

引數:
  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

返回值:

物件的元資料

返回型別:

dict | None

check_for_key(key, bucket_name=None)[source]

檢查儲存桶中是否存在某個 key。

引數:
  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

返回值:

如果 key 存在則為 True,否則為 False。

返回型別:

bool

get_key(key, bucket_name=None)[source]

返回一個 S3.Object

引數:
  • key (str) – key 的路徑

  • bucket_name (str | None) – 儲存桶名稱

返回值:

來自儲存桶的 key 物件

返回型別:

mypy_boto3_s3.service_resource.Object

read_key(key, bucket_name=None)[source]

從 S3 讀取 key。

另請參閱

引數:
  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

返回值:

key 的內容

返回型別:

str

select_key(key, bucket_name=None, expression=None, expression_type=None, input_serialization=None, output_serialization=None)[source]

使用 S3 Select 讀取 key。

引數:
  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

  • expression (str | None) – S3 Select 表示式

  • expression_type (str | None) – S3 Select 表示式型別

  • input_serialization (dict[str, Any] | None) – S3 Select 輸入資料序列化格式

  • output_serialization (dict[str, Any] | None) – S3 Select 輸出資料序列化格式

返回值:

由 S3 Select 檢索到的原始資料子集

返回型別:

str

check_for_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[原始碼]

檢查儲存桶中是否存在與萬用字元表示式匹配的鍵。

引數:
  • wildcard_key (str) – 鍵的路徑

  • bucket_name (str | None) – 儲存桶名稱

  • delimiter (str) – 定界符標記鍵的層級結構

返回值:

如果鍵存在,則為 True;否則為 False。

返回型別:

bool

get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[原始碼]

返回與萬用字元表示式匹配的 boto3.s3.Object 物件。

引數:
  • wildcard_key (str) – 鍵的路徑

  • bucket_name (str | None) – 儲存桶名稱

  • delimiter (str) – 定界符標記鍵的層級結構

返回值:

儲存桶中的鍵物件,如果未找到則為 None。

返回型別:

mypy_boto3_s3.service_resource.Object | None

load_file(filename, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[原始碼]

將本地檔案載入到 S3。

引數:
  • filename (pathlib.Path | str) – 要載入的檔案的路徑。

  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

  • replace (bool) – 一個標誌,用於決定是否覆蓋已存在的鍵。如果 replace 為 False 且鍵已存在,將引發錯誤。

  • encrypt (bool) – 如果為 True,檔案將在 S3 伺服器端加密,並以加密形式儲存在 S3 中。

  • gzip (bool) – 如果為 True,檔案將在本地壓縮。

  • acl_policy (str | None) – 指定上傳到 S3 儲存桶的檔案的預設 ACL 策略的字串。

load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding=None, acl_policy=None, compression=None)[原始碼]

將字串載入到 S3。

提供此功能是為了方便將字串放入 S3。它使用 boto 基礎架構將檔案傳送到 S3。

引數:
  • string_data (str) – 要設定為鍵內容的字串。

  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

  • replace (bool) – 一個標誌,用於決定是否覆蓋已存在的鍵

  • encrypt (bool) – 如果為 True,檔案將在 S3 伺服器端加密,並以加密形式儲存在 S3 中。

  • encoding (str | None) – 字串到位元組的編碼方式

  • acl_policy (str | None) – 指定要上傳的物件的預設 ACL 策略的字串

  • compression (str | None) – 要使用的壓縮型別,當前僅支援 gzip。

load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[原始碼]

將位元組資料載入到 S3。

提供此功能是為了方便將位元組資料放入 S3。它使用 boto 基礎架構將檔案傳送到 S3。

引數:
  • bytes_data (bytes) – 要設定為鍵內容的位元組資料。

  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

  • replace (bool) – 一個標誌,用於決定是否覆蓋已存在的鍵

  • encrypt (bool) – 如果為 True,檔案將在 S3 伺服器端加密,並以加密形式儲存在 S3 中。

  • acl_policy (str | None) – 指定要上傳的物件的預設 ACL 策略的字串

load_file_obj(file_obj, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[原始碼]

將檔案物件載入到 S3。

引數:
  • file_obj (io.BytesIO) – 要設定為 S3 鍵內容的類檔案物件。

  • key (str) – 指向檔案的 S3 key

  • bucket_name (str | None) – 儲存檔案的儲存桶名稱

  • replace (bool) – 一個標誌,指示是否覆蓋已存在的鍵。

  • encrypt (bool) – 如果為 True,S3 會在伺服器上加密檔案,檔案將以加密形式儲存在 S3 中。

  • acl_policy (str | None) – 指定要上傳的物件的預設 ACL 策略的字串

copy_object(source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, acl_policy=None, meta_data_directive=None, **kwargs)[原始碼]

建立已儲存在 S3 中的物件的副本。

注意:此處使用的 S3 連線需要同時具有源儲存桶/鍵和目標儲存桶/鍵的訪問許可權。

引數:
  • source_bucket_key (str) –

    源物件的鍵。

    它可以是完整的 s3:// 風格 URL,也可以是相對於根級別的路徑。

    當指定為完整的 s3:// URL 時,請省略 source_bucket_name。

  • dest_bucket_key (str) –

    要複製到的物件的鍵。

    指定 dest_bucket_key 的約定與 source_bucket_key 相同。

  • source_bucket_name (str | None) –

    源物件所在的 S3 儲存桶的名稱。

    source_bucket_key 作為完整的 s3:// URL 提供時,應省略此引數。

  • dest_bucket_name (str | None) –

    物件被複制到的 S3 儲存桶的名稱。

    dest_bucket_key 作為完整的 s3:// URL 提供時,應省略此引數。

  • source_version_id (str | None) – 源物件的版本 ID(可選)

  • acl_policy (str | None) – 指定要複製的物件的預設 ACL 策略的字串,預設為 private。

  • meta_data_directive (str | None) – 是從源物件 COPY 元資料,還是使用請求中提供的元資料來 REPLACE 元資料。

delete_bucket(bucket_name, force_delete=False, max_retries=5)[原始碼]

要刪除 S3 儲存桶,需要先刪除儲存桶中的所有物件,然後才能刪除儲存桶。

引數:
  • bucket_name (str) – 儲存桶名稱

  • force_delete (bool) – 啟用此選項以即使儲存桶不為空也刪除它。

  • max_retries (int) – 要刪除儲存桶,它必須是空的。如果 force_delete 為 true,則重試可能有助於防止在刪除儲存桶中的物件和嘗試刪除儲存桶之間發生競態條件。

返回值:

None

返回型別:

None

delete_objects(bucket, keys)[原始碼]

從儲存桶中刪除鍵。

引數:
  • bucket (str) – 要從中刪除物件(s)的儲存桶名稱

  • keys (str | list) –

    要從 S3 儲存桶中刪除的鍵。

    keys 是字串時,它應為要刪除的單個物件的鍵名。

    keys 是列表時,它應為要刪除的鍵列表。

download_file(key, bucket_name=None, local_path=None, preserve_file_name=False, use_autogenerated_subdir=True)[原始碼]

將檔案從 S3 位置下載到本地檔案系統。

注意

此函式遮蔽了 S3 API 的 ‘download_file’ 方法,但它們並不相同。如果想使用 S3 API 的原始方法,請使用 ‘S3Hook.get_conn().download_file()’。

引數:
  • key (str) – S3 中的鍵路徑。

  • bucket_name (str | None) – 要使用的特定儲存桶。

  • local_path (str | None) – 下載檔案的本地路徑。如果未提供路徑,則將使用系統的臨時目錄。

  • preserve_file_name (bool) – 如果希望下載的檔名與 S3 中的檔名相同,請將此引數設定為 True。設定為 False 時,將生成隨機檔名。預設值:False。

  • use_autogenerated_subdir (bool) – 與 ‘preserve_file_name = True’ 搭配使用,將檔案下載到 ‘local_path’ 內隨機生成的資料夾中,有助於避免多個可能下載同名檔案的任務之間的衝突。如果不需要此功能並希望獲得可預測的路徑,請將其設定為 ‘False’。預設值:True。

返回值:

檔名。

返回型別:

str

generate_presigned_url(client_method, params=None, expires_in=3600, http_method=None)[原始碼]

根據客戶端、其方法和引數生成預簽名 URL。

引數:
  • client_method (str) – 要預簽名的客戶端方法。

  • params (dict | None) – 通常傳遞給 ClientMethod 的引數。

  • expires_in (int) – 預簽名 URL 的有效秒數。預設情況下,它在一小時(3600 秒)後過期。

  • http_method (str | None) – 要在生成的 URL 上使用的 http 方法。預設情況下,http 方法是該方法模型中使用的任何方法。

返回值:

預簽名 URL。

返回型別:

str | None

get_bucket_tagging(bucket_name=None)[原始碼]

從儲存桶獲取標籤列表。

引數:

bucket_name (str | None) – 儲存桶的名稱。

返回值:

一個包含標籤鍵值對的列表

返回型別:

list[dict[str, str]] | None

put_bucket_tagging(tag_set=None, key=None, value=None, bucket_name=None)[原始碼]

用提供的標籤覆蓋現有 TagSet;必須提供 TagSet、鍵值對或兩者。

引數:
  • tag_set (dict[str, str] | list[dict[str, str]] | None) – 一個包含標籤鍵值對的字典,或一個已按 API 格式化的列表。

  • key (str | None) – 新 TagSet 條目的鍵。

  • value (str | None) – 新 TagSet 條目的值。

  • bucket_name (str | None) – 儲存桶的名稱。

返回值:

None

返回型別:

None

delete_bucket_tagging(bucket_name=None)[原始碼]

刪除儲存桶中的所有標籤。

引數:

bucket_name (str | None) – 儲存桶的名稱。

返回值:

None

返回型別:

None

此條目是否有用?