Airflow 峰會 2025 將於 10 月 07 日至 09 日舉行。立即註冊以獲取早鳥票!

airflow.models.dag

屬性

log

AssetT

TAG_MAX_LEN

DagStateChangeCallback

ScheduleInterval

ScheduleArg

dag

異常

InconsistentDataInterval

當模型錯誤地填充資料間隔欄位時引發的異常。

DAG

DAG(有向無環圖)是具有方向依賴關係的任務集合。

DagTag

每個 DAG 的標籤名稱,以便在 DAG 檢視中快速過濾。

DagOwnerAttributes

定義不同所有者屬性的表。

DagModel

包含 DAG 屬性的表。

函式

get_last_dagrun(dag_id, session[, ...])

返回某個 DAG 的最後一次執行,如果沒有則返回 None。

get_asset_triggered_next_run_info(dag_ids, *, session)

獲取給定 dag_ids 列表的下一次執行資訊。

模組內容

airflow.models.dag.log[source]
airflow.models.dag.AssetT[source]
airflow.models.dag.TAG_MAX_LEN = 100[source]
airflow.models.dag.DagStateChangeCallback[source]
airflow.models.dag.ScheduleInterval[source]
airflow.models.dag.ScheduleArg[source]
exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]

基類: airflow.exceptions.AirflowException

當模型錯誤地填充資料間隔欄位時引發的異常。

資料間隔欄位應要麼都為 None(對於 AIP-39 之前排程的執行),要麼都為 datetime(對於 AIP-39 實現後排程的執行)。如果其中恰好一個欄位為 None,則會引發此異常。

__str__()[source]

返回 str(self)。

airflow.models.dag.get_last_dagrun(dag_id, session, include_manually_triggered=False)[source]

返回某個 DAG 的最後一次執行,如果沒有則返回 None。

最後一次 DAG 執行可以是任何型別的執行,例如排程或回填。忽略被覆蓋的 DagRun。

airflow.models.dag.get_asset_triggered_next_run_info(dag_ids, *, session)[source]

獲取給定 dag_ids 列表的下一次執行資訊。

給定一個 dag_ids 列表,獲取一個字串表示任何資產觸發的 DAG 距離下一次執行有多近,例如“2 個資產中 1 個已更新”。

airflow.models.dag.dag[source]
class airflow.models.dag.DAG(context=None)[source]

基類: airflow.sdk.definitions.dag.DAG, airflow.utils.log.logging_mixin.LoggingMixin

DAG(有向無環圖)是具有方向依賴關係的任務集合。

一個 DAG 也有一個排程、一個開始日期和一個結束日期(可選)。對於每個排程(例如每日或每小時),DAG 需要在其依賴項滿足時執行每個單獨的任務。某些任務具有依賴於其自身過去的屬性,這意味著它們在完成其先前的排程(以及上游任務)之前無法執行。

DAGs 本質上充當任務的名稱空間。一個 task_id 只能新增到同一個 DAG 一次。

請注意,如果您計劃使用時區,所有提供的日期都應該是 pendulum 日期。請參閱 時區感知 DAGs

版本 2.4 新增: 用於指定基於時間的排程邏輯 (timetable) 或資產驅動觸發器的 schedule 引數。

版本 3.0 變更: schedule 的預設值已更改為 None(無排程)。先前的預設值為 timedelta(days=1)

