Airflow Summit 2025 將於 10 月 07-09 日舉行。立即註冊以獲取早鳥票!

Airflow 101:構建你的第一個工作流

歡迎來到 Apache Airflow 的世界!在本教程中,我們將引導您瞭解 Airflow 的基本概念,幫助您理解如何編寫您的第一個 DAG。無論您是否熟悉 Python,或者剛剛入門,我們都會讓這段旅程變得愉快而簡單。

什麼是 DAG?

其核心是,DAG 是按反映任務關係和依賴關係的方式組織的任務集合。它就像您工作流的路線圖,展示了每個任務如何相互連線。如果這聽起來有點複雜,請不要擔心;我們將逐步分解它。

流水線定義示例

讓我們從一個簡單的流水線定義示例開始。儘管一開始可能看起來令人不知所措,但我們將詳細解釋每一行。

src/airflow/example_dags/tutorial.py


import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

理解 DAG 定義檔案

將 Airflow Python 指令碼視為一個配置檔案,它用程式碼定義了 DAG 的結構。您在此處定義的實際任務在不同的環境中執行,這意味著此指令碼並非用於資料處理。它的主要作用是定義 DAG 物件,並且需要快速評估,因為 DAG 檔案處理器會定期檢查它是否有任何更改。

匯入模組

要開始使用,我們需要匯入必要的庫。這是任何 Python 指令碼中典型的第一步。

src/airflow/example_dags/tutorial.py

import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG


有關 Python 和 Airflow 如何處理模組的更多詳細資訊,請檢視模組管理

設定預設引數

建立 DAG 及其任務時,您可以直接將引數傳遞給每個任務,也可以在字典中定義一組預設引數。後一種方法通常更高效、更簡潔。

src/airflow/example_dags/tutorial.py

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function, # or list of functions
    # 'on_success_callback': some_other_function, # or list of functions
    # 'on_retry_callback': another_function, # or list of functions
    # 'sla_miss_callback': yet_another_function, # or list of functions
    # 'on_skipped_callback': another_function, #or list of functions
    # 'trigger_rule': 'all_success'
},

如果您想深入瞭解 BaseOperator 的引數,請檢視 airflow.sdk.BaseOperator 文件。

建立 DAG

接下來,我們需要建立一個 DAG 物件來容納我們的任務。我們將為 DAG 提供一個唯一的識別符號,稱為 dag_id,並指定我們剛剛定義的預設引數。我們還將為我們的 DAG 設定每天執行的排程。

src/airflow/example_dags/tutorial.py

with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

理解 Operator

Operator 在 Airflow 中代表一個工作單元。它們是工作流的構建塊,允許您定義將執行哪些任務。雖然我們可以將 Operator 用於許多工,但 Airflow 也提供了Taskflow API,這是一種更 Python 式的方式來定義工作流,我們稍後將對此進行介紹。

所有 Operator 都派生自 BaseOperator,它包含在 Airflow 中執行任務所需的基本引數。一些流行的 Operator 包括 PythonOperatorBashOperatorKubernetesPodOperator。在本教程中,我們將重點介紹 BashOperator 來執行一些簡單的 bash 命令。

定義任務

要使用 Operator,必須將其例項化為一個任務。任務規定了 Operator 將如何在 DAG 的上下文中執行其工作。在下面的示例中,我們兩次例項化了 BashOperator 來執行兩個不同的 bash 指令碼。task_id 作為每個任務的唯一識別符號。

src/airflow/example_dags/tutorial.py

t1 = BashOperator(
    task_id="print_date",
    bash_command="date",
)

t2 = BashOperator(
    task_id="sleep",
    depends_on_past=False,
    bash_command="sleep 5",
    retries=3,
)

請注意我們如何將特定於 Operator 的引數(如 bash_command)與從 BaseOperator 繼承的通用引數(如 retries)混合使用。這種方法簡化了我們的程式碼。在第二個任務中,我們甚至覆蓋了 retries 引數,將其設定為 3

任務引數的優先順序如下

  1. 顯式傳遞的引數

  2. default_args 字典中的值

  3. Operator 的預設值(如果可用)


注意

請記住,每個任務都必須包含或繼承引數 task_idowner。否則,Airflow 將丟擲錯誤。幸運的是,全新的 Airflow 安裝將 owner 預設設定為 airflow,因此您主要需要確保設定了 task_id

使用 Jinja 進行模板化

Airflow 利用了 Jinja 模板化 的強大功能,讓您可以訪問內建引數和宏來增強您的工作流。本節將向您介紹 Airflow 中模板化的基礎知識,重點關注常用的模板變數:{{ ds }},它代表今天的日期戳。

src/airflow/example_dags/tutorial.py

