Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

資產感知排程

新增於版本 2.4。

快速入門

除了基於時間排程 DAG 外,您還可以基於任務更新資產時排程 DAG 執行。

from airflow.sdk import DAG, Asset

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Asset("s3://asset-bucket/example.csv")],
        ...,
    )


with DAG(
    # this DAG should be run when example.csv is updated (by dag1)
    schedule=[Asset("s3://asset-bucket/example.csv")],
    ...,
):
    ...
../_images/asset_scheduled_dags.png

另請參閱

關於如何宣告資產,請參閱資產定義

使用資產排程 DAG

您可以使用資產在 DAG 中指定資料依賴關係。以下示例展示了當 producer DAG 中的 producer 任務成功完成後,Airflow 如何排程 consumer DAG。Airflow 僅在任務成功完成後才將資產標記為 updated。如果任務失敗或被跳過,則不會發生更新,Airflow 也不會排程 consumer DAG。

example_asset = Asset("s3://asset/example.csv")

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_asset], ...)

with DAG(dag_id="consumer", schedule=[example_asset], ...):
    ...

您可以在資產檢視中找到資產和 DAG 之間的關係列表。

多個資產

因為 schedule 引數是一個列表,所以 DAG 可以要求多個資產。Airflow 會在 DAG 消耗的**所有**資產自上次執行以來至少更新一次後排程該 DAG

with DAG(
    dag_id="multiple_assets_example",
    schedule=[
        example_asset_1,
        example_asset_2,
        example_asset_3,
    ],
    ...,
):
    ...

如果在所有消耗的資產更新之前,某個資產多次更新,下游 DAG 仍然只會執行一次,如下圖所示

graph asset_event_timeline { graph [layout=neato] { node [margin=0 fontcolor=blue width=0.1 shape=point label=""] e1 [pos="1,2.5!"] e2 [pos="2,2.5!"] e3 [pos="2.5,2!"] e4 [pos="4,2.5!"] e5 [pos="5,2!"] e6 [pos="6,2.5!"] e7 [pos="7,1.5!"] r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape] e8 [pos="8,2!"] e9 [pos="9,1.5!"] e10 [pos="10,2!"] e11 [pos="11,1.5!"] e12 [pos="12,2!"] e13 [pos="13,2.5!"] r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape] } { node [shape=none label="" width=0] end_ds1 [pos="14,2.5!"] end_ds2 [pos="14,2!"] end_ds3 [pos="14,1.5!"] } { node [shape=none margin=0.25 fontname="roboto,sans-serif"] example_asset_1 [ pos="-0.5,2.5!"] example_asset_2 [ pos="-0.5,2!"] example_asset_3 [ pos="-0.5,1.5!"] dag_runs [label="DagRuns created" pos="-0.5,1!"] } edge [color=lightgrey] example_asset_1 -- e1 -- e2 -- e4 -- e6 -- e13 -- end_ds1 example_asset_2 -- e3 -- e5 -- e8 -- e10 -- e12 -- end_ds2 example_asset_3 -- e7 -- e9 -- e11 -- end_ds3 }

從觸發資產事件中獲取資訊

觸發的 DAG 可以使用 triggering_asset_events 模板或引數從觸發它的資產中獲取資訊。更多資訊請參見模板參考

示例

example_snowflake_asset = Asset("snowflake://my_db/my_schema/my_table")

with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
    SQLExecuteQueryOperator(
        task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_asset], ...
    )

with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_asset], ...):
    SQLExecuteQueryOperator(
        task_id="query",
        conn_id="snowflake_default",
        sql="""
          SELECT *
          FROM my_db.my_schema.my_table
          WHERE "updated_at" >= '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_start }}'
          AND "updated_at" < '{{ (triggering_asset_events.values() | first | first).source_dag_run.data_interval_end }}';
        """,
    )

    @task
    def print_triggering_asset_events(triggering_asset_events=None):
        for asset, asset_list in triggering_asset_events.items():
            print(asset, asset_list)
            print(asset_list[0].source_dag_run.dag_id)

    print_triggering_asset_events()

請注意,此示例使用 (.values() | first | first) 來獲取提供給 DAG 的資產中的第一個 AssetEvent。如果您有多個資產,並且可能伴隨多個 AssetEvent,則實現可能會相當複雜。

透過 REST API 操作排隊的資產事件

新增於版本 2.9。

在此示例中,當任務同時更新資產“asset-1”和“asset-2”時,DAG waiting_for_asset_1_and_2 將被觸發。一旦“asset-1”更新,Airflow 會建立一個記錄。這確保了當“asset-2”更新時,Airflow 知道觸發該 DAG。我們稱這些記錄為排隊的資產事件。

with DAG(
    dag_id="waiting_for_asset_1_and_2",
    schedule=[Asset("asset-1"), Asset("asset-2")],
    ...,
):
    ...

