Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊可享早鳥票優惠!

資產定義

版本 2.4 新增。

版本 3.0 變更:此概念先前稱為“資料集”。

什麼是“資產”?

Airflow 資產是資料的邏輯分組。上游生產者任務可以更新資產,資產更新有助於排程下游消費者 DAG。

統一資源識別符號 (URI) 定義資產

from airflow.sdk import Asset

example_asset = Asset("s3://asset-bucket/example.csv")

Airflow 對 URI 所代表的資料的內容或位置不做任何假設,並將 URI 視為一個字串。這意味著 Airflow 會將任何正則表示式,例如 input_\d+.csv,或檔案 glob 模式,例如 input_2022*.csv,視為嘗試從一個宣告建立多個資產,並且它們將無法工作。

您必須使用有效的 URI 建立資產。Airflow 核心和提供者定義了各種您可以使用的資料方案,例如 file(核心)、postgres(透過 Postgres 提供者)和 s3(透過 Amazon 提供者)。第三方提供者和外掛也可能提供自己的方案。這些預定義方案具有各自的語義,應遵循這些語義。您可以使用可選的 name 引數為資產提供一個更具可讀性的識別符號。

from airflow.sdk import Asset

example_asset = Asset(uri="s3://asset-bucket/example.csv", name="bucket-1")

什麼是有效的 URI?

從技術上講,URI 必須符合 RFC 3986 中的有效字元集,這基本上是 ASCII 字母數字字元,加上 %-_.~。要標識無法用 URI 安全字元表示的資源,請使用百分比編碼對資源名稱進行編碼。

URI 也區分大小寫,因此 s3://example/assets3://Example/asset 被認為是不同的。請注意,URI 的 host 部分也區分大小寫,這與 RFC 3986 不同。

對於預定義方案(例如 filepostgress3),您必須提供一個有意義的 URI。如果無法提供,請完全使用另一種沒有語義限制的方案。Airflow 永遠不會要求使用者定義的 URI 方案(帶字首 x-)具有語義,因此這可能是一個不錯的選擇。如果您有一個只能稍後(例如,在任務執行期間)獲得的 URI,請考慮改用 AssetAlias 並稍後更新 URI。

# invalid asset:
must_contain_bucket_name = Asset("s3://")

不要使用 airflow 方案,它保留用於 Airflow 內部使用。

Airflow 始終偏好在方案中使用小寫字母,並且 URI 的主機部分需要區分大小寫,以便正確區分資源。

# invalid assets:
reserved = Asset("airflow://example_asset")
not_ascii = Asset("èxample_datašet")

如果您想使用不包含額外語義約束的方案定義資產,請使用帶有字首 x- 的方案。Airflow 會跳過對這些方案的 URI 進行任何語義驗證。

# valid asset, treated as a plain string
my_ds = Asset("x-my-thing://foobarbaz")

識別符號不必是絕對的;它可以是沒有方案的相對 URI,甚至只是一個簡單的路徑或字串

# valid assets:
schemeless = Asset("//example/asset")
csv_file = Asset("example_asset")

非絕對識別符號被認為是普通字串,對 Airflow 不帶任何語義含義。

關於資產的額外資訊

如果需要,您可以在資產中包含一個額外字典

example_asset = Asset(
    "s3://asset/example.csv",
    extra={"team": "trainees"},
)

這可用於為資產提供自定義描述,例如目標檔案的所有者是誰,或該檔案用於何處。額外資訊不影響資產的身份。

注意

安全注意:資產 URI 和額外欄位未加密,它們以明文形式儲存在 Airflow 的元資料資料庫中。切勿在資產 URI 或額外部索引鍵值中儲存任何敏感值,特別是憑證!

建立一個任務來觸發資產事件

一旦資產定義好,就可以透過指定 outlets 來建立任務以針對其觸發事件

from airflow.sdk import DAG, Asset
from airflow.providers.standard.operators.python import PythonOperator

example_asset = Asset(name="example_asset", uri="s3://asset-bucket/example.csv")


