Airflow 101:構建你的第一個工作流¶
歡迎來到 Apache Airflow 的世界!在本教程中,我們將引導您瞭解 Airflow 的基本概念,幫助您理解如何編寫您的第一個 DAG。無論您是否熟悉 Python,或者剛剛入門,我們都會讓這段旅程變得愉快而簡單。
什麼是 DAG?¶
其核心是,DAG 是按反映任務關係和依賴關係的方式組織的任務集合。它就像您工作流的路線圖,展示了每個任務如何相互連線。如果這聽起來有點複雜,請不要擔心;我們將逐步分解它。
流水線定義示例¶
讓我們從一個簡單的流水線定義示例開始。儘管一開始可能看起來令人不知所措,但我們將詳細解釋每一行。
src/airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
理解 DAG 定義檔案¶
將 Airflow Python 指令碼視為一個配置檔案,它用程式碼定義了 DAG 的結構。您在此處定義的實際任務在不同的環境中執行,這意味著此指令碼並非用於資料處理。它的主要作用是定義 DAG 物件,並且需要快速評估,因為 DAG 檔案處理器會定期檢查它是否有任何更改。
匯入模組¶
要開始使用,我們需要匯入必要的庫。這是任何 Python 指令碼中典型的第一步。
src/airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
有關 Python 和 Airflow 如何處理模組的更多詳細資訊,請檢視模組管理。
設定預設引數¶
建立 DAG 及其任務時,您可以直接將引數傳遞給每個任務,也可以在字典中定義一組預設引數。後一種方法通常更高效、更簡潔。
src/airflow/example_dags/tutorial.py
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
如果您想深入瞭解 BaseOperator 的引數,請檢視 airflow.sdk.BaseOperator 文件。
建立 DAG¶
接下來,我們需要建立一個 DAG 物件來容納我們的任務。我們將為 DAG 提供一個唯一的識別符號,稱為 dag_id,並指定我們剛剛定義的預設引數。我們還將為我們的 DAG 設定每天執行的排程。
src/airflow/example_dags/tutorial.py
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
理解 Operator¶
Operator 在 Airflow 中代表一個工作單元。它們是工作流的構建塊,允許您定義將執行哪些任務。雖然我們可以將 Operator 用於許多工,但 Airflow 也提供了Taskflow API,這是一種更 Python 式的方式來定義工作流,我們稍後將對此進行介紹。
所有 Operator 都派生自 BaseOperator,它包含在 Airflow 中執行任務所需的基本引數。一些流行的 Operator 包括 PythonOperator、BashOperator 和 KubernetesPodOperator。在本教程中,我們將重點介紹 BashOperator 來執行一些簡單的 bash 命令。
定義任務¶
要使用 Operator,必須將其例項化為一個任務。任務規定了 Operator 將如何在 DAG 的上下文中執行其工作。在下面的示例中,我們兩次例項化了 BashOperator 來執行兩個不同的 bash 指令碼。task_id 作為每個任務的唯一識別符號。
src/airflow/example_dags/tutorial.py
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
請注意我們如何將特定於 Operator 的引數(如 bash_command)與從 BaseOperator 繼承的通用引數(如 retries)混合使用。這種方法簡化了我們的程式碼。在第二個任務中,我們甚至覆蓋了 retries 引數,將其設定為 3。
任務引數的優先順序如下
顯式傳遞的引數
default_args字典中的值Operator 的預設值(如果可用)
注意
請記住,每個任務都必須包含或繼承引數 task_id 和 owner。否則,Airflow 將丟擲錯誤。幸運的是,全新的 Airflow 安裝將 owner 預設設定為 airflow,因此您主要需要確保設定了 task_id。
使用 Jinja 進行模板化¶
Airflow 利用了 Jinja 模板化 的強大功能,讓您可以訪問內建引數和宏來增強您的工作流。本節將向您介紹 Airflow 中模板化的基礎知識,重點關注常用的模板變數:{{ ds }},它代表今天的日期戳。
src/airflow/example_dags/tutorial.py
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
您會注意到 templated_command 包含 {% %} 塊中的邏輯,並引用 {{ ds }} 等引數。您還可以將檔案傳遞給 bash_command,例如 bash_command='templated_command.sh',以便更好地組織程式碼。您甚至可以定義 user_defined_macros 和 user_defined_filters 來建立自己的變數和過濾器用於模板。有關自定義過濾器的更多資訊,請參閱 Jinja 文件。
有關可在模板中引用的變數和宏的更多資訊,請閱讀模板參考。
新增 DAG 和任務文件¶
您可以為您的 DAG 或單個任務新增文件。雖然 DAG 文件目前支援 markdown,但任務文件可以是純文字、markdown、reStructuredText、JSON 或 YAML。一個好的做法是在 DAG 檔案的開頭包含文件。
src/airflow/example_dags/tutorial.py
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
設定依賴關係¶
在 Airflow 中,任務可以相互依賴。例如,如果您有任務 t1、t2 和 t3,您可以通過幾種方式定義它們的依賴關係
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
請注意,如果 Airflow 檢測到 DAG 中的迴圈或某個依賴關係被多次引用,它將丟擲錯誤。
處理時區¶
建立感知時區的 DAG 很簡單。只需確保使用 pendulum 處理感知時區日期即可。避免使用標準庫中的 timezone,因為它們存在已知限制。
回顧¶
恭喜!現在,您應該對如何在 Airflow 中建立 DAG、定義任務及其依賴關係以及使用模板有了基本的瞭解。您的程式碼應該類似於以下內容
src/airflow/example_dags/tutorial.py
import textwrap
from datetime import datetime, timedelta
# Operators; we need this to operate!
from airflow.providers.standard.operators.bash import BashOperator
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import DAG
with DAG(
"tutorial",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={
"depends_on_past": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
# 'wait_for_downstream': False,
# 'execution_timeout': timedelta(seconds=300),
# 'on_failure_callback': some_function, # or list of functions
# 'on_success_callback': some_other_function, # or list of functions
# 'on_retry_callback': another_function, # or list of functions
# 'sla_miss_callback': yet_another_function, # or list of functions
# 'on_skipped_callback': another_function, #or list of functions
# 'trigger_rule': 'all_success'
},
description="A simple tutorial DAG",
schedule=timedelta(days=1),
start_date=datetime(2021, 1, 1),
catchup=False,
tags=["example"],
) as dag:
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id="print_date",
bash_command="date",
)
t2 = BashOperator(
task_id="sleep",
depends_on_past=False,
bash_command="sleep 5",
retries=3,
)
t1.doc_md = textwrap.dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.