引數:
  • dag_id – DAG 的 ID;必須僅包含字母數字字元、破折號、點和下劃線(所有 ASCII 字元)

  • description – DAG 的描述,例如用於在 webserver 上顯示

  • schedule – 如果提供,這將定義 DAG 執行的排程規則。可能的值包括 cron 表示式字串、timedelta 物件、Timetable 或 Asset 物件列表。另請參閱 使用 Timetables 定製 DAG 排程

  • start_date – 排程器將嘗試回填的時間戳。如果未提供,則必須手動使用明確的時間範圍進行回填。

  • end_date – DAG 不會執行的日期,留空(None)表示開放式排程。

  • template_searchpath – 此資料夾列表(非相對路徑)定義了 jinja 查詢模板的位置。順序很重要。請注意,jinja/airflow 預設包含您的 DAG 檔案路徑

  • template_undefined – 模板未定義型別。

  • user_defined_macros – 在 jinja 模板中公開的宏字典。例如,將 dict(foo='bar') 作為引數傳遞,可以使您在此 DAG 相關的所有 jinja 模板中使用 {{ foo }}。請注意,您可以在此處傳遞任何型別的物件。

  • user_defined_filters – 在 jinja 模板中公開的過濾器字典。例如,將 dict(hello=lambda name: 'Hello %s' % name) 作為引數傳遞,可以使您在此 DAG 相關的所有 jinja 模板中使用 {{ 'world' | hello }}

  • default_args – 用於在初始化運算子時作為建構函式關鍵字引數的預設引數字典。請注意,運算子具有相同的 hook,並且優先於此處定義的引數,這意味著如果您的字典中包含 ‘depends_on_past’: True,而運算子呼叫的 default_args 中包含 ‘depends_on_past’: False,則實際值將為 False

  • params – DAG 級別的引數字典,可在模板中訪問,位於 params 名稱空間下。這些引數可以在任務級別被覆蓋。

  • max_active_tasks – 允許同時執行的任務例項數量

  • max_active_runs – 最大活躍 DAG 執行數,超過此數量處於執行狀態的 DAG 執行後,排程器將不會建立新的活躍 DAG 執行

  • max_consecutive_failed_dag_runs – (實驗性)最大連續失敗 DAG 執行數,超過此數量後,排程器將停用該 DAG

  • dagrun_timeout – 指定 DagRun 在超時或失敗前允許執行的持續時間。DagRun 超時時正在執行的任務例項將被標記為跳過。

  • sla_miss_callback – 已棄用 - SLA 功能在 Airflow 3.0 中移除,將在 3.1 中由新實現取代

  • catchup – 執行排程器追趕(或僅執行最新的)?預設為 False

  • on_failure_callback – 當此 DAG 的 DagRun 失敗時要呼叫的函式或函式列表。一個上下文字典作為單個引數傳遞給此函式。

  • on_success_callback – 與 on_failure_callback 非常相似,只是它在 DAG 成功時執行。

  • access_control – 指定可選的 DAG 級別操作,例如“{‘role1’: {‘can_read’}, ‘role2’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”,或者如果存在 DAG 執行資源,則可以指定資源名稱,例如“{‘role1’: {‘DAG Runs’: {‘can_create’}}, ‘role2’: {‘DAGs’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”

  • is_paused_upon_creation – 指定 DAG 在首次建立時是否暫停。如果 DAG 已存在,此標誌將被忽略。如果未指定此可選引數,將使用全域性配置設定。

  • jinja_environment_kwargs

    要傳遞給 Jinja Environment 以進行模板渲染的額外配置選項

    示例: 為了避免 Jinja 從模板字串中移除末尾的換行符

    DAG(
        dag_id="my-dag",
        jinja_environment_kwargs={
            "keep_trailing_newline": True,
            # some other jinja2 Environment options here
        },
    )
    

    參見: Jinja Environment 文件

  • render_template_as_native_obj – 如果為 True,使用 Jinja NativeEnvironment 將模板渲染為本地 Python 型別。如果為 False,使用 Jinja Environment 將模板渲染為字串值。

  • tags – 標籤列表,用於幫助在 UI 中過濾 DAG。

  • owner_links – 所有者及其連結的字典,可在 DAG 檢視 UI 上點選。可用作 HTTP 連結(例如指向您的 Slack 頻道),或 mailto 連結。例如: {“dag_owner”: “https://airflow.apache.tw/”}

  • auto_register – 當在 with 塊中使用此 DAG 時自動註冊它

  • fail_fast – 當 DAG 中的任務失敗時,使當前正在執行的任務失敗。警告: 快速失敗的 DAG 只能包含具有預設觸發規則(“all_success”)的任務。如果快速失敗的 DAG 中的任何任務具有非預設觸發規則,則會引發異常。

  • dag_display_name – 顯示在 UI 上的 DAG 顯示名稱。

partial: bool = False[source]
last_loaded: datetime.datetime | None[source]
max_consecutive_failed_dag_runs: int[source]
property safe_dag_id[source]
validate()[source]

驗證 DAG 的設定是否一致。

此方法在 DAG bag 組織 DAG 之前呼叫。

validate_executor_field()[source]
next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]

獲取此 DAG 在 date_last_automated_dagrun 之後的下一次 DagRun 的資訊。

此方法根據 DAG 的 timetable、start_date、end_date 等計算下一次 DagRun 應操作的時間間隔(其邏輯日期)以及何時可以排程。此方法不檢查最大活躍執行數或任何其他“max_active_tasks”型別的限制,僅根據此 DAG 及其任務的各種日期和間隔欄位執行計算。

引數:
  • last_automated_dagrun (None | airflow.timetables.base.DataInterval) – 此 DAG 現有“自動化” DagRun(排程或回填,非手動)的 max(logical_date)

  • restricted (bool) – 如果設定為 False(預設為 True),忽略 DAG 或任務上指定的 start_dateend_datecatchup

返回:

下一次 dagrun 的 DagRunInfo,如果不會排程 dagrun 則返回 None。

返回型別:

airflow.timetables.base.DagRunInfo | None

iter_dagrun_infos_between(earliest, latest, *, align=True)[source]

使用此 DAG 的 timetable 在給定間隔內生成 DagRunInfo。

如果 DagRunInfo 例項的 logical_date 不早於 earliest,也不晚於 latest,則生成這些例項。例項按其 logical_date 從最早到最新排序。

如果 alignFalse,則第一次執行將立即在 earliest 時發生,即使它不符合邏輯 timetable 排程。預設為 True

示例: 一個 DAG 被排程為每午夜執行一次 (0 0 * * *)。如果 earliest2021-06-03 23:00:00,則如果 align=False,第一次 DagRunInfo 將是 2021-06-03 23:00:00,如果 align=True,則為 2021-06-04 00:00:00

get_last_dagrun(session=NEW_SESSION, include_manually_triggered=False)[source]
has_dag_runs(session=NEW_SESSION, include_manually_triggered=True)[source]
property dag_id: str[source]
property timetable_summary: str[source]
get_concurrency_reached(session=NEW_SESSION)[source]

返回一個布林值,指示此 DAG 是否已達到 max_active_tasks 限制。

get_is_active(session=NEW_SESSION)[source]

返回一個布林值,指示此 DAG 是否活躍。

get_is_stale(session=NEW_SESSION)[source]

返回一個布林值,指示此 DAG 是否陳舊。

get_is_paused(session=NEW_SESSION)[source]

返回一個布林值,指示此 DAG 是否已暫停。

get_bundle_name(session=NEW_SESSION)[source]

返回此 DAG 所在的 bundle 名稱。

get_bundle_version(session=NEW_SESSION)[source]

返回處理此 DAG 時看到的 bundle 版本。

classmethod get_serialized_fields()[source]

字串化的 DAGs 和運算子恰好包含這些欄位。

static fetch_callback(dag, run_id, success=True, reason=None, *, session=NEW_SESSION)[source]

根據 success 的值獲取相應的回撥。

此方法獲取屬於此 DAG 執行的單個任務例項的上下文,並連同回撥列表一起返回。

引數:
  • dag (DAG) – DAG 物件

  • run_id (str) – DAG 執行 ID

  • success (bool) – 指示是呼叫失敗回撥還是成功回撥的標誌

  • reason (str | None) – 完成原因

  • session (sqlalchemy.orm.session.Session) – 資料庫會話

handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]

觸發相應的 on_failure_callback 或 on_success_callback。

此方法獲取屬於此 DAG 執行的單個任務例項的上下文,並將其連同“reason”(主要用於區分 DAG 執行失敗)一起傳遞給可呼叫物件。

引數:
  • dagrun (airflow.models.dagrun.DagRun) – DagRun 物件

  • success – 指示是呼叫失敗回撥還是成功回撥的標誌

  • reason – 完成原因

  • session – 資料庫會話

classmethod execute_callback(callbacks, context, dag_id)[source]

使用給定的上下文觸發回撥。

引數:
  • callbacks (list[Callable] | None) – 要呼叫的回撥列表

  • context (airflow.utils.context.Context | None) – 要傳遞給所有回撥的上下文

  • dag_id (str) – 要查詢的 DAG 的 dag_id。

get_active_runs()[source]

返回當前正在執行的 DAG 執行的邏輯日期列表。

返回:

邏輯日期列表

static fetch_dagrun(dag_id, run_id, session=NEW_SESSION)[source]

返回給定 run_id 的 DAG 執行(如果存在),否則返回 None。

引數:
返回:

如果找到,則為 DagRun;否則為 None。

返回型別:

airflow.models.dagrun.DagRun

get_dagrun(run_id, session=NEW_SESSION)[source]
get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]

返回 start_date(包含)和 end_date(包含)之間的 DAG 執行列表。

引數:
  • start_date – 要查詢的 DagRun 的起始邏輯日期。

  • end_date – 要查詢的 DagRun 的結束邏輯日期。

  • session

返回:

找到的 DagRun 列表。

get_latest_logical_date(session=NEW_SESSION)[source]

返回存在至少一個 DAG 執行的最新日期。

get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]

