動態 DAG 生成¶
本文件描述了建立具有動態生成結構的 DAG,但 DAG 中任務數量在 DAG 執行之間保持不變。如果您想實現一個 DAG,其任務(或 Airflow 2.6 中的任務組)的數量可以根據前一個任務的輸出/結果而變化,請參閱動態任務對映。
注意
生成任務和任務組的一致序列
在所有動態生成 DAG 的情況下,您都應該確保任務和任務組每次生成 DAG 時都以一致的序列生成,否則每次重新整理頁面時,網格檢視(Grid View)中的任務和任務組序列可能會發生變化。這可以透過例如在資料庫查詢中使用穩定的排序機制或在 Python 中使用 sorted() 函式來實現。
使用環境變數的動態 DAG¶
如果您想使用變數來配置程式碼,在頂層程式碼中應始終使用環境變數,而不是Airflow 變數。在頂層程式碼中使用 Airflow 變數需要連線 Airflow 的元資料資料庫來獲取值,這會減慢解析速度並增加資料庫的負載。請參閱Airflow 變數的最佳實踐,瞭解如何在 DAG 中使用 Jinja 模板來最好地利用 Airflow 變數。
例如,您可以為生產和開發環境設定不同的 DEPLOYMENT 變數。在生產環境中,變數 DEPLOYMENT 可以設定為 PROD,在開發環境中設定為 DEV。然後,您可以根據環境變數的值,在生產和開發環境中構建不同的 DAG。
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
task = Operator(param="prod-param")
elif deployment == "DEV":
task = Operator(param="dev-param")
生成嵌入元資料的 Python 程式碼¶
您可以外部生成包含元資料的 Python 程式碼作為可匯入的常量。然後,您的 DAG 可以直接匯入這些常量,並用於構建物件和依賴關係。這使得可以輕鬆地從多個 DAG 匯入此類程式碼,而無需查詢、載入和解析儲存在常量中的元資料——這由 Python 直譯器在處理“import”語句時自動完成。這聽起來最初很奇怪,但生成此類程式碼並確保它是您可以從 DAG 中匯入的有效 Python 程式碼卻出奇地容易。
例如,假設您在 DAG 資料夾中動態生成了檔案 my_company_utils/common.py
# This file is generated automatically !
ALL_TASKS = ["task1", "task2", "task3"]
然後您可以在所有 DAG 中像這樣匯入和使用 ALL_TASKS 常量
from my_company_utils.common import ALL_TASKS
with DAG(
dag_id="my_dag",
schedule=None,
start_date=datetime(2021, 1, 1),
catchup=False,
):
for task in ALL_TASKS:
# create your operators and relations here
...
請不要忘記,在這種情況下,您需要在 my_company_utils 資料夾中新增一個空的 __init__.py 檔案,並且應將 my_company_utils/* 行新增到 .airflowignore 檔案(使用預設的 glob 語法)中,以便排程器在查詢 DAG 時忽略整個資料夾。
使用結構化資料檔案的外部配置生成動態 DAG¶
如果您需要使用更復雜的元資料來準備 DAG 結構,並且希望將資料儲存在結構化的非 Python 格式中,您應該將資料匯出到 DAG 資料夾中的檔案並將其推送到 DAG 資料夾,而不是試圖透過 DAG 的頂層程式碼來拉取資料——原因已在父章節 頂層 Python 程式碼 中解釋。
元資料應以方便的檔案格式(JSON, YAML 格式是好的選擇)匯出並與 DAG 一起儲存在 DAG 資料夾中。理想情況下,元資料應與您從中載入 DAG 檔案的模組位於同一包/資料夾中,因為這樣您可以輕鬆地在 DAG 中找到元資料檔案的位置。要讀取的檔案位置可以使用包含 DAG 的模組的 __file__ 屬性找到
my_dir = os.path.dirname(os.path.abspath(__file__))
configuration_file_path = os.path.join(my_dir, "config.yaml")
with open(configuration_file_path) as yaml_file:
configuration = yaml.safe_load(yaml_file)
# Configuration dict is available here
註冊動態 DAG¶
在使用 @dag 修飾器或 with DAG(..) 上下文管理器時,您可以動態生成 DAG,Airflow 將自動註冊它們。
from datetime import datetime
from airflow.sdk import dag, task
configs = {
"config1": {"message": "first DAG will receive this message"},
"config2": {"message": "second DAG will receive this message"},
}
for config_name, config in configs.items():
dag_id = f"dynamic_generated_dag_{config_name}"
@dag(dag_id=dag_id, start_date=datetime(2022, 2, 1))
def dynamic_generated_dag():
@task
def print_message(message):
print(message)
print_message(config["message"])
dynamic_generated_dag()
以下程式碼將為每個配置生成一個 DAG:dynamic_generated_dag_config1 和 dynamic_generated_dag_config2。它們都可以獨立執行,並帶有相關的配置。
如果您不希望自動註冊 DAG,可以透過將 DAG 上的 auto_register=False 設定為停用此行為。
版本 2.4 中的變化:自版本 2.4 起,透過呼叫 @dag 修飾的函式(或在 with DAG(...) 上下文管理器中使用的)建立的 DAG 會自動註冊,不再需要儲存在全域性變數中。
最佳化執行期間的 DAG 解析延遲¶
版本 2.4 中新增。
這是一項實驗性功能。
有時,當您從單個 DAG 檔案生成大量動態 DAG 時,在任務執行期間解析 DAG 檔案可能會導致不必要的延遲。其影響是任務開始前的延遲。
為什麼會這樣?您可能沒有意識到,就在您的任務執行之前,Airflow 會解析 DAG 所屬的 Python 檔案。
Airflow 排程器(或更確切地說,DAG 檔案處理器)需要載入完整的 DAG 檔案來處理所有元資料。然而,任務執行只需要單個 DAG 物件來執行任務。知道了這一點,我們可以在執行任務時跳過生成不必要的 DAG 物件,從而縮短解析時間。當生成的 DAG 數量很大時,這種最佳化效果最明顯。
您可以採用一種實驗性的方法來最佳化此行為。請注意,並非總是可以使用此方法(例如,當下遊 DAG 的生成依賴於上游 DAG 時),或者當您的 DAG 生成存在一些副作用時。此外,下面的程式碼片段相當複雜,雖然我們已經測試過並且在大多數情況下有效,但可能存在無法檢測當前解析的 DAG 的情況,此時將恢復建立所有 DAG 或失敗。請謹慎使用此解決方案並進行徹底測試。
Airflow 的魔法迴圈 部落格文章中展示了一個很好的效能改進示例,該文章描述瞭如何在任務執行期間將解析時間從 120 秒縮短到 200 毫秒。(該示例寫於 Airflow 2.4 之前,因此使用了 Airflow 的未文件化行為。)
在 Airflow 2.4 中,您可以改為使用 get_parsing_context() 方法以文件化且可預測的方式檢索當前上下文。
遍歷需要生成 DAG 的物件集合時,您可以使用上下文來確定是需要生成所有 DAG 物件(在 DAG 檔案處理器中解析時),還是隻生成單個 DAG 物件(執行任務時)。
get_parsing_context() 返回當前的解析上下文。上下文型別為 AirflowParsingContext,如果只需要單個 DAG/任務,則其 dag_id 和 task_id 欄位會被設定。如果需要“完整”解析(例如在 DAG 檔案處理器中),上下文的 dag_id 和 task_id 將設定為 None。
from airflow.sdk import DAG
from airflow.sdk import get_parsing_context
current_dag_id = get_parsing_context().dag_id
for thing in list_of_things:
dag_id = f"generated_dag_{thing}"
if current_dag_id is not None and current_dag_id != dag_id:
continue # skip generation of non-selected DAG
with DAG(dag_id=dag_id, ...):
...