**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG; OR
dag.doc_md = """
This is a documentation placed anywhere
""" # otherwise, type it like this
templated_command = textwrap.dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id="templated",
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
測試你的流水線¶
現在是時候測試您的流水線了!首先,確保您的指令碼能成功解析。如果您將程式碼儲存在 airflow.cfg 中指定的 dags 資料夾下的 tutorial.py 中,您可以執行
python ~/airflow/dags/tutorial.py
如果指令碼執行無誤,恭喜!您的 DAG 已正確設定。
命令列元資料驗證¶
讓我們透過執行一些命令進一步驗證您的指令碼
# initialize the database tables
airflow db migrate
# print the list of active dags
airflow dags list
# prints the list of tasks in the "tutorial" DAG
airflow tasks list tutorial
# prints the graphviz representation of "tutorial" DAG
airflow dags show tutorial
測試任務例項和 DAG 執行¶
您可以針對指定的 邏輯日期 測試特定的任務例項。這模擬了排程程式在特定日期和時間執行您的任務。
注意
請注意,排程程式是 針對 特定的日期和時間執行您的任務,而不是一定 在 該日期或時間執行。邏輯日期 是 DAG 執行的命名所依據的時間戳,它通常對應於您的工作流操作的時間段的 結束 時間 — 或 DAG 執行手動觸發的時間。
Airflow 使用此邏輯日期來組織和跟蹤每次執行;這是您在 UI、日誌和程式碼中引用特定執行的方式。透過 UI 或 API 觸發 DAG 時,您可以提供自己的邏輯日期,以便在特定時間點執行工作流。
# command layout: command subcommand [dag_id] [task_id] [(optional) date]
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
您還可以透過執行以下命令檢視您的模板是如何渲染的
# testing templated
airflow tasks test tutorial templated 2015-06-01
此命令將提供詳細日誌並執行您的 bash 命令。
請記住,airflow tasks test 命令在本地執行任務例項,將其日誌輸出到 stdout,並且不在資料庫中跟蹤狀態。這是測試單個任務例項的便捷方法。
類似地,airflow dags test 執行單個 DAG 執行,而不在資料庫中註冊任何狀態,這對於在本地測試整個 DAG 非常有用。
下一步是什麼?¶
本教程到此結束!您已成功編寫並測試了您的第一個 Airflow 流水線。在您繼續您的旅程時,可以考慮將您的程式碼合併到一個配置了 Scheduler 的倉庫中,這將允許您的 DAG 每天被觸發和執行。
以下是關於您下一步的一些建議
另請參閱
繼續本教程的下一步:使用 TaskFlow API 的 Python 式 DAG
探索核心概念部分,以獲取有關 Airflow 概念(例如 DAG、任務、Operator 等)的詳細解釋。