獲取 base_date 之前(含)的 num 個任務例項。

返回的列表可能包含與任何 DagRunType 對應的恰好 num 個任務例項。如果在 base_date 之前計劃的 DAG 執行少於 num 個,則返回的數量可能會少於 num

get_task_instances(start_date=None, end_date=None, state=None, session=NEW_SESSION)[source]
set_task_instance_state(*, task_id, map_indexes=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

設定任務例項的狀態,並清除處於失敗或上游失敗狀態的下游任務。

引數:
  • task_id (str) – 任務例項的任務 ID

  • map_indexes (collections.abc.Collection[int] | None) – 僅在 map_index 匹配時設定任務例項。如果為 None(預設),則設定該任務的所有對映任務例項。

  • run_id (str | None) – 任務例項的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要將任務例項設定為的狀態

  • upstream (bool) – 包含給定 task_id 的所有上游任務

  • downstream (bool) – 包含給定 task_id 的所有下游任務

  • future (bool) – 包含給定 task_id 的所有未來的任務例項

  • commit (bool) – 提交更改

  • past (bool) – 包含給定 task_id 的所有過去的任務例項

set_task_group_state(*, group_id, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

將任務組設定為給定狀態,並清除處於失敗或上游失敗狀態的下游任務。

引數:
  • group_id (str) – 任務組的 group_id

  • run_id (str | None) – 任務例項的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要將任務例項設定為的狀態

  • upstream (bool) – 包含給定 task_id 的所有上游任務

  • downstream (bool) – 包含給定 task_id 的所有下游任務

  • future (bool) – 包含給定 task_id 的所有未來的任務例項

  • commit (bool) – 提交更改

  • past (bool) – 包含給定 task_id 的所有過去的任務例項

  • session (sqlalchemy.orm.session.Session) – 新會話

clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance][source]
clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int
clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance]
clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int