引入了 queuedEvent API 端點來操作這些記錄。

  • 獲取 DAG 的排隊資產事件: /assets/queuedEvent/{uri}

  • 獲取 DAG 的排隊資產事件列表: /dags/{dag_id}/assets/queuedEvent

  • 刪除 DAG 的排隊資產事件: /assets/queuedEvent/{uri}

  • 刪除 DAG 的排隊資產事件列表: /dags/{dag_id}/assets/queuedEvent

  • 獲取資產的排隊資產事件: /dags/{dag_id}/assets/queuedEvent/{uri}

  • 刪除資產的排隊資產事件: DELETE /dags/{dag_id}/assets/queuedEvent/{uri}

關於如何使用 REST API 以及這些端點所需的引數,請參閱Airflow API

使用條件表示式進行高階資產排程

Apache Airflow 包含高階排程功能,可與資產一起使用條件表示式。此功能允許您基於資產更新為 DAG 執行定義複雜的依賴關係,使用邏輯運算子可以更好地控制工作流觸發器。

資產的邏輯運算子

Airflow 支援兩種邏輯運算子來組合資產條件:

  • AND (``&``): 指定僅在所有指定的資產都已更新後才應觸發 DAG。

  • OR (``|``): 指定當任何指定的資產更新時,就應觸發 DAG。

這些運算子使您能夠配置 Airflow 工作流使用更復雜的資產更新條件,使其更具動態性和靈活性。

使用示例

基於多個資產更新進行排程

要僅在兩個特定資產都已更新時排程 DAG 執行,請使用 AND 運算子(&

dag1_asset = Asset("s3://dag1/output_1.txt")
dag2_asset = Asset("s3://dag2/output_1.txt")

with DAG(
    # Consume asset 1 and 2 with asset expressions
    schedule=(dag1_asset & dag2_asset),
    ...,
):
    ...

基於任何資產更新進行排程

要在兩個資產中的任意一個更新時觸發 DAG 執行,請應用 OR 運算子(|

with DAG(
    # Consume asset 1 or 2 with asset expressions
    schedule=(dag1_asset | dag2_asset),
    ...,
):
    ...

複雜條件邏輯

對於需要更復雜條件的情況,例如當一個資產更新時或當另外兩個資產都更新時觸發 DAG,請結合使用 OR 和 AND 運算子

dag3_asset = Asset("s3://dag3/output_3.txt")

with DAG(
    # Consume asset 1 or both 2 and 3 with asset expressions
    schedule=(dag1_asset | (dag2_asset & dag3_asset)),
    ...,
):
    ...

基於資產別名進行排程

由於新增到別名的資產事件只是簡單的資產事件,依賴實際資產的下游 DAG 可以正常讀取其資產事件,無需考慮相關的別名。下游 DAG 也可以依賴資產別名。編寫語法是按名稱引用 AssetAlias,並使用相關的資產事件進行排程。請注意,僅當別名解析為 Asset("s3://bucket/my-task") 時,具有 outlets=AssetAlias("xxx") 的任務才能觸發 DAG。每當具有 outlet AssetAlias("out") 的任務在執行時與至少一個資產關聯時,無論資產的身份如何,DAG 都會執行。如果在特定任務執行中沒有資產與別名關聯,下游 DAG 將不會被觸發。這也意味著我們可以進行條件資產觸發。

資產別名在 DAG 解析期間解析為資產。因此,如果“min_file_process_interval”配置設定為較高值,則資產別名可能無法解析。要解決此問題,您可以觸發 DAG 解析。

with DAG(dag_id="asset-producer"):

    @task(outlets=[Asset("example-alias")])
    def produce_asset_events():
        pass


with DAG(dag_id="asset-alias-producer"):

    @task(outlets=[AssetAlias("example-alias")])
    def produce_asset_events(*, outlet_events):
        outlet_events[AssetAlias("example-alias")].add(Asset("s3://bucket/my-task"))


with DAG(dag_id="asset-consumer", schedule=Asset("s3://bucket/my-task")):
    ...

with DAG(dag_id="asset-alias-consumer", schedule=AssetAlias("example-alias")):
    ...

在提供的示例中,一旦 DAG asset-alias-producer 執行完成,資產別名 AssetAlias("example-alias") 將解析為 Asset("s3://bucket/my-task")。然而,DAG asset-alias-consumer 必須等待下一次 DAG 重新解析來更新其排程。為解決此問題,當資產別名 AssetAlias("example-alias") 解析為這些 DAG 之前不依賴的資產時,Airflow 將重新解析依賴此別名的 DAG。結果,在 DAG asset-alias-producer 執行後,“asset-consumer”和“asset-alias-consumer”這兩個 DAG 都將被觸發。

結合資產排程和時間排程

AssetTimetable 整合

您可以使用 AssetOrTimeSchedule 同時基於資產事件和時間排程來排程 DAG。這使得您可以在 DAG 需要既透過資料更新觸發,又根據固定時間表定期執行時建立工作流。

有關 AssetOrTimeSchedule 的更多詳細資訊,請參閱AssetOrTimeSchedule中的相應章節。

此條目是否有幫助?