Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

雲原生工作流與物件儲存

在 2.8 版本中新增。

歡迎來到我們 Airflow 系列的最後一個教程!現在,你已經學習瞭如何使用 Python 和 Taskflow API 構建 DAG,使用 XComs 傳遞資料,並將任務串聯成清晰、可重用的工作流。

在本教程中,我們將進一步介紹 物件儲存 API。此 API 使讀寫雲端儲存變得更容易,例如 Amazon S3、Google Cloud Storage (GCS) 或 Azure Blob Storage,而無需擔心特定提供商的 SDK 或低階憑據管理。

我們將帶你瞭解一個真實的用例

  1. 從公共 API 拉取資料

  2. 將資料以 Parquet 格式儲存到物件儲存

  3. 使用 DuckDB 透過 SQL 進行分析

在此過程中,我們將重點介紹新的 ObjectStoragePath 抽象,解釋 Airflow 如何透過連線處理雲憑據,並展示這如何實現可移植的、雲無關的管道。

重要性

許多資料工作流依賴於檔案——無論是原始 CSV、中間 Parquet 檔案還是模型 artifact。傳統上,你需要為此編寫 S3 特定或 GCS 特定的程式碼。現在,有了 ObjectStoragePath,你可以編寫適用於不同提供商的通用程式碼,只要你配置了正確的 Airflow 連線。

讓我們開始吧!

前置條件

開始之前,請確保你已安裝以下內容:

  • DuckDB,一個程序內 SQL 資料庫:使用 pip install duckdb 安裝

  • Amazon S3 訪問許可權帶有 s3fs 的 Amazon Providerpip install apache-airflow-providers-amazon[s3fs] (你可以透過更改儲存 URL 協議和安裝相應的 Provider 來替換你偏好的 Provider。)

  • Pandas,用於處理表格資料:pip install pandas

建立 ObjectStoragePath

本教程的核心是 ObjectStoragePath,這是一個用於處理雲物件儲存路徑的新抽象。可以將其視為 pathlib.Path,但作用於儲存桶而非檔案系統。

src/airflow/example_dags/tutorial_objectstorage.py

base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")

URL 語法很簡單:protocol://bucket/path/to/file

  • protocol(例如 s3gsazure)決定了後端

  • URL 的“使用者名稱”部分可以是 conn_id,告訴 Airflow 如何認證

  • 如果省略 conn_id,Airflow 將回退到該後端的預設連線

你也可以將 conn_id 作為關鍵字引數提供以提高畫質晰度

ObjectStoragePath("s3://airflow-tutorial-data/", conn_id="aws_default")

這在重用其他地方(例如在 Asset 中)定義的路徑時,或者連線未直接包含在 URL 中時特別方便。關鍵字引數始終優先。

提示

你可以在全域性 DAG 範圍安全地建立一個 ObjectStoragePath。連線僅在使用路徑時解析,而不是建立路徑時解析。

將資料儲存到物件儲存

讓我們獲取一些資料並將其儲存到雲端。

src/airflow/example_dags/tutorial_objectstorage.py

    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        logical_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": logical_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = logical_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path


以下是正在發生的事情:

  • 我們呼叫芬蘭氣象研究所的公共 API 獲取赫爾辛基空氣質量資料

  • 將 JSON 響應解析為 pandas DataFrame

  • 根據任務的邏輯日期生成檔名

  • 使用 ObjectStoragePath,我們將資料直接以 Parquet 格式寫入雲端儲存

這是一個經典的 Taskflow 模式。物件 key 每天變化,這使得我們可以每天執行此任務並隨時間構建資料集。我們返回最終的物件路徑,以便在下一個任務中使用。

這很棒的原因:無需 boto3、無需 GCS 客戶端設定、無需憑據處理。只有適用於各種儲存後端的簡單檔案語義。

使用 DuckDB 分析資料

現在,讓我們使用 DuckDB 透過 SQL 分析這些資料。

src/airflow/example_dags/tutorial_objectstorage.py

    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())


