2025 年 Airflow Summit 將於 10 月 07 日至 09 日舉行。立即註冊獲取早鳥票!

airflow.models.dagbag

FileLoadStat

關於單個檔案的資訊。

DagBag

DagBag 是一個 DAG 的集合,從資料夾樹中解析出來,並具有高階配置設定。

DagPriorityParsingRequest

儲存檔案解析時將被優先處理的 DAG 解析請求的模型。

函式

generate_md5_hash(context)

模組內容

class airflow.models.dagbag.FileLoadStat[source]

基類: NamedTuple

關於單個檔案的資訊。

引數:
  • file – 載入的檔案。

  • duration – 處理檔案所花費的時間。

  • dag_num – 此檔案中載入的 DAG 總數。

  • task_num – 此檔案中載入的任務總數。

  • dags – 此檔案中載入的 DAG 名稱。

  • warning_num – 處理此檔案捕獲的警告總數。

file: str[source]
duration: datetime.timedelta[source]
dag_num: int[source]
task_num: int[source]
dags: str[source]
warning_num: int[source]
class airflow.models.dagbag.DagBag(dag_folder=None, include_examples=NOTSET, safe_mode=NOTSET, read_dags_from_db=False, load_op_links=True, collect_dags=True, known_pools=None, bundle_path=None)[source]

基類: airflow.utils.log.logging_mixin.LoggingMixin

DagBag 是一個 DAG 的集合,從資料夾樹中解析出來,並具有高階配置設定。

一些可能的設定包括用作後端的資料庫以及用於觸發任務的執行器。這使得為生產、開發、測試或不同團隊或安全配置檔案等執行獨立環境變得更容易。曾經是系統級別的設定現在變為 DagBag 級別,以便一個系統可以執行多個獨立的設定集。

引數:
  • dag_folder (str | pathlib.Path | None) – 掃描以查詢 DAG 的資料夾

  • include_examples (bool | airflow.utils.types.ArgNotSet) – 是否包含 Airflow 附帶的示例

  • safe_mode (bool | airflow.utils.types.ArgNotSet) – 當 False 時,掃描所有 Python 模組以查詢 DAG。當 True 時,使用啟發式方法(包含 DAGairflow 字串的檔案)過濾要掃描的 Python 模組以查詢 DAG。

  • read_dags_from_db (bool) – 如果傳入 True,則從資料庫讀取 DAG。如果為 False,則從 Python 檔案讀取 DAG。

  • load_op_links (bool) – 反序列化 DAG 時是否應透過外掛載入額外的 Operator 連結?此標誌在排程器 (Scheduler) 中設定為 False,以便不載入額外 Operator 連結,以免在排程器中執行使用者程式碼。

  • collect_dags (bool) – 當為 True 時,在類初始化期間收集 DAG。

  • known_pools (set[str] | None) – 如果不是 None,則當 Task 嘗試使用未知 Pool 時生成警告。

bundle_path: pathlib.Path | None = None[source]
dag_folder[source]
dags: dict[str, airflow.models.dag.DAG][source]
file_last_changed: dict[str, datetime.datetime][source]
import_errors: dict[str, str][source]
captured_warnings: dict[str, tuple[str, Ellipsis]][source]
has_logged = False[source]
read_dags_from_db = False[source]
dags_last_fetched: dict[str, datetime.datetime][source]
dags_hash: dict[str, str][source]
known_pools = None[source]
dagbag_import_error_tracebacks = True[source]
dagbag_import_error_traceback_depth[source]
size()[source]
返回值:

此 dagbag 中包含的 dag 數量

返回型別:

int

property dag_ids: list[str][source]

獲取 DAG ID。

返回值:

此 bag 中的 DAG ID 列表

返回型別:

list[str]

get_dag(dag_id, session=None)[source]

從字典中獲取 DAG,如果過期則重新整理。

引數:

dag_id – DAG ID

process_file(filepath, only_if_updated=True, safe_mode=True)[source]

給定 Python 模組或 Zip 檔案路徑,匯入模組並在其中查詢 DAG 物件。

property dag_warnings: set[airflow.models.dagwarning.DagWarning][source]

獲取 DagBag 中 DAG 的 DagWarning 集合。

bag_dag(dag)[source]

將 DAG 新增到 bag 中。

丟擲:

如果檢測到迴圈,則丟擲 AirflowDagCycleException。

丟擲:

如果此 DAG 已存在於 bag 中,則丟擲 AirflowDagDuplicatedIdException。

collect_dags(dag_folder=None, only_if_updated=True, include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'))[source]

在給定路徑中查詢 Python 模組,匯入它們,並將它們新增到 DagBag 集合中。

注意,如果在處理目錄時找到 .airflowignore 檔案,其行為將非常類似於 .gitignore,忽略與檔案中指定任何模式匹配的檔案。

注意: .airflowignore 中的模式根據 DAG_IGNORE_FILE_SYNTAX 配置引數,被解釋為非錨定正則表示式或類似 gitignore 的 glob 表示式。

collect_dags_from_db()[source]

從資料庫收集 DAG。

dagbag_report()[source]

列印關於 DagBag 載入統計資訊的報告。

sync_to_db(bundle_name, bundle_version, session=NEW_SESSION)[source]

將 DAG 列表的相關屬性儲存到資料庫。

airflow.models.dagbag.generate_md5_hash(context)[source]
class airflow.models.dagbag.DagPriorityParsingRequest(bundle_name, relative_fileloc)[source]

基類: airflow.models.base.Base

儲存檔案解析時將被優先處理的 DAG 解析請求的模型。

__tablename__ = 'dag_priority_parsing_request'[source]
id[source]
bundle_name[source]
relative_fileloc[source]
__repr__()[source]

此條目有幫助嗎?