def _write_example_asset():
    """Write data to example_asset..."""


with DAG(dag_id="example_asset", schedule="@daily"):
    PythonOperator(task_id="example_asset", outlets=[example_asset], python_callable=_write_example_asset)

這相當多樣板程式碼。Airflow 為這種簡單但最常見的情況提供了一種簡寫方式:建立一個僅含一個任務並觸發單個資產事件的 DAG。下面的程式碼塊與上面的程式碼塊完全等價

from airflow.sdk import asset


@asset(uri="s3://asset-bucket/example.csv", schedule="@daily")
def example_asset():
    """Write data to example_asset..."""

宣告一個 @asset 會自動建立

  • 一個 Asset,其 name 設定為函式名。

  • 一個 DAG,其 dag_id 設定為函式名。

  • DAG 中的一個任務,其 task_id 設定為函式名,並且 outlet 指向建立的 Asset

為觸發中的資產事件附加額外資訊

版本 2.10.0 新增。

帶有資產出口的任務可以在觸發資產事件之前可選地附加額外資訊。這與關於資產的額外資訊不同。資產的額外資訊靜態描述了資產 URI 指向的實體;而資產事件的額外資訊則應用於標註觸發的資料變更,例如更新改變了資料庫中的多少行,或者覆蓋的日期範圍。

將額外資訊附加到資產事件的最簡單方法是從任務中 yield 一個 Metadata 物件

from airflow.sdk import Metadata, asset


@asset(uri="s3://asset/example.csv", schedule=None)
def example_s3(self):  # 'self' here refers to the current asset.
    df = ...  # Get a Pandas DataFrame to write.
    # Write df to asset...
    yield Metadata(self, {"row_count": len(df)})

Airflow 會自動收集所有 yield 的元資料,並用相應的元資料物件的額外資訊填充資產事件。

這也可以在經典運算子中完成。最好的方法是繼承運算子並覆蓋 execute 方法。或者,也可以在任務的 pre_executepost_execute 鉤子中新增額外資訊。然而,如果您選擇使用鉤子,請記住任務重試時它們不會重新執行,這可能導致在某些情況下額外資訊與實際資料不匹配。

實現同樣效果的另一種方法是直接訪問任務執行上下文中的 outlet_events

@asset(schedule=None)
def write_to_s3(self, context):
    context["outlet_events"][self].extra = {"row_count": len(df)}

這裡沒什麼魔法——Airflow 只是將 yield 的值寫入完全相同的訪問器。這在經典運算子中也適用,包括 executepre_executepost_execute

從之前觸發的資產事件中獲取資訊

版本 2.10.0 新增。

在任務的 outlets 中定義的資產事件,如前一節所述,可以被在其 inlets 中宣告相同資產的任務讀取。資產事件條目包含 extra(詳情見前一節)、指示事件從任務中觸發的時間的 timestamp,以及將事件連結回其源的 source_task_instance

可以使用執行上下文中的 inlet_events 訪問器讀取入口資產事件。接前一節的 write_to_s3 資產

@asset(schedule=None)
def post_process_s3_file(context, write_to_s3):  # Declaring an inlet to write_to_s3.
    events = context["inlet_events"][write_to_s3]
    last_row_count = events[-1].extra["row_count"]

inlet_events 對映中的每個值都是一個序列狀物件,它按 timestamp 對給定資產的過去事件進行排序,從最早到最新。它支援 Python 列表的大部分介面,因此您可以使用 [-1] 訪問最後一個事件,使用 [-2:] 訪問最後兩個事件等。訪問器是惰性的,只有當您訪問其中的專案時才會訪問資料庫。

@asset@task 和經典運算子之間的依賴關係

由於 @asset 只是一個包含任務和資產的 DAG 的簡單包裝器,因此在 @task 或經典運算子中讀取和 @asset 非常容易。例如,上面的 post_process_s3_file 也可以寫成一個任務(在一個 DAG 中,這裡為簡潔起見省略了)

