DAGs¶
DAG 是一種模型,封裝了執行工作流所需的一切。一些 DAG 屬性包括:
排程: 工作流應該何時執行。
任務: 任務 (tasks) 是在 worker 上執行的離散工作單元。
任務依賴: 任務 執行的順序和條件。
回撥: 整個工作流完成時要採取的行動。
附加引數: 以及許多其他的操作細節。
這是一個基本的 DAG 示例:
它定義了四個任務——A、B、C 和 D——並規定了它們的執行順序,以及哪些任務依賴於其他任務。它還會說明 DAG 執行的頻率——也許是“從明天開始每 5 分鐘一次”,或者“從 2020 年 1 月 1 日開始每天一次”。
DAG 本身不關心任務內部正在發生*什麼*;它只關心*如何*執行它們——它們的執行順序、重試次數、是否有超時等。
注意
“DAG”一詞來源於數學概念“有向無環圖”,但在 Airflow 中的含義已經遠遠超出了與數學 DAG 概念相關的字面資料結構。
宣告一個 DAG¶
有三種方法可以宣告一個 DAG——你可以使用 with 語句(上下文管理器),這會將內部的一切隱式地新增到 DAG 中:
import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
):
EmptyOperator(task_id="task")
或者,你可以使用標準建構函式,將 DAG 傳遞給你使用的任何運算子:
import datetime
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
或者,你可以使用 @dag 裝飾器將函式 轉換為 DAG 生成器:
import datetime
from airflow.sdk import dag
from airflow.providers.standard.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
沒有要執行的任務,DAG 就什麼都不是,而這些任務通常以 運算子 (Operators)、感測器 (Sensors) 或 TaskFlow 的形式出現。
任務依賴¶
任務/運算子通常不會單獨存在;它依賴於其他任務(位於它*上游*的任務),其他任務也依賴於它(位於它*下游*的任務)。宣告這些任務之間的依賴關係構成了 DAG 結構(有向無環圖的*邊*)。
有兩種主要方法來宣告單個任務依賴關係。推薦的方法是使用 >> 和 << 運算子:
first_task >> [second_task, third_task]
third_task << fourth_task
或者,你也可以使用更明確的 set_upstream 和 set_downstream 方法:
first_task.set_downstream([second_task, third_task])
third_task.set_upstream(fourth_task)
還有一些宣告更復雜依賴關係的快捷方式。如果你想讓一個任務列表依賴於另一個任務列表,你就不能使用上面兩種方法,因此需要使用 cross_downstream:
from airflow.sdk import cross_downstream
# Replaces
# [op1, op2] >> op3
# [op1, op2] >> op4
cross_downstream([op1, op2], [op3, op4])
如果你想將依賴關係連結起來,可以使用 chain:
from airflow.sdk import chain
# Replaces op1 >> op2 >> op3 >> op4
chain(op1, op2, op3, op4)
# You can also do it dynamically
chain(*[EmptyOperator(task_id=f"op{i}") for i in range(1, 6)])
chain 也可以對大小相同的列表進行*成對*依賴(這與 cross_downstream 建立的*交叉依賴*不同!):
from airflow.sdk import chain
# Replaces
# op1 >> op2 >> op4 >> op6
# op1 >> op3 >> op5 >> op6
chain(op1, [op2, op3], [op4, op5], op6)
載入 DAG¶
Airflow 從 DAG 包中的 Python 原始檔載入 DAG。它會獲取每個檔案,執行它,然後從該檔案中載入任何 DAG 物件。
這意味著你可以在一個 Python 檔案中定義多個 DAG,甚至可以使用匯入功能將一個非常複雜的 DAG 分散到多個 Python 檔案中。
但請注意,當 Airflow 從 Python 檔案載入 DAG 時,它只會拉取位於*頂層*且是 DAG 例項的任何物件。例如,考慮這個 DAG 檔案:
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
雖然檔案被訪問時會呼叫兩個 DAG 建構函式,但只有 dag_1 位於頂層(在 globals() 中),因此只有它被新增到 Airflow 中。dag_2 未被載入。
注意
作為一種最佳化,當在 DAG 包中搜索 DAG 時,Airflow 只考慮包含字串 airflow 和 dag(不區分大小寫)的 Python 檔案。
要考慮所有 Python 檔案,請停用 DAG_DISCOVERY_SAFE_MODE 配置標誌。
你還可以在 DAG 包或其任何子資料夾中提供一個 .airflowignore 檔案,該檔案描述了載入器要忽略的檔案模式。它涵蓋了所在目錄及其下的所有子資料夾。有關檔案語法的詳細資訊,請參閱下面的.airflowignore。
如果 .airflowignore 無法滿足你的需求,並且你想要一種更靈活的方式來控制 Airflow 是否需要解析某個 Python 檔案,你可以透過在配置檔案中設定 might_contain_dag_callable 來插入你的可呼叫物件。請注意,此可呼叫物件將替換預設的 Airflow 啟發式方法,即檢查 Python 檔案中是否存在字串 airflow 和 dag(不區分大小寫)。
def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
# Your logic to check if there are dags defined in the file_path
# Return True if the file_path needs to be parsed, otherwise False
執行 DAG¶
DAG 將以下列兩種方式之一執行:
當它們被手動或透過 API *觸發*時。
按照 DAG 定義的*排程*執行。
DAG *不要求*有排程,但定義排程非常常見。你可以透過 schedule 引數來定義它,如下所示:
with DAG("my_daily_dag", schedule="@daily"):
...
schedule 引數有多種有效值:
with DAG("my_daily_dag", schedule="0 0 * * *"):
...
with DAG("my_one_time_dag", schedule="@once"):
...
with DAG("my_continuous_dag", schedule="@continuous"):
...
提示
有關不同型別排程的更多資訊,請參閱編寫和排程。
每次執行 DAG 時,都會建立一個新的 DAG 例項,Airflow 將其稱為DAG 執行 (DAG Run)。同一個 DAG 可以並行執行多個 DAG 執行,每個 DAG 執行都有一個定義的資料間隔 (data interval),用於標識任務應該處理的資料期間。
舉個例子說明為何這很有用:考慮編寫一個處理每日實驗資料的 DAG。它已被重寫,你想對前 3 個月的資料執行它——沒問題,因為 Airflow 可以*回填 (backfill)* DAG,並對這前 3 個月的每一天都執行一份副本,所有副本同時執行。
所有這些 DAG 執行都在同一天實際開始,但每個 DAG 執行都有一個數據間隔,涵蓋這 3 個月期間的單一天,而這個資料間隔是 DAG 內部所有任務、運算子和感測器執行時所關注的全部。
就像 DAG 每次執行時都會例項化為一個 DAG 執行一樣,DAG 內指定的任務也會隨之例項化為任務例項 (Task Instances)。
DAG 執行有開始日期和結束日期。這個期間描述了 DAG 實際“執行”的時間。除了 DAG 執行的開始和結束日期外,還有一個稱為*邏輯日期 (logical date)*(正式名稱為執行日期 (execution date))的日期,它描述了 DAG 執行計劃或觸發的預期時間。之所以稱為*邏輯*,是因為它具有抽象性,取決於 DAG 執行的上下文,可能具有多種含義。
例如,如果 DAG 執行由使用者手動觸發,則其邏輯日期將是 DAG 執行觸發的日期和時間,其值應等於 DAG 執行的開始日期。然而,當 DAG 根據設定的排程間隔自動排程時,邏輯日期將指示資料間隔開始的時間點,此時 DAG 執行的開始日期將是邏輯日期 + 排程間隔。
提示
有關 logical date 的更多資訊,請參閱資料間隔 (Data Interval) 和execution_date 是什麼意思?。
DAG 分配¶
請注意,為了執行,每個運算子/任務都必須分配給一個 DAG。Airflow 有幾種方法可以在不顯式傳遞 DAG 的情況下計算出它:
如果你在
with DAG塊內宣告你的運算子:如果你在
@dag裝飾器內宣告你的運算子:如果你將你的運算子放在已分配 DAG 的運算子的上游或下游:
否則,你必須使用 dag= 將其傳遞給每個運算子。
預設引數¶
通常,DAG 中的許多運算子需要一組相同的預設引數(例如它們的 retries)。與其為每個運算子單獨指定這些引數,你可以在建立 DAG 時傳遞 default_args,它將自動應用於與其關聯的任何運算子:
import pendulum
with DAG(
dag_id="my_dag",
start_date=pendulum.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 2},
):
op = BashOperator(task_id="hello_world", bash_command="Hello World!")
print(op.retries) # 2
DAG 裝飾器¶
添加於 2.0 版本。
除了使用上下文管理器或 DAG() 建構函式宣告單個 DAG 的傳統方式外,你還可以使用 @dag 裝飾器裝飾一個函式,將其轉換為 DAG 生成器函式:
src/airflow/example_dags/example_dag_decorator.py
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def example_dag_decorator(url: str = "http://httpbin.org/get"):
"""
DAG to get IP address and echo it via BashOperator.
:param url: URL to get IP address from. Defaults to "http://httpbin.org/get".
"""
get_ip = GetRequestOperator(task_id="get_ip", url=url)
@task(multiple_outputs=True)
def prepare_command(raw_json: dict[str, Any]) -> dict[str, str]:
external_ip = raw_json["origin"]
return {
"command": f"echo 'Seems like today your server executing Airflow is connected from IP {external_ip}'",
}
command_info = prepare_command(get_ip.output)
BashOperator(task_id="echo_ip_info", bash_command=command_info["command"])
example_dag = example_dag_decorator()
除了作為一種新的乾淨地建立 DAG 的方式外,裝飾器還會將函式中的任何引數設定為 DAG 引數,允許你在觸發 DAG 時設定這些引數。然後你可以從 Python 程式碼或 Jinja 模板中的 {{ context.params }} 訪問這些引數。
注意
Airflow 只會載入出現在 DAG 檔案頂層的 DAG。這意味著你不能只宣告一個帶有 @dag 的函式——你還必須在你的 DAG 檔案中至少呼叫它一次,並將其分配給一個頂層物件,如上面的示例所示。
控制流¶
預設情況下,只有當一個任務的所有依賴任務都成功時,DAG 才會執行該任務。但是,有幾種方法可以修改這種行為:
分支 (Branching) - 根據條件選擇要進入哪個任務。
觸發規則 (Trigger Rules) - 設定 DAG 執行任務的條件。
設定和拆卸 (Setup and Teardown) - 定義設定和拆卸關係。
僅最新 (Latest Only) - 一種特殊形式的分支,只在針對當前時間執行的 DAGs 上執行。
依賴於過去 (Depends On Past) - 任務可以*依賴於它自身在之前的執行*。
分支¶
你可以利用分支來告訴 DAG *不要*執行所有依賴任務,而是選擇一條或多條路徑進行。這時就用到了 @task.branch 裝飾器。
@task.branch 裝飾器非常像 @task,但它期望被裝飾的函式返回一個任務的 ID(或一個 ID 列表)。指定的任務將被執行,而所有其他路徑將被跳過。它也可以返回 *None* 來跳過所有下游任務。
Python 函式返回的 task_id 必須引用 @task.branch 裝飾的任務直接下游的任務。
注意
當一個任務既是分支運算子的下游,又是被選中的一個或多個任務的下游時,它將不會被跳過:
分支任務的路徑是 branch_a、join 和 branch_b。由於 join 是 branch_a 的下游任務,即使它沒有作為分支決策的一部分返回,它仍然會執行。
@task.branch 也可以與 XComs 一起使用,允許分支上下文根據上游任務動態決定要遵循哪個分支。例如:
@task.branch(task_id="branch_task")
def branch_func(ti=None):
xcom_value = int(ti.xcom_pull(task_ids="start_task"))
if xcom_value >= 5:
return "continue_task"
elif xcom_value >= 3:
return "stop_task"
else:
return None
start_op = BashOperator(
task_id="start_task",
bash_command="echo 5",
do_xcom_push=True,
dag=dag,
)
branch_op = branch_func()
continue_op = EmptyOperator(task_id="continue_task", dag=dag)
stop_op = EmptyOperator(task_id="stop_task", dag=dag)
start_op >> branch_op >> [continue_op, stop_op]
如果你希望實現帶有分支功能的自定義運算子,可以從 BaseBranchOperator 繼承,它的行為類似於 @task.branch 裝飾器,但需要你提供 choose_branch 方法的實現。
注意
建議優先使用 @task.branch 裝飾器,而不是直接在 DAG 中例項化 BranchPythonOperator。後者通常只應在實現自定義運算子時進行子類化。
與 @task.branch 的可呼叫物件一樣,此方法可以返回下游任務的 ID 或任務 ID 列表,這些任務將執行,而其他所有任務將被跳過。它也可以返回 None 以跳過所有下游任務:
class MyBranchOperator(BaseBranchOperator):
def choose_branch(self, context):
"""
Run an extra branch on the first day of the month
"""
if context['data_interval_start'].day == 1:
return ['daily_task_id', 'monthly_task_id']
elif context['data_interval_start'].day == 2:
return 'daily_task_id'
else:
return None
類似於用於常規 Python 程式碼的 @task.branch 裝飾器,還有使用虛擬環境的 @task.branch_virtualenv 或使用外部 Python 的 @task.branch_external_python 分支裝飾器。
僅最新¶
Airflow 的 DAG 執行通常針對的日期與當前日期不同——例如,對上個月的每一天執行一份 DAG 副本,以回填一些資料。
但是,在某些情況下,你*不*希望 DAG 的某些(或所有)部分針對過去的日期執行;在這種情況下,你可以使用 LatestOnlyOperator。
這個特殊的運算子會在你不在“最新”的 DAG 執行時(如果當前真即時間在其執行時間 (execution_time) 和下一次計劃執行時間之間,且不是外部觸發的執行)跳過其下游的所有任務。
這裡有一個例子:
src/airflow/example_dags/example_latest_only_with_trigger.py
import datetime
import pendulum
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.sdk import DAG
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id="latest_only_with_trigger",
schedule=datetime.timedelta(hours=4),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example3"],
) as dag:
latest_only = LatestOnlyOperator(task_id="latest_only")
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)
latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
在這個 DAG 的情況下:
task1是latest_only的直接下游,除了最新的執行外,所有執行都會跳過它。task2完全獨立於latest_only,將在所有計劃週期中執行。task3是task1和task2的下游,由於預設的觸發規則 (trigger rule) 是all_success,它將收到來自task1的級聯跳過。task4是task1和task2的下游,但它不會被跳過,因為其trigger_rule設定為all_done。
依賴於過去¶
你也可以說一個任務只有在其在先前 DAG 執行中的*上一次*執行成功時才能執行。要使用此功能,你只需將任務的 depends_on_past 引數設定為 True。
請注意,如果你在 DAG 生命週期的最開始執行它——特別是它的第一次*自動化*執行——那麼該任務仍將執行,因為它沒有之前的執行可以依賴。
觸發規則¶
預設情況下,Airflow 會等待一個任務的所有上游(直接父任務)任務都處於成功狀態後才執行該任務。
然而,這只是預設行為,你可以使用任務的 trigger_rule 引數來控制它。trigger_rule 的選項包括:
all_success(預設): 所有上游任務都已成功。all_failed: 所有上游任務都處於failed或upstream_failed狀態。all_done: 所有上游任務都已完成執行。all_skipped: 所有上游任務都處於skipped狀態。one_failed: 至少有一個上游任務失敗(不等待所有上游任務完成)。one_success: 至少有一個上游任務成功(不等待所有上游任務完成)。one_done: 至少有一個上游任務成功或失敗。none_failed: 所有上游任務都沒有failed或upstream_failed——也就是說,所有上游任務都已成功或被跳過。none_failed_min_one_success: 所有上游任務都沒有failed或upstream_failed,並且至少有一個上游任務成功。none_skipped: 沒有上游任務處於skipped狀態——也就是說,所有上游任務都處於success、failed或upstream_failed狀態。always: 完全沒有依賴關係,隨時執行此任務。
如果需要,你還可以將其與依賴於過去 (Depends On Past) 功能結合使用。
注意
重要的是要意識到觸發規則與被跳過任務之間的相互作用,特別是作為分支操作一部分被跳過的任務。*在分支操作的下游,幾乎永遠不要使用 all_success 或 all_failed*。
被跳過的任務將透過觸發規則 all_success 和 all_failed 進行級聯,導致它們也被跳過。考慮以下 DAG:
# dags/branch_without_trigger.py
import pendulum
from airflow.sdk import task
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
dag = DAG(
dag_id="branch_without_trigger",
schedule="@once",
start_date=pendulum.datetime(2019, 2, 28, tz="UTC"),
)
run_this_first = EmptyOperator(task_id="run_this_first", dag=dag)
@task.branch(task_id="branching")
def do_branching():
return "branch_a"
branching = do_branching()
branch_a = EmptyOperator(task_id="branch_a", dag=dag)
follow_branch_a = EmptyOperator(task_id="follow_branch_a", dag=dag)
branch_false = EmptyOperator(task_id="branch_false", dag=dag)
join = EmptyOperator(task_id="join", dag=dag)
run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> branch_false >> join
join 位於 follow_branch_a 和 branch_false 的下游。join 任務將顯示為已跳過,因為其 trigger_rule 預設為 all_success,並且分支操作導致的跳過會級聯到標記為 all_success 的任務。
透過將 join 任務中的 trigger_rule 設定為 none_failed_min_one_success,我們可以獲得預期的行為:
設定和拆卸¶
在資料工作流中,建立資源(例如計算資源),使用它完成一些工作,然後將其拆卸是很常見的。Airflow 提供設定和拆卸任務來支援此需求。
請參閱主要文章設定和拆卸,瞭解如何使用此功能。
動態 DAG¶
由於 DAG 由 Python 程式碼定義,因此無需使其完全宣告式;你可以自由使用迴圈、函式等來定義你的 DAG。
例如,這是一個使用 for 迴圈定義一些任務的 DAG:
with DAG("loop_example", ...):
first = EmptyOperator(task_id="first")
last = EmptyOperator(task_id="last")
options = ["branch_a", "branch_b", "branch_c", "branch_d"]
for option in options:
t = EmptyOperator(task_id=option)
first >> t >> last
一般來說,我們建議你儘量保持 DAG 任務的*拓撲結構*(佈局)相對穩定;動態 DAG 通常更適合用於動態載入配置選項或更改運算子選項。
DAG 視覺化¶
如果你想檢視 DAG 的視覺化表示,有兩種選擇:
你可以開啟 Airflow UI,導航到你的 DAG,然後選擇“圖表” (Graph)。
你可以執行
airflow dags show,它會將其渲染為影像檔案。
我們通常建議你使用圖表檢視 (Graph view),因為它還會顯示你選擇的任何 DAG 執行中所有任務例項 (Task Instances) 的狀態。
當然,隨著你的 DAG 變得越來越複雜,它們會變得越來越難以理解,因此我們提供了幾種修改這些 DAG 檢視的方法,使它們更容易理解。
TaskGroups¶
TaskGroup 可用於在圖表檢視 (Graph view) 中將任務組織成層次結構組。它對於建立重複模式和減少視覺混亂很有用。
TaskGroup 中的任務位於同一原始 DAG 上,並遵循所有 DAG 設定和池配置。
可以使用 >> 和 << 運算子跨 TaskGroup 中的所有任務應用依賴關係。例如,以下程式碼將 task1 和 task2 放入 TaskGroup group1 中,然後將這兩個任務都設定為 task3 的上游:
from airflow.sdk import task_group
@task_group()
def group1():
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
group1() >> task3
TaskGroup 也支援 default_args,就像 DAG 一樣,它會覆蓋 DAG 級別的 default_args。
import datetime
from airflow.sdk import DAG
from airflow.sdk import task_group
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
with DAG(
dag_id="dag1",
start_date=datetime.datetime(2016, 1, 1),
schedule="@daily",
default_args={"retries": 1},
):
@task_group(default_args={"retries": 3})
def group1():
"""This docstring will become the tooltip for the TaskGroup."""
task1 = EmptyOperator(task_id="task1")
task2 = BashOperator(task_id="task2", bash_command="echo Hello World!", retries=2)
print(task1.retries) # 3
print(task2.retries) # 2
如果你想了解 TaskGroup 的更高階用法,可以檢視 Airflow 自帶的 example_task_group_decorator.py 示例 DAG。
注意
預設情況下,子任務/TaskGroup 的 ID 會以其父 TaskGroup 的 group_id 作為字首。這有助於確保整個 DAG 中 group_id 和 task_id 的唯一性。
要停用此字首行為,在建立 TaskGroup 時傳遞引數 prefix_group_id=False,但請注意,現在你需要自己負責確保每個任務和組都具有唯一的 ID。
注意
當使用 @task_group 裝飾器時,除非顯式提供了 tooltip 值,否則被裝飾函式的文件字串(docstring)將用作 UI 中 TaskGroup 的工具提示(tooltip)。
邊緣標籤 (Edge Labels)¶
除了將任務分組之外,你還可以在“圖檢視”(Graph view)中為不同任務之間的*依賴邊緣*新增標籤——這對於 DAG 的分支區域特別有用,你可以標註某些分支可能執行的條件。
要新增標籤,可以直接在 >> 和 << 運算子中以內聯方式使用它們
from airflow.sdk import Label
my_task >> Label("When empty") >> other_task
或者,你可以將一個 Label 物件傳遞給 set_upstream/set_downstream 方法
from airflow.sdk import Label
my_task.set_downstream(other_task, Label("When empty"))
這是一個說明如何標註不同分支的示例 DAG
src/airflow/example_dags/example_branch_labels.py
with DAG(
"example_branch_labels",
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = EmptyOperator(task_id="ingest")
analyse = EmptyOperator(task_id="analyze")
check = EmptyOperator(task_id="check_integrity")
describe = EmptyOperator(task_id="describe_integrity")
error = EmptyOperator(task_id="email_error")
save = EmptyOperator(task_id="save")
report = EmptyOperator(task_id="report")
ingest >> analyse >> check
check >> Label("No errors") >> save >> report
check >> Label("Errors found") >> describe >> error >> report
DAG 與任務文件 (DAG & Task Documentation)¶
你可以為你的 DAG 和任務物件新增文件或備註,這些內容可以在 Web 介面中看到(DAG 對應“Graph”和“Tree”,任務對應“Task Instance Details”)。
如果定義了以下一組特殊的任務屬性,它們將被渲染為富文字內容
屬性 |
渲染為 |
|---|---|
doc |
等寬字型 |
doc_json |
json |
doc_yaml |
yaml |
doc_md |
markdown |
doc_rst |
reStructuredText |
請注意,對於 DAG 而言,只有 doc_md 屬性會被解析。對於 DAG,它可以包含一個字串或指向 markdown 檔案的引用。markdown 檔案透過以 .md 結尾的字串來識別。如果提供的是相對路徑,檔案將從 Airflow Scheduler 或 DAG 解析器啟動的相對路徑載入。如果 markdown 檔案不存在,傳遞的檔名將被用作文字,不會顯示異常。請注意,markdown 檔案是在 DAG 解析期間載入的,對其內容的更改需要經過一個 DAG 解析週期才能顯示。
如果你的任務是從配置檔案動態構建的,這會特別有用,因為它允許你在 Airflow 中展示生成相關任務的配置。
"""
### My great DAG
"""
import pendulum
dag = DAG(
"my_dag",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="@daily",
catchup=False,
)
dag.doc_md = __doc__
t = BashOperator("foo", dag=dag)
t.doc_md = """\
#Title"
Here's a [url](www.airbnb.com)
"""
打包 DAG (Packaging dags)¶
雖然簡單的 DAG 通常只包含在一個 Python 檔案中,但更復雜的 DAG 分佈在多個檔案幷包含應隨其一起分發(“vendored”)的依賴項也很常見。
你可以在 DAG 捆綁包內部完成所有這些,使用標準的 檔案系統佈局,或者你可以將 DAG 及其所有 Python 檔案打包成一個單獨的 zip 檔案。例如,你可以將兩個 DAG 和它們所需的一個依賴項打包成一個 zip 檔案,其內容如下:
my_dag1.py
my_dag2.py
package1/__init__.py
package1/functions.py
請注意,打包的 DAG 存在一些限制:
如果為序列化啟用了 pickling,則無法使用它們
它們不能包含編譯後的庫(例如
libz.so),只能包含純 Python 檔案它們將被插入到 Python 的
sys.path中,並可供 Airflow 程序中的任何其他程式碼匯入,因此請確保包名不會與系統上已安裝的其他包衝突。
總的來說,如果你有一組複雜的編譯依賴項和模組,使用 Python 的 virtualenv 系統並透過 pip 在目標系統上安裝必要的包可能是更好的選擇。
.airflowignore¶
.airflowignore 檔案指定了 Airflow 應該故意忽略的 DAG 捆綁包或 PLUGINS_FOLDER 中的目錄或檔案。 Airflow 支援檔案中的兩種模式語法,由 DAG_IGNORE_FILE_SYNTAX 配置引數指定(*在 Airflow 2.3 中新增*):regexp 和 glob。
注意
在 Airflow 3 或更高版本中,預設的 DAG_IGNORE_FILE_SYNTAX 是 glob(在早期版本中是 regexp)。
使用 glob 語法(預設),模式的工作方式與 .gitignore 檔案中的模式類似:
*字元匹配任意數量的字元,但不包括/?字元匹配任意單個字元,但不包括/範圍表示法,例如
[a-zA-Z],可用於匹配範圍內的任一字元透過在模式前加上
!可以否定該模式。模式按順序評估,因此否定可以覆蓋同一檔案中先前定義的模式或父目錄中定義的模式。雙星號(
**)可用於匹配跨目錄的檔案/路徑。例如,**/__pycache__/將忽略每個子目錄中的__pycache__目錄,深度不限。如果模式開頭或中間(或兩者都有)包含
/,則該模式是相對於.airflowignore檔案所在的目錄級別。否則,該模式也可能匹配.airflowignore檔案所在目錄級別下的任何級別。
對於 regexp 模式語法,.airflowignore 檔案中的每一行都指定一個正則表示式模式,其名稱(而非 DAG ID)匹配任一模式的目錄或檔案將被忽略(底層實現使用 Pattern.search() 來匹配模式)。使用 # 字元表示註釋;以 # 開頭的行的所有字元都將被忽略。
.airflowignore 檔案應該放在你的 DAG 捆綁包中。例如,你可以準備一個使用 glob 語法的 .airflowignore 檔案:
**/*project_a*
tenant_[0-9]*
那麼你的 DAG 捆綁包中諸如 project_a_dag_1.py、TESTING_project_a.py、tenant_1.py、project_a/dag_1.py 和 tenant_1/dag_1.py 等檔案將被忽略(如果目錄名稱匹配任一模式,則該目錄及其所有子資料夾將完全不被 Airflow 掃描。這提高了 DAG 查詢的效率)。
.airflowignore 檔案的作用範圍是其所在的目錄及其所有子資料夾。你也可以為 DAG 捆綁包中的一個子資料夾準備 .airflowignore 檔案,它將僅適用於該子資料夾。
DAG 依賴關係 (DAG Dependencies)¶
在 Airflow 2.1 中新增
DAG 中任務之間的依賴關係透過上游和下游關係明確定義,而 DAG 之間的依賴關係則稍微複雜一些。總的來說,一個 DAG 可以依賴於另一個 DAG 的方式有兩種:
等待 -
ExternalTaskSensor
額外的複雜性在於,一個 DAG 可能需要等待或觸發另一個 DAG 的多次執行,且資料間隔可能不同。**DAG 依賴關係**檢視(選單 -> 瀏覽 -> DAG 依賴關係)有助於視覺化 DAG 之間的依賴關係。這些依賴關係由排程器在 DAG 序列化期間計算,Web 伺服器使用它們來構建依賴圖。
依賴檢測器是可配置的,因此你可以實現自己的邏輯,與 DependencyDetector 中的預設邏輯不同。
DAG 暫停、停用和刪除 (DAG pausing, deactivation and deletion)¶
DAG 在處於“未執行”狀態時有幾種不同的狀態。DAG 可以被暫停(paused)、停用(deactivated),最後可以刪除 DAG 的所有元資料。
DAG 可以在 UI 中被暫停(paused),當它存在於 DAGS_FOLDER 中且排程器已將其儲存在資料庫中,但使用者選擇透過 UI 停用它。透過 UI 和 API 可以執行“暫停”(pause)和“取消暫停”(unpause)操作。被暫停的 DAG 不會被排程器安排執行,但你可以透過 UI 手動觸發它們。在 UI 中,你可以在“Paused”標籤頁中看到已暫停的 DAG。未暫停的 DAG 可以在“Active”標籤頁中找到。當一個 DAG 被暫停時,任何正在執行的任務將被允許完成,所有下游任務將被置於“Scheduled”(已排程)狀態。當 DAG 取消暫停時,任何處於“Scheduled”狀態的任務將按照 DAG 邏輯開始執行。沒有“Scheduled”任務的 DAG 將按照其排程計劃開始執行。
DAG 可以透過從 DAGS_FOLDER 中移除檔案來停用(deactivated)(不要與 UI 中的“Active”標籤混淆)。當排程器解析 DAGS_FOLDER 並發現之前見過且儲存在資料庫中的某個 DAG 不存在時,它會將其設定為停用狀態。已停用的 DAG 會保留其元資料和歷史記錄;當該 DAG 被重新添加回 DAGS_FOLDER 時,它會再次被啟用,並且歷史記錄將可見。你無法透過 UI 或 API 啟用/停用 DAG,這隻能透過從 DAGS_FOLDER 中移除檔案來完成。再次強調 - 當 DAG 被排程器停用時,其歷史執行資料不會丟失。請注意,Airflow UI 中的“Active”標籤頁指的是既已“Activated”且未“Paused”的 DAG,這一點最初可能會有點令人困惑。
你無法在 UI 中看到已停用的 DAG——有時你可以看到歷史執行記錄,但當你嘗試檢視相關資訊時,會看到 DAG 不存在的錯誤。
你還可以使用 UI 或 API 從元資料資料庫中刪除 DAG 的元資料,但這並不總會導致 DAG 從 UI 中消失——這一點最初可能也會有點令人困惑。如果你刪除元資料時 DAG 仍然在 DAGS_FOLDER 中,排程器會重新解析該資料夾,DAG 將再次出現,只是該 DAG 的歷史執行資訊會被移除。
這意味著如果你想實際刪除一個 DAG 及其所有歷史元資料,你需要分三步進行:
暫停 DAG。
透過 UI 或 API 從資料庫中刪除歷史元資料。
從
DAGS_FOLDER中刪除 DAG 檔案,並等待其變為非活動狀態。
DAG 自動暫停 (Experimental)¶
DAG 也可以配置為自動暫停。Airflow 有一個配置項,允許在 DAG 連續失敗 N 次後自動停用它。
我們也可以從 DAG 引數中提供並覆蓋這些配置:
max_consecutive_failed_dag_runs:覆蓋 max_consecutive_failed_dag_runs_per_dag 配置。