在指定日期範圍內清除與當前 DAG 關聯的一組任務例項。

引數:
  • task_ids – 要清除的任務 ID 列表或 (task_id, map_index) 元組

  • run_id – 要清除任務的執行 ID

  • start_date – 要清除的最小邏輯日期

  • end_date – 要清除的最大邏輯日期

  • only_failed – 僅清除失敗的任務

  • only_running – 僅清除正在執行的任務。

  • confirm_prompt – 請求確認

  • dag_run_state – 要將 DagRun 設定成的狀態。如果設定為 False,則不會更改 dagrun 狀態。

  • dry_run – 查詢要清除的任務,但不實際清除它們。

  • session – 要使用的 SQLAlchemy 會話

  • dag_bag – 用於查詢 DAG 的 DagBag(可選)

  • exclude_task_ids – 一組不應清除的 task_id 或 (task_id, map_index) 元組

  • exclude_run_ids – 一組 run_id 或 (run_id)

classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]
cli()[source]

提供一個針對此 DAG 的 CLI。

test(run_after=None, logical_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[source]

為給定的 DAG 和邏輯日期執行單個 DagRun。

引數:
  • run_after (datetime.datetime | None) – 在此日期時間之前 DAG 不能執行。

  • logical_date (datetime.datetime | None) – 此 DAG 執行的邏輯日期

  • run_conf (dict[str, Any] | None) – 傳遞給新建立的 dagrun 的配置

  • conn_file_path (str | None) – YAML 或 JSON 格式的連線檔案的檔案路徑

  • variable_file_path (str | None) – YAML 或 JSON 格式的變數檔案的檔案路徑

  • use_executor (bool) – 如果設定,則使用執行器測試此 DAG

  • mark_success_pattern (re.Pattern | str | None) – 要標記為成功(而非執行中)的 task_id 的正則表示式

  • session (sqlalchemy.orm.session.Session) – 資料庫連線(可選)

classmethod bulk_write_to_db(bundle_name, bundle_version, dags, session=NEW_SESSION)[source]

確保資料庫中 dag 表中給定 DAG 的 DagModel 行是最新的。

引數:

dags (collections.abc.Collection[airflow.serialization.serialized_objects.MaybeSerializedDAG]) – 要儲存到資料庫的 DAG 物件

返回:

sync_to_db(session=NEW_SESSION)[source]

將此 DAG 的屬性儲存到資料庫。

返回:

static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]

給定一個已知 DAG 列表,停用 ORM 中被標記為活動的任何其他 DAG。

引數:

active_dag_ids – 處於活動狀態的 DAG ID 列表

返回:

static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]

停用在過期日期之前最後一次被排程器觸碰的任何 DAG。

這些 DAG 可能已被刪除。

