物件儲存¶
在 2.8.0 版本中新增。
所有主要的雲提供商都在物件儲存中提供持久化資料儲存。這些不是經典的“POSIX”檔案系統。為了儲存數百 PB 的資料而沒有任何單點故障,物件儲存用更簡單的物件名 => 資料模型取代了傳統的檔案系統目錄樹。為了啟用遠端訪問,物件操作通常作為(較慢的)HTTP REST 操作提供。
Airflow 在物件儲存(如 s3、gcs 和 Azure Blob Storage)之上提供了一個通用抽象層。這個抽象層允許你在 DAGs 中使用多種物件儲存系統,而無需更改程式碼來處理每一種不同的物件儲存系統。此外,它還允許你使用大多數可以處理類檔案物件的標準 Python 模組,例如 shutil。
對特定物件儲存系統的支援取決於你已安裝的 Provider(提供者)。例如,如果你安裝了 apache-airflow-providers-google Provider(提供者),你就可以使用 gcs Scheme(方案)進行物件儲存。預設情況下,Airflow 支援 file Scheme(方案)。
注意
支援 s3 需要你安裝 apache-airflow-providers-amazon[s3fs]。這是因為它依賴於 aiobotocore,該庫預設不會安裝,因為它可能與 botocore 產生依賴衝突。
雲物件儲存不是真正的檔案系統¶
物件儲存看起來像檔案系統,但它們並非真正的檔案系統。它們不支援真正的檔案系統所支援的所有操作。主要區別包括:
不保證原子重新命名操作。這意味著如果你將檔案從一個位置移動到另一個位置,它會被先複製然後刪除。如果複製失敗,你將丟失檔案。
目錄是模擬的,可能導致對其操作變慢。例如,列出目錄可能需要列出 bucket 中的所有物件並按字首過濾它們。
在檔案內進行查詢可能需要顯著的呼叫開銷,損害效能,或者根本不受支援。
Airflow 依賴 fsspec 來在不同物件儲存系統之間提供一致的使用體驗。它實現了本地檔案快取以加速訪問。但是,在設計 DAGs 時,你應該瞭解物件儲存的限制。
基本用法¶
要使用物件儲存,你需要使用要互動的物件的 URI 例項化一個 Path(參見下文)物件。例如,要指向 s3 中的一個 bucket,你可以這樣做:
from airflow.sdk import ObjectStoragePath
base = ObjectStoragePath("s3://aws_default@my-bucket/")
URI 中的使用者名稱部分代表 Airflow 連線 ID,是可選的。它也可以作為單獨的關鍵字引數傳入
# Equivalent to the previous example.
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")
列出檔案物件
@task
def list_files() -> list[ObjectStoragePath]:
files = [f for f in base.iterdir() if f.is_file()]
return files
在目錄樹內導航
base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"
# prints ObjectStoragePath("s3://my-bucket/subdir")
print(subdir)
開啟檔案
@task
def read_file(path: ObjectStoragePath) -> str:
with path.open() as f:
return f.read()
利用 XCOM,你可以在任務之間傳遞路徑
@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
return path / "new_file.txt"
@task
def write_file(path: ObjectStoragePath, content: str):
with path.open("wb") as f:
f.write(content)
new_file = create(base)
write = write_file(new_file, b"data")
read >> write
配置¶
在其基本用法中,物件儲存抽象層不需要太多配置,並依賴於標準的 Airflow 連線機制。這意味著你可以使用 conn_id 引數來指定要使用的連線。連線中的任何設定都會下推到底層實現。例如,如果你使用 s3,你可以指定 aws_access_key_id 和 aws_secret_access_key,也可以新增額外的引數,例如 endpoint_url 來指定自定義端點。
備選後端¶
可以為 Scheme(方案)或 Protocol(協議)配置備選後端。這透過將 backend 附加到 Scheme(方案)來完成。例如,要為 dbfs Scheme(方案)啟用 Databricks 後端,你可以執行以下操作:
from airflow.sdk import ObjectStoragePath
from airflow.sdk.io import attach
from fsspec.implementations.dbfs import DBFSFileSystem
attach(protocol="dbfs", fs=DBFSFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")
注意
為了在任務之間重用註冊,請確保在 DAG 的頂層附加後端。否則,該後端將無法在多個任務之間使用。
路徑 API¶
物件儲存抽象被實現為一個 Path API,並構建在 Universal Pathlib 之上。這意味著你與物件儲存互動時,基本上可以使用與本地檔案系統相同的 API。在本節中,我們僅列出這兩種 API 之間的差異。超出標準 Path API 的擴充套件操作,例如複製和移動,列在下一節中。有關每個操作的詳細資訊,例如它們接受哪些引數,請參閱 ObjectStoragePath 類的文件。
mkdir¶
在指定路徑或 bucket/container 內建立一個目錄條目。對於沒有真正目錄的系統,它可能僅為此例項建立一個目錄條目,而不影響實際檔案系統。
如果 parents 為 True,則會根據需要建立此路徑的任何缺失父級。
touch¶
在此給定路徑建立檔案,或更新時間戳。如果 truncate 為 True,則檔案將被截斷,這是預設行為。如果檔案已存在,當 exists_ok 為 True 時函式會成功(並且其修改時間更新為當前時間),否則會引發 FileExistsError。
stat¶
返回一個類似 stat_result 的物件,該物件支援以下屬性:st_size、st_mtime、st_mode,但它也像一個字典一樣,可以提供有關物件的額外元資料。例如,對於 s3,它將返回額外的鍵,例如:['ETag', 'ContentType']。如果你的程式碼需要在不同的物件儲存之間移植,請不要依賴擴充套件元資料。
擴充套件¶
以下操作不屬於標準 Path API 的一部分,但物件儲存抽象層支援它們。
bucket¶
返回 bucket 名稱。
校驗和¶
返回檔案的校驗和。
container¶
bucket 的別名
fs¶
訪問已例項化檔案系統的便捷屬性
key¶
返回物件 key。
namespace¶
返回物件的 namespace。通常這是 Protocol(協議),例如帶有 bucket 名稱的 s3://。
path¶
與 fsspec 相容的路徑,用於檔案系統例項
protocol¶
filesystem_spec Protocol(協議)。
read_block¶
從此給定路徑的檔案中讀取一塊位元組。
從檔案 offset 位置開始,讀取 length 位元組。如果設定了 delimiter(分隔符),則確保讀取操作在位於 offset 和 offset + length 範圍內的分隔符邊界處開始和停止。如果 offset 為零,則從零開始。返回的位元組串將包含結束分隔符字串。
如果 offset + length 超出檔案末尾 (eof),則讀取到 eof。
sign¶
建立一個表示給定路徑的簽名 URL。某些實現允許生成臨時 URL,作為委託憑證的一種方式。
size¶
返回給定路徑處檔案的大小(以位元組為單位)。
storage_options¶
例項化底層檔案系統的儲存選項。
ukey¶
檔案屬性的雜湊值,用於判斷檔案是否已更改。
複製和移動¶
本文件記錄了 copy 和 move 操作的預期行為,特別是跨物件儲存(例如檔案 -> s3)的行為。每個方法將檔案或目錄從 source 位置複製或移動到 target 位置。預期的行為與 fsspec 指定的行為相同。對於跨物件儲存的目錄複製,Airflow 需要遍歷目錄樹並單獨複製每個檔案。這是透過將每個檔案從源流式傳輸到目標來完成的。
外部整合¶
許多其他專案,如 DuckDB、Apache Iceberg 等,都可以利用物件儲存抽象。通常這是透過傳遞底層 fsspec 實現來完成的。為此,ObjectStoragePath 公開了 fs 屬性。例如,以下程式碼適用於 duckdb,以便使用來自 Airflow 的連線詳細資訊連線到 s3,並讀取由 ObjectStoragePath 指示的 Parquet 檔案
import duckdb
from airflow.sdk import ObjectStoragePath
path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")