@task(inlets=[write_to_s3])
def post_process_s3_file(*, inlet_events):
    events = inlet_events[example_s3_asset]
    last_row_count = events[-1].extra["row_count"]


post_process_s3_file()

反之亦然

example_asset = Asset("example_asset")


@task(outlets=[example_asset])
def emit_example_asset():
    """Write to example_asset..."""


@asset(schedule=None)
def process_example_asset(example_asset):
    """Process inlet example_asset..."""

在一個任務中輸出到多個資產

一個任務可以為多個資產觸發事件。這通常不建議,但在某些情況下需要,例如當您需要將一個數據源拆分成多個時。這對於任務來說很簡單,因為 outlets 在設計上就是複數形式

from airflow.sdk import DAG, Asset, task

input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")

with DAG(dag_id="process_input", schedule=None):

    @task(inlets=[input_asset], outlets=[out_asset_1, out_asset_2])
    def process_input():
        """Split input into two."""

其簡寫形式是 @asset.multi

from airflow.sdk import Asset, asset

input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")


@asset.multi(schedule=None, outlets=[out_asset_1, out_asset_2])
def process_input(input_asset):
    """Split input into two."""

透過 AssetAlias 觸發動態資料事件和建立資產

資產別名可用於觸發與別名相關聯的資產事件。下游可以依賴於解析的資產。此功能允許您基於資產更新定義 DAG 執行的複雜依賴關係。

如何使用 AssetAlias

AssetAlias 只有一個引數 name,它唯一標識資產。任務必須首先將別名宣告為出口,然後使用 outlet_events 或 yield Metadata 向其新增事件。

以下示例針對 S3 URI f"s3://bucket/my-task" 建立了一個資產事件,並附帶可選的額外資訊 extra。如果資產不存在,Airflow 將動態建立它並記錄警告訊息。

透過 outlet_events 在任務執行期間觸發資產事件

from airflow.sdk.definitions.asset import AssetAlias


@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[AssetAlias("my-task-outputs")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})

透過 yielding Metadata 在任務執行期間觸發資產事件

from airflow.sdk.definitions.asset.metadata import Metadata


@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_metadata():
    s3_asset = Asset(uri="s3://bucket/my-task", name="example_s3")
    yield Metadata(s3_asset, extra={"k": "v"}, alias="my-task-outputs")

新增的資產只會觸發一個資產事件,即使它被多次新增到別名中,或新增到多個別名中。但是,如果傳遞了不同的 extra 值,它可以觸發多個資產事件。在下面的示例中,將觸發兩個資產事件。

from airflow.sdk.definitions.asset import AssetAlias


@task(
    outlets=[
        AssetAlias("my-task-outputs-1"),
        AssetAlias("my-task-outputs-2"),
        AssetAlias("my-task-outputs-3"),
    ]
)
def my_task_with_outlet_events(*, outlet_events):
    outlet_events[AssetAlias("my-task-outputs-1")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
    # This line won't emit an additional asset event as the asset and extra are the same as the previous line.
    outlet_events[AssetAlias("my-task-outputs-2")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
    # This line will emit an additional asset event as the extra is different.
    outlet_events[AssetAlias("my-task-outputs-3")].add(Asset("s3://bucket/my-task"), extra={"k2": "v2"})

透過解析的資產別名獲取之前觸發的資產事件資訊

從之前觸發的資產事件中獲取資訊中所述,可以使用執行上下文中的 inlet_events 訪問器讀取入口資產事件,您也可以使用資產別名訪問由它們觸發的資產事件。

with DAG(dag_id="asset-alias-producer"):

    @task(outlets=[AssetAlias("example-alias")])
    def produce_asset_events(*, outlet_events):
        outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"), extra={"row_count": 1})


with DAG(dag_id="asset-alias-consumer", schedule=None):

    @task(inlets=[AssetAlias("example-alias")])
    def consume_asset_alias_events(*, inlet_events):
        events = inlet_events[AssetAlias("example-alias")]
        last_row_count = events[-1].extra["row_count"]

此條目有幫助嗎?