airflow.providers.amazon.aws.hooks.s3¶
使用 boto3 庫與 AWS S3 互動。
屬性¶
類¶
與 Amazon Simple Storage Service (S3) 互動。 |
函式¶
|
如果函式未傳入儲存桶名稱,則從連線中獲取儲存桶名稱。 |
在未傳入儲存桶名稱但至少傳入了一個 key 的情況下,統一儲存桶名稱和 key。 |
模組內容¶
- airflow.providers.amazon.aws.hooks.s3.unify_bucket_name_and_key(func)[source]¶
在未傳入儲存桶名稱但至少傳入了一個 key 的情況下,統一儲存桶名稱和 key。
- class airflow.providers.amazon.aws.hooks.s3.S3Hook(aws_conn_id=AwsBaseHook.default_conn_name, transfer_config_args=None, extra_args=None, *args, **kwargs)[source]¶
基類:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook與 Amazon Simple Storage Service (S3) 互動。
為
boto3.client("s3")和boto3.resource("s3")提供厚封裝。- 引數:
另請參閱
有關允許的上傳額外引數,請參閱
boto3.s3.transfer.S3Transfer.ALLOWED_UPLOAD_ARGS。有關允許的下載額外引數,請參閱
boto3.s3.transfer.S3Transfer.ALLOWED_DOWNLOAD_ARGS。
可以指定附加引數(例如
aws_conn_id),這些引數將傳遞給底層的 AwsBaseHook。- static parse_s3_url(s3url)[source]¶
將 S3 URL 解析為儲存桶名稱和 key。
有關有效的 URL 格式,請參閱 https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-bucket-intro.html。
- static get_s3_bucket_key(bucket, key, bucket_param_name, key_param_name)[source]¶
獲取 S3 儲存桶名稱和 key。
來源可以是: - 儲存桶名稱和 key。檢查 key 是相對路徑後返回資訊。 - key。必須是完整的 s3:// URL。
- get_bucket(bucket_name=None)[source]¶
返回一個
S3.Bucket物件。- 引數:
bucket_name (str | None) – 儲存桶名稱
- 返回值:
與儲存桶名稱對應的儲存桶物件。
- 返回型別:
mypy_boto3_s3.service_resource.Bucket
- list_prefixes(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]¶
列出儲存桶中在指定字首下的字首。
- async list_prefixes_async(client, bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None)[source]¶
列出儲存桶中在指定字首下的字首。
- async get_file_metadata_async(client, bucket_name, key=None)[source]¶
非同步獲取與萬用字元表示式匹配的 key 存在於儲存桶中的檔案列表。
- async check_key_async(client, bucket, bucket_keys, wildcard_match, use_regex=False)[source]¶
獲取與萬用字元表示式匹配的 key 存在的檔案列表或獲取 head 物件。
如果 wildcard_match 為 True,則非同步獲取與萬用字元表示式匹配的 key 存在於儲存桶中的檔案列表並返回布林值。如果 wildcard_match 為 False,則從儲存桶獲取 head 物件並返回布林值。
- async get_files_async(client, bucket, bucket_keys, wildcard_match, delimiter='/')[source]¶
獲取儲存桶中的檔案列表。
- async is_keys_unchanged_async(client, bucket_name, prefix, inactivity_period=60 * 60, min_objects=1, previous_objects=None, inactivity_seconds=0, allow_delete=True, last_activity_time=None)[source]¶
檢查是否上傳了新物件以及指定週期是否已過;據此更新 sensor 狀態。
- 引數:
client (aiobotocore.client.AioBaseClient) – aiobotocore 客戶端
bucket_name (str) – 儲存桶名稱
prefix (str) – key 字首
inactivity_period (float) – 指定 key 未更改的總不活動秒數。注意,此機制不是即時的,此 operator 可能直到此週期過去且沒有檢測到其他物件後經過一個 poke_interval 才會返回。
min_objects (int) – 對於 key 未更改 sensor,視為有效所需的最小物件數。
previous_objects (set[str] | None) – 在上次 poke 期間找到的物件 ID 集合。
inactivity_seconds (int) – 不活動秒數
allow_delete (bool) – 此 sensor 是否應將兩次 poke 之間刪除物件視為有效行為。如果為 true,發生此情況時將記錄警告訊息。如果為 false,則將引發錯誤。
last_activity_time (datetime.datetime | None) – 最後活動時間。
- list_keys(bucket_name=None, prefix=None, delimiter=None, page_size=None, max_items=None, start_after_key=None, from_datetime=None, to_datetime=None, object_filter=None, apply_wildcard=False)[source]¶
列出儲存桶中在指定字首下且不包含分隔符的 key。
- 引數:
bucket_name (str | None) – 儲存桶名稱
prefix (str | None) – key 字首
delimiter (str | None) – 用於標記 key 層級結構的分隔符。
page_size (int | None) – 分頁大小
max_items (int | None) – 最大返回條目數
start_after_key (str | None) – 僅返回大於此 key 的 key
from_datetime (datetime.datetime | None) – 僅返回 LastModified 屬性大於或等於此 from_datetime 的 key
to_datetime (datetime.datetime | None) – 僅返回 LastModified 屬性小於此 to_datetime 的 key
object_filter (Callable[Ellipsis, list] | None) – 接收 S3 物件列表、from_datetime 和 to_datetime 並返回匹配 key 列表的函式。
apply_wildcard (bool) – 是否將字首中的 ‘*’ 視為萬用字元或普通符號。
- 示例:返回 LastModified 屬性大於 from_datetime 的 S3 物件列表
且小於 to_datetime
def object_filter( keys: list, from_datetime: datetime | None = None, to_datetime: datetime | None = None, ) -> list: def _is_in_period(input_date: datetime) -> bool: if from_datetime is not None and input_date < from_datetime: return False if to_datetime is not None and input_date > to_datetime: return False return True return [k["Key"] for k in keys if _is_in_period(k["LastModified"])]
- 返回值:
匹配的 key 列表
- 返回型別:
- get_file_metadata(prefix, bucket_name=None, page_size=None, max_items=None)[source]¶
列出儲存桶中在指定字首下的元資料物件。
- select_key(key, bucket_name=None, expression=None, expression_type=None, input_serialization=None, output_serialization=None)[source]¶
使用 S3 Select 讀取 key。
- 引數:
- 返回值:
由 S3 Select 檢索到的原始資料子集
- 返回型別:
- get_wildcard_key(wildcard_key, bucket_name=None, delimiter='')[原始碼]¶
返回與萬用字元表示式匹配的 boto3.s3.Object 物件。
- load_file(filename, key, bucket_name=None, replace=False, encrypt=False, gzip=False, acl_policy=None)[原始碼]¶
將本地檔案載入到 S3。
- 引數:
filename (pathlib.Path | str) – 要載入的檔案的路徑。
key (str) – 指向檔案的 S3 key
bucket_name (str | None) – 儲存檔案的儲存桶名稱
replace (bool) – 一個標誌,用於決定是否覆蓋已存在的鍵。如果 replace 為 False 且鍵已存在,將引發錯誤。
encrypt (bool) – 如果為 True,檔案將在 S3 伺服器端加密,並以加密形式儲存在 S3 中。
gzip (bool) – 如果為 True,檔案將在本地壓縮。
acl_policy (str | None) – 指定上傳到 S3 儲存桶的檔案的預設 ACL 策略的字串。
- load_string(string_data, key, bucket_name=None, replace=False, encrypt=False, encoding=None, acl_policy=None, compression=None)[原始碼]¶
將字串載入到 S3。
提供此功能是為了方便將字串放入 S3。它使用 boto 基礎架構將檔案傳送到 S3。
- 引數:
string_data (str) – 要設定為鍵內容的字串。
key (str) – 指向檔案的 S3 key
bucket_name (str | None) – 儲存檔案的儲存桶名稱
replace (bool) – 一個標誌,用於決定是否覆蓋已存在的鍵
encrypt (bool) – 如果為 True,檔案將在 S3 伺服器端加密,並以加密形式儲存在 S3 中。
encoding (str | None) – 字串到位元組的編碼方式
acl_policy (str | None) – 指定要上傳的物件的預設 ACL 策略的字串
compression (str | None) – 要使用的壓縮型別,當前僅支援 gzip。
- load_bytes(bytes_data, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[原始碼]¶
將位元組資料載入到 S3。
提供此功能是為了方便將位元組資料放入 S3。它使用 boto 基礎架構將檔案傳送到 S3。
- load_file_obj(file_obj, key, bucket_name=None, replace=False, encrypt=False, acl_policy=None)[原始碼]¶
將檔案物件載入到 S3。
- copy_object(source_bucket_key, dest_bucket_key, source_bucket_name=None, dest_bucket_name=None, source_version_id=None, acl_policy=None, meta_data_directive=None, **kwargs)[原始碼]¶
建立已儲存在 S3 中的物件的副本。
注意:此處使用的 S3 連線需要同時具有源儲存桶/鍵和目標儲存桶/鍵的訪問許可權。
- 引數:
source_bucket_key (str) –
源物件的鍵。
它可以是完整的 s3:// 風格 URL,也可以是相對於根級別的路徑。
當指定為完整的 s3:// URL 時,請省略 source_bucket_name。
dest_bucket_key (str) –
要複製到的物件的鍵。
指定 dest_bucket_key 的約定與 source_bucket_key 相同。
source_bucket_name (str | None) –
源物件所在的 S3 儲存桶的名稱。
當 source_bucket_key 作為完整的 s3:// URL 提供時,應省略此引數。
dest_bucket_name (str | None) –
物件被複制到的 S3 儲存桶的名稱。
當 dest_bucket_key 作為完整的 s3:// URL 提供時,應省略此引數。
source_version_id (str | None) – 源物件的版本 ID(可選)
acl_policy (str | None) – 指定要複製的物件的預設 ACL 策略的字串,預設為 private。
meta_data_directive (str | None) – 是從源物件 COPY 元資料,還是使用請求中提供的元資料來 REPLACE 元資料。
- delete_bucket(bucket_name, force_delete=False, max_retries=5)[原始碼]¶
要刪除 S3 儲存桶,需要先刪除儲存桶中的所有物件,然後才能刪除儲存桶。
- download_file(key, bucket_name=None, local_path=None, preserve_file_name=False, use_autogenerated_subdir=True)[原始碼]¶
將檔案從 S3 位置下載到本地檔案系統。
- 注意
此函式遮蔽了 S3 API 的 ‘download_file’ 方法,但它們並不相同。如果想使用 S3 API 的原始方法,請使用 ‘S3Hook.get_conn().download_file()’。
- 引數:
key (str) – S3 中的鍵路徑。
bucket_name (str | None) – 要使用的特定儲存桶。
local_path (str | None) – 下載檔案的本地路徑。如果未提供路徑,則將使用系統的臨時目錄。
preserve_file_name (bool) – 如果希望下載的檔名與 S3 中的檔名相同,請將此引數設定為 True。設定為 False 時,將生成隨機檔名。預設值:False。
use_autogenerated_subdir (bool) – 與 ‘preserve_file_name = True’ 搭配使用,將檔案下載到 ‘local_path’ 內隨機生成的資料夾中,有助於避免多個可能下載同名檔案的任務之間的衝突。如果不需要此功能並希望獲得可預測的路徑,請將其設定為 ‘False’。預設值:True。
- 返回值:
檔名。
- 返回型別:
- generate_presigned_url(client_method, params=None, expires_in=3600, http_method=None)[原始碼]¶
根據客戶端、其方法和引數生成預簽名 URL。