templated_command = textwrap.dedent(
    """
{% for i in range(5) %}
    echo "{{ ds }}"
    echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)

t3 = BashOperator(
    task_id="templated",
    depends_on_past=False,
    bash_command=templated_command,
)

您會注意到 templated_command 包含 {% %} 塊中的邏輯,並引用 {{ ds }} 等引數。您還可以將檔案傳遞給 bash_command,例如 bash_command='templated_command.sh',以便更好地組織程式碼。您甚至可以定義 user_defined_macrosuser_defined_filters 來建立自己的變數和過濾器用於模板。有關自定義過濾器的更多資訊,請參閱 Jinja 文件

有關可在模板中引用的變數和宏的更多資訊,請閱讀模板參考

新增 DAG 和任務文件

您可以為您的 DAG 或單個任務新增文件。雖然 DAG 文件目前支援 markdown,但任務文件可以是純文字、markdown、reStructuredText、JSON 或 YAML。一個好的做法是在 DAG 檔案的開頭包含文件。

src/airflow/example_dags/tutorial.py

t1.doc_md = textwrap.dedent(
    """\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](https://imgs.xkcd.com/comics/fixing_problems.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)

dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
"""  # otherwise, type it like this

../_images/task_doc.png

../_images/dag_doc.png

設定依賴關係

在 Airflow 中,任務可以相互依賴。例如,如果您有任務 t1t2t3,您可以通過幾種方式定義它們的依賴關係

t1.set_downstream(t2)

# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)

# The bit shift operator can also be
# used to chain operations:
t1 >> t2

# And the upstream dependency with the
# bit shift operator:
t2 << t1

# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3

# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1

請注意,如果 Airflow 檢測到 DAG 中的迴圈或某個依賴關係被多次引用,它將丟擲錯誤。

處理時區

建立感知時區的 DAG 很簡單。只需確保使用 pendulum 處理感知時區日期即可。避免使用標準庫中的 timezone,因為它們存在已知限制。

回顧

恭喜!現在,您應該對如何在 Airflow 中建立 DAG、定義任務及其依賴關係以及使用模板有了基本的瞭解。您的程式碼應該類似於以下內容

src/airflow/example_dags/tutorial.py


import textwrap
from datetime import datetime, timedelta

# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
    "tutorial",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function, # or list of functions
        # 'on_success_callback': some_other_function, # or list of functions
        # 'on_retry_callback': another_function, # or list of functions
        # 'sla_miss_callback': yet_another_function, # or list of functions
        # 'on_skipped_callback': another_function, #or list of functions
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5",
        retries=3,
    )
    t1.doc_md = textwrap.dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](https://imgs.xkcd.com/comics/fixing_problems.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """
    This is a documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = textwrap.dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]

測試你的流水線

現在是時候測試您的流水線了!首先,確保您的指令碼能成功解析。如果您將程式碼儲存在 airflow.cfg 中指定的 dags 資料夾下的 tutorial.py 中,您可以執行

python ~/airflow/dags/tutorial.py

如果指令碼執行無誤,恭喜!您的 DAG 已正確設定。

命令列元資料驗證

讓我們透過執行一些命令進一步驗證您的指令碼

# initialize the database tables
airflow db migrate

# print the list of active dags
airflow dags list

# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial

# prints the graphviz representation of "tutorial" DAG
airflow dags show tutorial

測試任務例項和 DAG 執行

您可以針對指定的 邏輯日期 測試特定的任務例項。這模擬了排程程式在特定日期和時間執行您的任務。

注意

請注意,排程程式是 針對 特定的日期和時間執行您的任務,而不是一定 該日期或時間執行。邏輯日期 是 DAG 執行的命名所依據的時間戳,它通常對應於您的工作流操作的時間段的 結束 時間 — 或 DAG 執行手動觸發的時間。

Airflow 使用此邏輯日期來組織和跟蹤每次執行;這是您在 UI、日誌和程式碼中引用特定執行的方式。透過 UI 或 API 觸發 DAG 時,您可以提供自己的邏輯日期,以便在特定時間點執行工作流。

# command layout: command subcommand [dag_id] [task_id] [(optional) date]

# testing print_date
airflow tasks test tutorial print_date 2015-06-01

# testing sleep
airflow tasks test tutorial sleep 2015-06-01

您還可以透過執行以下命令檢視您的模板是如何渲染的

# testing templated
airflow tasks test tutorial templated 2015-06-01

此命令將提供詳細日誌並執行您的 bash 命令。

請記住,airflow tasks test 命令在本地執行任務例項,將其日誌輸出到 stdout,並且不在資料庫中跟蹤狀態。這是測試單個任務例項的便捷方法。

類似地,airflow dags test 執行單個 DAG 執行,而不在資料庫中註冊任何狀態,這對於在本地測試整個 DAG 非常有用。

下一步是什麼?

本教程到此結束!您已成功編寫並測試了您的第一個 Airflow 流水線。在您繼續您的旅程時,可以考慮將您的程式碼合併到一個配置了 Scheduler 的倉庫中,這將允許您的 DAG 每天被觸發和執行。

以下是關於您下一步的一些建議

另請參閱

這篇內容有幫助嗎?