Airflow Summit 2025 將於 10 月 07-09 日召開。立即註冊享早鳥票優惠!

使用 TaskFlow API 編寫更 Pythonic 的 DAG

在第一個教程中,你使用 PythonOperator 等傳統 Operator 構建了第一個 Airflow DAG。現在讓我們看看使用 TaskFlow API(Airflow 2.0 引入)編寫工作流的更現代、更 Pythonic 的方法。

TaskFlow API 旨在讓你的程式碼更簡潔、更清晰、更易於維護。你只需編寫普通的 Python 函式,用裝飾器修飾它們,剩下的交給 Airflow 處理——包括任務建立、依賴連線和任務之間的資料傳遞。

在本教程中,我們將使用 TaskFlow API 建立一個簡單的 ETL(提取 → 轉換 → 載入)流水線。讓我們開始吧!

整體概覽:TaskFlow 流水線

下圖展示了使用 TaskFlow 構建的完整流水線。如果有些地方看起來不熟悉,請不要擔心——我們會一步一步詳細講解。

src/airflow/example_dags/tutorial_taskflow_api.py


import json

import pendulum

from airflow.sdk import dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
tutorial_taskflow_api()

步驟 1:定義 DAG

和以前一樣,你的 DAG 是 Airflow 載入和解析的 Python 指令碼。但這次,我們使用 @dag 裝飾器來定義它。

src/airflow/example_dags/tutorial_taskflow_api.py

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

為了讓 Airflow 能夠發現這個 DAG,我們可以呼叫用 @dag 裝飾的 Python 函式。

src/airflow/example_dags/tutorial_taskflow_api.py

tutorial_taskflow_api()

版本 2.4 新增功能:如果你使用 @dag 裝飾器或在 with 塊中定義 DAG,則不再需要將其賦給全域性變數。Airflow 將自動找到它。

你可以在 Airflow UI 中視覺化你的 DAG!載入 DAG 後,導航到 Graph View 檢視任務之間的連線方式。

步驟 2:使用 @task 編寫任務

使用 TaskFlow,每個任務都只是一個普通的 Python 函式。你可以使用 @task 裝飾器將其轉換為 Airflow 可以排程和執行的任務。以下是 extract 任務

src/airflow/example_dags/tutorial_taskflow_api.py

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict


函式的返回值會自動傳遞給下一個任務——無需手動使用 XComs。在底層,TaskFlow 使用 XComs 自動管理資料傳遞,抽象了先前方法中手動管理 XCom 的複雜性。你將使用相同的模式定義 transformload 任務。

請注意上面使用了 @task(multiple_outputs=True)——這告訴 Airflow 該函式返回一個字典,其值應該被拆分成單獨的 XCom。返回字典中的每個鍵都成為一個獨立的 XCom 條目,這使得在下游任務中輕鬆引用特定值成為可能。如果你省略 multiple_outputs=True,則整個字典將作為一個單獨的 XCom 儲存,並且必須作為一個整體訪問。

步驟 3:構建流程

定義任務後,你可以像呼叫 Python 函式一樣呼叫它們來構建流水線。Airflow 利用這種函式呼叫來設定任務依賴關係和管理資料傳遞。

src/airflow/example_dags/tutorial_taskflow_api.py

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

就這麼簡單!Airflow 可以僅憑這段程式碼就知道如何排程和編排你的流水線。

執行你的 DAG

如何啟用和觸發你的 DAG

  1. 導航到 Airflow UI。

  2. 在列表中找到你的 DAG 並點選開關按鈕啟用它。

  3. 你可以透過點選“Trigger DAG”按鈕手動觸發它,或者等待它按計劃執行。

幕後發生了什麼?

如果你使用過 Airflow 1.x,這可能感覺像魔法一樣。讓我們比較一下底層發生的事情。

“舊方法”:手動連線和 XComs

在 TaskFlow API 之前,你必須使用 PythonOperator 等 Operator,並使用 XComs 在任務之間手動傳遞資料。

以下是使用傳統方法時同一個 DAG 的樣子

import json
import pendulum
from airflow.sdk import DAG, PythonOperator


def extract():
    # Old way: simulate extracting data from a JSON string
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(ti):
    # Old way: manually pull from XCom
    order_data_dict = ti.xcom_pull(task_ids="extract")
    total_order_value = sum(order_data_dict.values())
    return {"total_order_value": total_order_value}


def load(ti):
    # Old way: manually pull from XCom
    total = ti.xcom_pull(task_ids="transform")["total_order_value"]
    print(f"Total order value is: {total:.2f}")