需要注意的幾個關鍵點:

  • DuckDB 原生支援讀取 Parquet

  • DuckDB 和 ObjectStoragePath 都依賴於 fsspec,這使得註冊物件儲存後端變得容易

  • 我們使用 path.fs 獲取正確的檔案系統物件並將其註冊到 DuckDB

  • 最後,我們使用 SQL 查詢 Parquet 檔案並返回一個 pandas DataFrame

請注意,該函式不會手動重新建立路徑——它使用 Xcom 從上一個任務中獲取完整路徑。這使得任務具有可移植性,並且與之前的邏輯解耦。

整合所有內容

以下是連線所有部分的完整 DAG

src/airflow/example_dags/tutorial_objectstorage.py


import pendulum
import requests

from airflow.sdk import ObjectStoragePath, dag, task

API = "https://opendata.fmi.fi/timeseries"

aq_fields = {
    "fmisid": "int32",
    "time": "datetime64[ns]",
    "AQINDEX_PT1H_avg": "float64",
    "PM10_PT1H_avg": "float64",
    "PM25_PT1H_avg": "float64",
    "O3_PT1H_avg": "float64",
    "CO_PT1H_avg": "float64",
    "SO2_PT1H_avg": "float64",
    "NO2_PT1H_avg": "float64",
    "TRSC_PT1H_avg": "float64",
}
base = ObjectStoragePath("s3://aws_default@airflow-tutorial-data/")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_objectstorage():
    """
    ### Object Storage Tutorial Documentation
    This is a tutorial DAG to showcase the usage of the Object Storage API.
    Documentation that goes along with the Airflow Object Storage tutorial is
    located
    [here](https://airflow.apache.tw/docs/apache-airflow/stable/tutorial/objectstorage.html)
    """
    @task
    def get_air_quality_data(**kwargs) -> ObjectStoragePath:
        """
        #### Get Air Quality Data
        This task gets air quality data from the Finnish Meteorological Institute's
        open data API. The data is saved as parquet.
        """
        import pandas as pd

        logical_date = kwargs["logical_date"]
        start_time = kwargs["data_interval_start"]

        params = {
            "format": "json",
            "precision": "double",
            "groupareas": "0",
            "producer": "airquality_urban",
            "area": "Uusimaa",
            "param": ",".join(aq_fields.keys()),
            "starttime": start_time.isoformat(timespec="seconds"),
            "endtime": logical_date.isoformat(timespec="seconds"),
            "tz": "UTC",
        }

        response = requests.get(API, params=params)
        response.raise_for_status()

        # ensure the bucket exists
        base.mkdir(exist_ok=True)

        formatted_date = logical_date.format("YYYYMMDD")
        path = base / f"air_quality_{formatted_date}.parquet"

        df = pd.DataFrame(response.json()).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)

        return path
    @task
    def analyze(path: ObjectStoragePath, **kwargs):
        """
        #### Analyze
        This task analyzes the air quality data, prints the results
        """
        import duckdb

        conn = duckdb.connect(database=":memory:")
        conn.register_filesystem(path.fs)
        conn.execute(f"CREATE OR REPLACE TABLE airquality_urban AS SELECT * FROM read_parquet('{path}')")

        df2 = conn.execute("SELECT * FROM airquality_urban").fetchdf()

        print(df2.head())
    obj_path = get_air_quality_data()
    analyze(obj_path)
tutorial_objectstorage()

你可以觸發此 DAG 並在 Airflow UI 的 Graph View 中檢視它。每個任務都清晰地記錄其輸入和輸出,你可以在 Xcom tab 中檢查返回的路徑。

接下來探索什麼

以下是一些進一步探索的方法:

  • 使用物件感測器(例如 S3KeySensor)等待外部系統上傳的檔案

  • 協調 S3 到 GCS 的傳輸或跨區域資料同步

  • 新增分支邏輯來處理缺失或格式錯誤的檔案

  • 嘗試使用不同的格式,如 CSV 或 JSON

另請參閱

本條目是否有幫助?