Airflow 峰會 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

DAG 捆綁包

DAG 捆綁包是包含一個或多個 DAG、檔案及其相關檔案(例如其他 Python 指令碼、配置檔案或其他資源)的集合。DAG 捆綁包可以從各種位置獲取 DAG,例如本地目錄、Git 倉庫或其他外部系統。部署管理員也可以編寫自己的 DAG 捆綁包類來支援自定義來源。您還可以在一個 Airflow 部署中定義多個 DAG 捆綁包,以便更好地組織您的 DAG。透過將捆綁包保持在更高級別,它允許對 DAG 執行所需的一切進行版本控制。

這與 Airflow 2 或更早版本中的 dags folder 相似,但功能更強大。在早期版本中,DAG 必須位於本地磁碟上的一個位置,並且將 DAG 獲取到該位置完全是部署管理員的責任。

由於 DAG 捆綁包支援版本控制,它們還允許 Airflow 使用特定版本的 DAG 捆綁包執行任務,從而使 DAG 執行在整個執行過程中使用相同的程式碼,即使 DAG 在執行中途被更新。

為何 DAG 捆綁包很重要?

  • 版本控制: 透過支援版本控制,DAG 捆綁包允許 DAG 執行在整個執行過程中使用相同的程式碼,即使 DAG 在執行中途被更新。

  • 可伸縮性: 透過 DAG 捆綁包,Airflow 可以將大量 DAG 組織成邏輯單元,從而高效地管理它們。

  • 靈活性: DAG 捆綁包能夠與外部系統(例如 Git 倉庫)無縫整合,以獲取 DAG。

DAG 捆綁包的型別

Airflow 支援多種型別的 DAG 捆綁包,每種型別都適用於特定的用例

airflow.dag_processing.bundles.local.LocalDagBundle

這些捆綁包引用包含 DAG 檔案的本地目錄。它們非常適合開發和測試環境,但不支援捆綁包的版本控制,這意味著任務始終使用最新程式碼執行。

airflow.providers.git.bundles.git.GitDagBundle

這些捆綁包與 Git 倉庫整合,允許 Airflow 直接從倉庫中獲取 DAG。

配置 DAG 捆綁包

DAG 捆綁包在 dag_bundle_config_list 中配置。您可以在此處新增一個或多個 DAG 捆綁包。

預設情況下,Airflow 會新增一個本地 DAG 捆綁包,這與舊的 dags folder 相同。這樣做是為了向後相容,如果您不想使用它,可以將其移除。您也可以保留它並新增其他 DAG 捆綁包,例如 git dag bundle。

例如,在您的 airflow.cfg 檔案中新增多個 DAG 捆綁包

[dag_processor]
dag_bundle_config_list = [
    {
      "name": "my_git_repo",
      "classpath": "airflow.dag_processing.bundles.git.GitDagBundle",
      "kwargs": {"tracking_ref": "main", "git_conn_id": "my_git_conn"}
    },
    {
      "name": "dags-folder",
      "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
      "kwargs": {}
    }
  ]

注意

空格,特別是在最後一行上的空格,很重要,這樣多行值才能正常工作。更多詳細資訊可以在 configparser 文件中找到。

您還可以透過在 kwargs 中傳遞引數來覆蓋每個 DAG 捆綁包的 refresh_interval。這控制了 DAG 處理器重新整理或查詢 DAG 捆綁包中新檔案的頻率。

編寫自定義 DAG 捆綁包

當您透過擴充套件 BaseDagBundle 類來實現您自己的 DAG 捆綁包時,有幾個方法是必須實現的。以下是幫助您實現自定義 DAG 捆綁包的指南。

抽象方法

以下方法是抽象方法,必須在您的自定義捆綁包類中實現

path

此屬性應返回一個 Path 物件,指向儲存此捆綁包的 DAG 檔案的目錄。Airflow 使用此屬性來定位要處理的 DAG 檔案。

get_current_version

此方法應以字串形式返回捆綁包的當前版本。Airflow 稍後會將此版本傳遞給 __init__ 方法,以便在執行任務時再次獲取此版本的捆綁包。如果不支援版本控制,則應返回 None

refresh

此方法應處理從其來源重新整理捆綁包內容(例如,從遠端倉庫拉取最新更改)。DAG 處理器會定期使用此方法,以確保捆綁包是最新的。

可選方法

除了抽象方法之外,您可以選擇覆蓋以下方法來自定義捆綁包的行為

__init__

可以擴充套件此方法以使用額外引數初始化捆綁包,例如 GitDagBundletracking_ref。它還應呼叫父類的 __init__ 方法以確保正確初始化。在此方法中應避免執行耗時操作,例如網路呼叫,以防止捆綁包例項化期間出現延遲;耗時操作應改在 initialize 方法中執行。

initialize

此方法在 DAG 處理器或 worker 首次使用捆綁包之前呼叫。它允許您僅在訪問捆綁包內容時執行耗時操作。

view_url

此方法應以字串形式返回一個 URL,用於在外部系統(例如 Git 倉庫的 Web 介面)上檢視捆綁包。

其他注意事項

  • 版本控制: 如果您的捆綁包支援版本控制,請確保實現了 initializeget_current_versionrefresh 方法來處理特定版本的邏輯。

  • 併發性: worker 可能同時建立多個捆綁包,並且不會對捆綁包物件的呼叫進行序列化。因此,如果底層技術存在問題,捆綁包類必須處理鎖定。例如,如果您正在克隆 git 倉庫,捆綁包類負責鎖定,以確保每次只有一個捆綁包物件進行克隆。基類中有一個 lock 方法,如有必要可用於此目的。

此條目有幫助嗎?