with DAG(
    dag_id="legacy_etl_pipeline",
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    extract_task = PythonOperator(task_id="extract", python_callable=extract)
    transform_task = PythonOperator(task_id="transform", python_callable=transform)
    load_task = PythonOperator(task_id="load", python_callable=load)

    extract_task >> transform_task >> load_task

注意

此版本產生與 TaskFlow API 示例相同的結果,但需要顯式管理 XComs 和任務依賴項。

TaskFlow 方法

使用 TaskFlow,所有這些都自動處理。

src/airflow/example_dags/tutorial_taskflow_api.py


import json

import pendulum

from airflow.sdk import dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.tw/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
tutorial_taskflow_api()


Airflow 仍然使用 XComs 並構建依賴關係圖——它只是被抽象出來,以便你可以專注於業務邏輯。

XComs 工作原理

TaskFlow 返回值自動儲存為 XComs。這些值可以在 UI 的“XCom”選項卡下檢視。對於傳統 Operator,手動使用 xcom_pull() 仍然是可能的。

錯誤處理和重試

你可以使用裝飾器輕鬆配置任務的重試次數。例如,你可以在任務裝飾器中直接設定最大重試次數

@task(retries=3)
def my_task(): ...

這有助於確保瞬時故障不會導致任務失敗。

任務引數化

你可以在多個 DAG 中複用裝飾器任務,並覆蓋 task_idretries 等引數。

start = add_task.override(task_id="start")(1, 2)

你甚至可以從共享模組匯入裝飾器任務。

接下來探索什麼

幹得好!你現在已經使用 TaskFlow API 編寫了第一個流水線。想知道接下來可以探索什麼嗎?

  • 向 DAG 新增一個新任務——也許是過濾或驗證步驟

  • 修改返回值並傳遞多個輸出

  • 使用 .override(task_id="...") 探索重試和覆蓋功能

  • 開啟 Airflow UI 並檢查資料如何在任務之間流動,包括任務日誌和依賴項

另請參閱

高階 TaskFlow 模式

熟悉基礎知識後,可以嘗試以下一些強大的技術。

複用裝飾器任務

你可以在多個 DAG 或 DAG 執行中複用裝飾器任務。這對於可複用的工具函式或共享業務規則等常見邏輯尤其有用。使用 .override() 可以自定義任務元資料,例如 task_idretries

start = add_task.override(task_id="start")(1, 2)

你甚至可以從共享模組匯入裝飾器任務。

處理衝突的依賴項

有時任務需要與 DAG 其餘部分不同的 Python 依賴項——例如,專門的庫或系統級軟體包。TaskFlow 支援多種執行環境來隔離這些依賴項。

動態建立的 Virtualenv

在任務執行時建立臨時的 virtualenv。非常適合實驗性或動態任務,但可能會有冷啟動開銷。

src/airflow/example_dags/example_python_decorator.py

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

外部 Python 環境

使用預安裝的 Python 直譯器執行任務——非常適合一致的環境或共享的 virtualenv。

src/airflow/example_dags/example_python_decorator.py

@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = callable_external_python()

Docker 環境

在 Docker 容器中執行你的任務。對於打包任務所需的一切很有用——但需要你的 worker 上有 Docker。

docker/tests/system/docker/example_taskflow_api_docker_virtualenv.py

@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}


注意

需要 Airflow 2.2 和 Docker provider。

KubernetesPodOperator

在 Kubernetes Pod 中執行你的任務,與主 Airflow 環境完全隔離。非常適合大型任務或需要自定義執行時的任務。

cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_decorator.py

@task.kubernetes(
    image="python:3.9-slim-buster",
    name="k8s_test",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
    import time

    print("Hello from k8s pod")
    time.sleep(2)

@task.kubernetes(image="python:3.9-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
    n = 5
    for i in range(n):
        # inner loop to handle number of columns
        # values changing acc. to outer loop
        for _ in range(i + 1):
            # printing stars
            print("* ", end="")

        # ending line after each row
        print("\r")

execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()

execute_in_k8s_pod_instance >> print_pattern_instance

注意

需要 Airflow 2.4 和 Kubernetes provider。

使用 Sensor

使用 @task.sensor 使用 Python 函式構建輕量級、可複用的 Sensor。這些 Sensor 支援 poke 和 reschedule 兩種模式。

src/airflow/example_dags/example_sensor_decorator.py


import pendulum

from airflow.sdk import PokeReturnValue, dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_sensor_decorator():
    # Using a sensor operator to wait for the upstream data to be ready.
    @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
    def wait_for_upstream() -> PokeReturnValue:
        return PokeReturnValue(is_done=True, xcom_value="xcom_value")
    @task
    def dummy_operator() -> None:
        pass
    wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()

與傳統任務混用

你可以將裝飾器任務與經典的 Operator 結合使用。這在使用社群 provider 或逐步遷移到 TaskFlow 時很有幫助。

你可以使用 >> 鏈式連線 TaskFlow 和傳統任務,或使用 .output 屬性傳遞資料。

TaskFlow 中的模板

就像傳統任務一樣,裝飾的 TaskFlow 函式支援模板化引數——包括從檔案載入內容或使用執行時引數。

執行你的可呼叫物件時,Airflow 會傳遞一組關鍵字引數,這些引數可以在你的函式中使用。這組 kwargs 與你在 Jinja 模板中可以使用的完全對應。為此,你可以將你希望在函式中接收的上下文鍵作為關鍵字引數新增。

例如,下面程式碼塊中的可呼叫物件將獲取 tinext_ds 上下文變數的值

@task
def my_python_callable(*, ti, next_ds):
    pass

你也可以選擇使用 **kwargs 接收整個上下文。請注意,這可能會稍微影響效能,因為 Airflow 需要展開整個上下文,其中可能包含許多你實際上不需要的東西。因此,更建議使用顯式引數,如上一段所示。

@task
def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]

另外,有時你可能希望在呼叫棧深處訪問上下文,但又不希望從任務可呼叫物件中傳遞上下文變數。你仍然可以透過 get_current_context 方法訪問執行上下文。

from airflow.sdk import get_current_context


def some_function_in_your_library():
    context = get_current_context()
    ti = context["ti"]

傳遞給裝飾函式的引數會自動模板化。你也可以使用 templates_exts 對檔案進行模板化處理。

@task(templates_exts=[".sql"])
def read_sql(sql): ...

條件執行

使用 @task.run_if()@task.skip_if() 根據執行時動態條件控制任務是否執行——而無需改變你的 DAG 結構。

@task.run_if(lambda ctx: ctx["task_instance"].task_id == "run")
@task.bash()
def echo():
    return "echo 'run'"

下一步

現在你已經瞭解瞭如何使用 TaskFlow API 構建簡潔、易於維護的 DAG,以下是一些不錯的下一步建議。

這篇文件有用嗎?