資產感知排程¶
新增於版本 2.4。
快速入門¶
除了基於時間排程 DAG 外,您還可以基於任務更新資產時排程 DAG 執行。
from airflow.sdk import DAG, Asset
with DAG(...):
MyOperator(
# this task updates example.csv
outlets=[Asset("s3://asset-bucket/example.csv")],
...,
)
with DAG(
# this DAG should be run when example.csv is updated (by dag1)
schedule=[Asset("s3://asset-bucket/example.csv")],
...,
):
...
另請參閱
關於如何宣告資產,請參閱資產定義。
使用資產排程 DAG¶
您可以使用資產在 DAG 中指定資料依賴關係。以下示例展示了當 producer DAG 中的 producer 任務成功完成後,Airflow 如何排程 consumer DAG。Airflow 僅在任務成功完成後才將資產標記為 updated。如果任務失敗或被跳過,則不會發生更新,Airflow 也不會排程 consumer DAG。
example_asset = Asset("s3://asset/example.csv")
with DAG(dag_id="producer", ...):
BashOperator(task_id="producer", outlets=[example_asset], ...)
with DAG(dag_id="consumer", schedule=[example_asset], ...):
...
您可以在資產檢視中找到資產和 DAG 之間的關係列表。
多個資產¶
因為 schedule 引數是一個列表,所以 DAG 可以要求多個資產。Airflow 會在 DAG 消耗的**所有**資產自上次執行以來至少更新一次後排程該 DAG
with DAG(
dag_id="multiple_assets_example",
schedule=[
example_asset_1,
example_asset_2,
example_asset_3,
],
...,
):
...
如果在所有消耗的資產更新之前,某個資產多次更新,下游 DAG 仍然只會執行一次,如下圖所示
從觸發資產事件中獲取資訊¶
觸發的 DAG 可以使用 triggering_asset_events 模板或引數從觸發它的資產中獲取資訊。更多資訊請參見模板參考。
示例
example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")
with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
SQLExecuteQueryOperator(
task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_asset], ...
)
with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_asset], ...):
SQLExecuteQueryOperator(
task_id="query",
conn_id="snowflake_default",
sql="""
SELECT *
FROM my_db.my_schema.my_table
WHERE "updated_at" >= '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_start }}'
AND "updated_at" < '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_end }}';
""",
)
@task
def print_triggering_asset_events(triggering_asset_events=None):
for asset, asset_list in triggering_asset_events.items():
print(asset, asset_list)
print(asset_list[0].source_dag_run.dag_id)
print_triggering_asset_events()
請注意,此示例使用 (.values() | first | first) 來獲取提供給 DAG 的資產中的第一個 AssetEvent。如果您有多個資產,並且可能伴隨多個 AssetEvent,則實現可能會相當複雜。
透過 REST API 操作排隊的資產事件¶
新增於版本 2.9。
在此示例中,當任務同時更新資產“asset-1”和“asset-2”時,DAG waiting_for_asset_1_and_2 將被觸發。一旦“asset-1”更新,Airflow 會建立一個記錄。這確保了當“asset-2”更新時,Airflow 知道觸發該 DAG。我們稱這些記錄為排隊的資產事件。
with DAG(
dag_id="waiting_for_asset_1_and_2",
schedule=[Asset("asset-1"), Asset("asset-2")],
...,
):
...
引入了 queuedEvent API 端點來操作這些記錄。
獲取 DAG 的排隊資產事件:
/assets/queuedEvent/{uri}獲取 DAG 的排隊資產事件列表:
/dags/{dag_id}/assets/queuedEvent刪除 DAG 的排隊資產事件:
/assets/queuedEvent/{uri}刪除 DAG 的排隊資產事件列表:
/dags/{dag_id}/assets/queuedEvent獲取資產的排隊資產事件:
/dags/{dag_id}/assets/queuedEvent/{uri}刪除資產的排隊資產事件:
DELETE /dags/{dag_id}/assets/queuedEvent/{uri}
關於如何使用 REST API 以及這些端點所需的引數,請參閱Airflow API。
使用條件表示式進行高階資產排程¶
Apache Airflow 包含高階排程功能,可與資產一起使用條件表示式。此功能允許您基於資產更新為 DAG 執行定義複雜的依賴關係,使用邏輯運算子可以更好地控制工作流觸發器。
資產的邏輯運算子¶
Airflow 支援兩種邏輯運算子來組合資產條件:
AND (``&``): 指定僅在所有指定的資產都已更新後才應觸發 DAG。
OR (``|``): 指定當任何指定的資產更新時,就應觸發 DAG。
這些運算子使您能夠配置 Airflow 工作流使用更復雜的資產更新條件,使其更具動態性和靈活性。
使用示例¶
基於多個資產更新進行排程
要僅在兩個特定資產都已更新時排程 DAG 執行,請使用 AND 運算子(&)
dag1_asset = Asset("s3://dag1/output_1.txt")
dag2_asset = Asset("s3://dag2/output_1.txt")
with DAG(
# Consume asset 1 and 2 with asset expressions
schedule=(dag1_asset & dag2_asset),
...,
):
...
基於任何資產更新進行排程
要在兩個資產中的任意一個更新時觸發 DAG 執行,請應用 OR 運算子(|)
with DAG(
# Consume asset 1 or 2 with asset expressions
schedule=(dag1_asset | dag2_asset),
...,
):
...
複雜條件邏輯
對於需要更復雜條件的情況,例如當一個資產更新時或當另外兩個資產都更新時觸發 DAG,請結合使用 OR 和 AND 運算子
dag3_asset = Asset("s3://dag3/output_3.txt")
with DAG(
# Consume asset 1 or both 2 and 3 with asset expressions
schedule=(dag1_asset | (dag2_asset & dag3_asset)),
...,
):
...
基於資產別名進行排程¶
由於新增到別名的資產事件只是簡單的資產事件,依賴實際資產的下游 DAG 可以正常讀取其資產事件,無需考慮相關的別名。下游 DAG 也可以依賴資產別名。編寫語法是按名稱引用 AssetAlias,並使用相關的資產事件進行排程。請注意,僅當別名解析為 Asset("s3://bucket/my-task") 時,具有 outlets=AssetAlias("xxx") 的任務才能觸發 DAG。每當具有 outlet AssetAlias("out") 的任務在執行時與至少一個資產關聯時,無論資產的身份如何,DAG 都會執行。如果在特定任務執行中沒有資產與別名關聯,下游 DAG 將不會被觸發。這也意味著我們可以進行條件資產觸發。
資產別名在 DAG 解析期間解析為資產。因此,如果“min_file_process_interval”配置設定為較高值,則資產別名可能無法解析。要解決此問題,您可以觸發 DAG 解析。
with DAG(dag_id="asset-producer"):
@task(outlets=[Asset("example-alias")])
def produce_asset_events():
pass
with DAG(dag_id="asset-alias-producer"):
@task(outlets=[AssetAlias("example-alias")])
def produce_asset_events(*, outlet_events):
outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"))
with DAG(dag_id="asset-consumer", schedule=Asset("s3://bucket/my-task")):
...
with DAG(dag_id="asset-alias-consumer", schedule=AssetAlias("example-alias")):
...
在提供的示例中,一旦 DAG asset-alias-producer 執行完成,資產別名 AssetAlias("example-alias") 將解析為 Asset("s3://bucket/my-task")。然而,DAG asset-alias-consumer 必須等待下一次 DAG 重新解析來更新其排程。為解決此問題,當資產別名 AssetAlias("example-alias") 解析為這些 DAG 之前不依賴的資產時,Airflow 將重新解析依賴此別名的 DAG。結果,在 DAG asset-alias-producer 執行後,“asset-consumer”和“asset-alias-consumer”這兩個 DAG 都將被觸發。
結合資產排程和時間排程¶
AssetTimetable 整合¶
您可以使用 AssetOrTimeSchedule 同時基於資產事件和時間排程來排程 DAG。這使得您可以在 DAG 需要既透過資料更新觸發,又根據固定時間表定期執行時建立工作流。
有關 AssetOrTimeSchedule 的更多詳細資訊,請參閱AssetOrTimeSchedule中的相應章節。