資產定義¶
版本 2.4 新增。
版本 3.0 變更:此概念先前稱為“資料集”。
什麼是“資產”?¶
Airflow 資產是資料的邏輯分組。上游生產者任務可以更新資產,資產更新有助於排程下游消費者 DAG。
統一資源識別符號 (URI) 定義資產
from airflow.sdk import Asset
example_asset = Asset("s3://asset-bucket/example.csv")
Airflow 對 URI 所代表的資料的內容或位置不做任何假設,並將 URI 視為一個字串。這意味著 Airflow 會將任何正則表示式,例如 input_\d+.csv,或檔案 glob 模式,例如 input_2022*.csv,視為嘗試從一個宣告建立多個資產,並且它們將無法工作。
您必須使用有效的 URI 建立資產。Airflow 核心和提供者定義了各種您可以使用的資料方案,例如 file(核心)、postgres(透過 Postgres 提供者)和 s3(透過 Amazon 提供者)。第三方提供者和外掛也可能提供自己的方案。這些預定義方案具有各自的語義,應遵循這些語義。您可以使用可選的 name 引數為資產提供一個更具可讀性的識別符號。
from airflow.sdk import Asset
example_asset = Asset(uri="s3://asset-bucket/example.csv", name="bucket-1")
什麼是有效的 URI?¶
從技術上講,URI 必須符合 RFC 3986 中的有效字元集,這基本上是 ASCII 字母數字字元,加上 %、-、_、. 和 ~。要標識無法用 URI 安全字元表示的資源,請使用百分比編碼對資源名稱進行編碼。
URI 也區分大小寫,因此 s3://example/asset 和 s3://Example/asset 被認為是不同的。請注意,URI 的 host 部分也區分大小寫,這與 RFC 3986 不同。
對於預定義方案(例如 file、postgres 和 s3),您必須提供一個有意義的 URI。如果無法提供,請完全使用另一種沒有語義限制的方案。Airflow 永遠不會要求使用者定義的 URI 方案(帶字首 x-)具有語義,因此這可能是一個不錯的選擇。如果您有一個只能稍後(例如,在任務執行期間)獲得的 URI,請考慮改用 AssetAlias 並稍後更新 URI。
# invalid asset:
must_contain_bucket_name = Asset("s3://")
不要使用 airflow 方案,它保留用於 Airflow 內部使用。
Airflow 始終偏好在方案中使用小寫字母,並且 URI 的主機部分需要區分大小寫,以便正確區分資源。
# invalid assets:
reserved = Asset("airflow://example_asset")
not_ascii = Asset("èxample_datašet")
如果您想使用不包含額外語義約束的方案定義資產,請使用帶有字首 x- 的方案。Airflow 會跳過對這些方案的 URI 進行任何語義驗證。
# valid asset, treated as a plain string
my_ds = Asset("x-my-thing://foobarbaz")
識別符號不必是絕對的;它可以是沒有方案的相對 URI,甚至只是一個簡單的路徑或字串
# valid assets:
schemeless = Asset("//example/asset")
csv_file = Asset("example_asset")
非絕對識別符號被認為是普通字串,對 Airflow 不帶任何語義含義。
關於資產的額外資訊¶
如果需要,您可以在資產中包含一個額外字典
example_asset = Asset(
"s3://asset/example.csv",
extra={"team": "trainees"},
)
這可用於為資產提供自定義描述,例如目標檔案的所有者是誰,或該檔案用於何處。額外資訊不影響資產的身份。
注意
安全注意:資產 URI 和額外欄位未加密,它們以明文形式儲存在 Airflow 的元資料資料庫中。切勿在資產 URI 或額外部索引鍵值中儲存任何敏感值,特別是憑證!
建立一個任務來觸發資產事件¶
一旦資產定義好,就可以透過指定 outlets 來建立任務以針對其觸發事件
from airflow.sdk import DAG, Asset
from airflow.providers.standard.operators.python import PythonOperator
example_asset = Asset(name="example_asset", uri="s3://asset-bucket/example.csv")
def _write_example_asset():
"""Write data to example_asset..."""
with DAG(dag_id="example_asset", schedule="@daily"):
PythonOperator(task_id="example_asset", outlets=[example_asset], python_callable=_write_example_asset)
這相當多樣板程式碼。Airflow 為這種簡單但最常見的情況提供了一種簡寫方式:建立一個僅含一個任務並觸發單個資產事件的 DAG。下面的程式碼塊與上面的程式碼塊完全等價
from airflow.sdk import asset
@asset(uri="s3://asset-bucket/example.csv", schedule="@daily")
def example_asset():
"""Write data to example_asset..."""
宣告一個 @asset 會自動建立
一個
Asset,其 name 設定為函式名。一個
DAG,其 dag_id 設定為函式名。DAG中的一個任務,其 task_id 設定為函式名,並且 outlet 指向建立的Asset。
為觸發中的資產事件附加額外資訊¶
版本 2.10.0 新增。
帶有資產出口的任務可以在觸發資產事件之前可選地附加額外資訊。這與關於資產的額外資訊不同。資產的額外資訊靜態描述了資產 URI 指向的實體;而資產事件的額外資訊則應用於標註觸發的資料變更,例如更新改變了資料庫中的多少行,或者覆蓋的日期範圍。
將額外資訊附加到資產事件的最簡單方法是從任務中 yield 一個 Metadata 物件
from airflow.sdk import Metadata, asset
@asset(uri="s3://asset/example.csv", schedule=None)
def example_s3(self): # 'self' here refers to the current asset.
df = ... # Get a Pandas DataFrame to write.
# Write df to asset...
yield Metadata(self, {"row_count": len(df)})
Airflow 會自動收集所有 yield 的元資料,並用相應的元資料物件的額外資訊填充資產事件。
這也可以在經典運算子中完成。最好的方法是繼承運算子並覆蓋 execute 方法。或者,也可以在任務的 pre_execute 或 post_execute 鉤子中新增額外資訊。然而,如果您選擇使用鉤子,請記住任務重試時它們不會重新執行,這可能導致在某些情況下額外資訊與實際資料不匹配。
實現同樣效果的另一種方法是直接訪問任務執行上下文中的 outlet_events
@asset(schedule=None)
def write_to_s3(self, context):
context["outlet_events"][self].extra = {"row_count": len(df)}
這裡沒什麼魔法——Airflow 只是將 yield 的值寫入完全相同的訪問器。這在經典運算子中也適用,包括 execute、pre_execute 和 post_execute。
從之前觸發的資產事件中獲取資訊¶
版本 2.10.0 新增。
在任務的 outlets 中定義的資產事件,如前一節所述,可以被在其 inlets 中宣告相同資產的任務讀取。資產事件條目包含 extra(詳情見前一節)、指示事件從任務中觸發的時間的 timestamp,以及將事件連結回其源的 source_task_instance。
可以使用執行上下文中的 inlet_events 訪問器讀取入口資產事件。接前一節的 write_to_s3 資產
@asset(schedule=None)
def post_process_s3_file(context, write_to_s3): # Declaring an inlet to write_to_s3.
events = context["inlet_events"][write_to_s3]
last_row_count = events[-1].extra["row_count"]
inlet_events 對映中的每個值都是一個序列狀物件,它按 timestamp 對給定資產的過去事件進行排序,從最早到最新。它支援 Python 列表的大部分介面,因此您可以使用 [-1] 訪問最後一個事件,使用 [-2:] 訪問最後兩個事件等。訪問器是惰性的,只有當您訪問其中的專案時才會訪問資料庫。
@asset、@task 和經典運算子之間的依賴關係¶
由於 @asset 只是一個包含任務和資產的 DAG 的簡單包裝器,因此在 @task 或經典運算子中讀取和 @asset 非常容易。例如,上面的 post_process_s3_file 也可以寫成一個任務(在一個 DAG 中,這裡為簡潔起見省略了)
@task(inlets=[write_to_s3])
def post_process_s3_file(*, inlet_events):
events = inlet_events[example_s3_asset]
last_row_count = events[-1].extra["row_count"]
post_process_s3_file()
反之亦然
example_asset = Asset("example_asset")
@task(outlets=[example_asset])
def emit_example_asset():
"""Write to example_asset..."""
@asset(schedule=None)
def process_example_asset(example_asset):
"""Process inlet example_asset..."""
在一個任務中輸出到多個資產¶
一個任務可以為多個資產觸發事件。這通常不建議,但在某些情況下需要,例如當您需要將一個數據源拆分成多個時。這對於任務來說很簡單,因為 outlets 在設計上就是複數形式
from airflow.sdk import DAG, Asset, task
input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")
with DAG(dag_id="process_input", schedule=None):
@task(inlets=[input_asset], outlets=[out_asset_1, out_asset_2])
def process_input():
"""Split input into two."""
其簡寫形式是 @asset.multi
from airflow.sdk import Asset, asset
input_asset = Asset("input_asset")
out_asset_1 = Asset("out_asset_1")
out_asset_2 = Asset("out_asset_2")
@asset.multi(schedule=None, outlets=[out_asset_1, out_asset_2])
def process_input(input_asset):
"""Split input into two."""
透過 AssetAlias 觸發動態資料事件和建立資產¶
資產別名可用於觸發與別名相關聯的資產事件。下游可以依賴於解析的資產。此功能允許您基於資產更新定義 DAG 執行的複雜依賴關係。
如何使用 AssetAlias¶
AssetAlias 只有一個引數 name,它唯一標識資產。任務必須首先將別名宣告為出口,然後使用 outlet_events 或 yield Metadata 向其新增事件。
以下示例針對 S3 URI f"s3://bucket/my-task" 建立了一個資產事件,並附帶可選的額外資訊 extra。如果資產不存在,Airflow 將動態建立它並記錄警告訊息。
透過 outlet_events 在任務執行期間觸發資產事件
from airflow.sdk.definitions.asset import AssetAlias
@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, outlet_events):
outlet_events[AssetAlias("my-task-outputs")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
透過 yielding Metadata 在任務執行期間觸發資產事件
from airflow.sdk.definitions.asset.metadata import Metadata
@task(outlets=[AssetAlias("my-task-outputs")])
def my_task_with_metadata():
s3_asset = Asset(uri="s3://bucket/my-task", name="example_s3")
yield Metadata(s3_asset, extra={"k": "v"}, alias="my-task-outputs")
新增的資產只會觸發一個資產事件,即使它被多次新增到別名中,或新增到多個別名中。但是,如果傳遞了不同的 extra 值,它可以觸發多個資產事件。在下面的示例中,將觸發兩個資產事件。
from airflow.sdk.definitions.asset import AssetAlias
@task(
outlets=[
AssetAlias("my-task-outputs-1"),
AssetAlias("my-task-outputs-2"),
AssetAlias("my-task-outputs-3"),
]
)
def my_task_with_outlet_events(*, outlet_events):
outlet_events[AssetAlias("my-task-outputs-1")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
# This line won't emit an additional asset event as the asset and extra are the same as the previous line.
outlet_events[AssetAlias("my-task-outputs-2")].add(Asset("s3://bucket/my-task"), extra={"k": "v"})
# This line will emit an additional asset event as the extra is different.
outlet_events[AssetAlias("my-task-outputs-3")].add(Asset("s3://bucket/my-task"), extra={"k2": "v2"})
透過解析的資產別名獲取之前觸發的資產事件資訊¶
如從之前觸發的資產事件中獲取資訊中所述,可以使用執行上下文中的 inlet_events 訪問器讀取入口資產事件,您也可以使用資產別名訪問由它們觸發的資產事件。
with DAG(dag_id="asset-alias-producer"):
@task(outlets=[AssetAlias("example-alias")])
def produce_asset_events(*, outlet_events):
outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"), extra={"row_count": 1})
with DAG(dag_id="asset-alias-consumer", schedule=None):
@task(inlets=[AssetAlias("example-alias")])
def consume_asset_alias_events(*, inlet_events):
events = inlet_events[AssetAlias("example-alias")]
last_row_count = events[-1].extra["row_count"]