引數:

expiration_date – 設定在此時間之前被觸碰的非活動狀態的 DAG

返回:

static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]

返回給定 DAG 中任務例項的數量。

引數:
  • session – ORM 會話

  • dag_id – 要獲取任務例項數量的 DAG 的 ID

  • run_id – 要獲取任務例項數量的 DAG 執行的 ID

  • task_ids – 給定 DAG 的有效任務 ID 列表

  • states – 如果提供,則用於過濾的狀態列表

返回:

正在執行的任務數量

返回型別:

int

get_task_assets(inlets=True, outlets=True, of_type=Asset)[source]
classmethod from_sdk_dag(dag)[source]

從 TaskSDKDag 建立一個新的(排程器)DAG 物件。

class airflow.models.dag.DagTag[source]

基類:airflow.models.base.Base

每個 DAG 的標籤名稱,以便在 DAG 檢視中快速過濾。

__tablename__ = 'dag_tag'[source]
name[source]
dag_id[source]
__table_args__[source]
__repr__()[source]
class airflow.models.dag.DagOwnerAttributes[source]

基類:airflow.models.base.Base

定義不同所有者屬性的表。

例如,一個所有者的連結,將作為超連結傳遞到“DAGs”檢視。

__tablename__ = 'dag_owner_attributes'[source]
dag_id[source]
owner[source]
__repr__()[source]
classmethod get_all(session)[source]
class airflow.models.dag.DagModel(**kwargs)[source]

基類:airflow.models.base.Base

包含 DAG 屬性的表。

__tablename__ = 'dag'[source]

這些項儲存在資料庫中,用於與狀態相關的資訊

dag_id[source]
is_paused_at_creation = True[source]
is_paused[source]
is_stale[source]
last_parsed_time[source]
last_expired[source]
fileloc[source]
relative_fileloc[source]
bundle_name[source]
bundle_version[source]
owners[source]
description[source]
timetable_summary[source]
timetable_description[source]
asset_expression[source]
tags[source]
max_active_tasks[source]
max_active_runs[source]
max_consecutive_failed_dag_runs[source]
has_task_concurrency_limits[source]
has_import_errors[source]
next_dagrun[source]
next_dagrun_data_interval_start[source]
next_dagrun_data_interval_end[source]
next_dagrun_create_after[source]
__table_args__[source]
schedule_asset_references[source]
schedule_asset_alias_references[source]
schedule_asset_name_references[source]
schedule_asset_uri_references[source]
schedule_assets[source]
task_outlet_asset_references[source]
NUM_DAGS_PER_DAGRUN_QUERY[source]
dag_versions[source]
__repr__()[source]
property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[source]
property timezone[source]
static get_dagmodel(dag_id, session=NEW_SESSION)[source]
classmethod get_current(dag_id, session=NEW_SESSION)[source]
get_last_dagrun(session=NEW_SESSION, include_manually_triggered=False)[source]
get_is_paused(*, session=None)[source]

提供與‘DAG’的介面相容性。

get_is_active(*, session=None)[source]

提供與‘DAG’的介面相容性。

static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[source]

給定一個 dag_id 列表,獲取一組暫停的 Dag Id。

引數:
返回:

暫停的 Dag Id

返回型別:

set[str]

property safe_dag_id[source]
set_is_paused(is_paused, session=NEW_SESSION)[source]

暫停/取消暫停一個 DAG。

引數:
  • is_paused (bool) – DAG 是否暫停

  • session – 會話

dag_display_name()[source]
classmethod deactivate_deleted_dags(bundle_name, rel_filelocs, session=NEW_SESSION)[source]

對於已移除 DAG 檔案的 DAG,將其 is_active=False 設定為 False。

引數:
classmethod dags_needing_dagruns(session)[source]

返回(並鎖定)一個 Dag 物件列表,這些物件已到期建立新的 DagRun。

這將返回一個行級別鎖定的行結果集,使用“SELECT … FOR UPDATE”查詢。您應確保在單個事務中做出所有排程決策——一旦事務提交,鎖定將被釋放。

calculate_dagrun_date_fields(dag, last_automated_dag_run)[source]

計算 next_dagrunnext_dagrun_create_after`

引數:
  • dag (DAG) – DAG 物件

  • last_automated_dag_run (None | airflow.timetables.base.DataInterval) – 此 DAG 最近一次執行的資料間隔(或 datetime),如果尚未排程,則為 None。

get_asset_triggered_next_run_info(*, session=NEW_SESSION)[source]

本條目是否有幫助?