airflow.models.dag¶
屬性¶
異常¶
當模型錯誤地填充資料間隔欄位時引發的異常。 |
類¶
DAG(有向無環圖)是具有方向依賴關係的任務集合。 |
|
每個 DAG 的標籤名稱,以便在 DAG 檢視中快速過濾。 |
|
定義不同所有者屬性的表。 |
|
包含 DAG 屬性的表。 |
函式¶
|
返回某個 DAG 的最後一次執行,如果沒有則返回 None。 |
|
獲取給定 dag_ids 列表的下一次執行資訊。 |
模組內容¶
- exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]¶
基類:
airflow.exceptions.AirflowException當模型錯誤地填充資料間隔欄位時引發的異常。
資料間隔欄位應要麼都為 None(對於 AIP-39 之前排程的執行),要麼都為 datetime(對於 AIP-39 實現後排程的執行)。如果其中恰好一個欄位為 None,則會引發此異常。
- airflow.models.dag.get_last_dagrun(dag_id, session, include_manually_triggered=False)[source]¶
返回某個 DAG 的最後一次執行,如果沒有則返回 None。
最後一次 DAG 執行可以是任何型別的執行,例如排程或回填。忽略被覆蓋的 DagRun。
- airflow.models.dag.get_asset_triggered_next_run_info(dag_ids, *, session)[source]¶
獲取給定 dag_ids 列表的下一次執行資訊。
給定一個 dag_ids 列表,獲取一個字串表示任何資產觸發的 DAG 距離下一次執行有多近,例如“2 個資產中 1 個已更新”。
- class airflow.models.dag.DAG(context=None)[source]¶
基類:
airflow.sdk.definitions.dag.DAG,airflow.utils.log.logging_mixin.LoggingMixinDAG(有向無環圖)是具有方向依賴關係的任務集合。
一個 DAG 也有一個排程、一個開始日期和一個結束日期(可選)。對於每個排程(例如每日或每小時),DAG 需要在其依賴項滿足時執行每個單獨的任務。某些任務具有依賴於其自身過去的屬性,這意味著它們在完成其先前的排程(以及上游任務)之前無法執行。
DAGs 本質上充當任務的名稱空間。一個 task_id 只能新增到同一個 DAG 一次。
請注意,如果您計劃使用時區,所有提供的日期都應該是 pendulum 日期。請參閱 時區感知 DAGs。
版本 2.4 新增: 用於指定基於時間的排程邏輯 (timetable) 或資產驅動觸發器的 schedule 引數。
版本 3.0 變更: schedule 的預設值已更改為 None(無排程)。先前的預設值為
timedelta(days=1)。- 引數:
dag_id – DAG 的 ID;必須僅包含字母數字字元、破折號、點和下劃線(所有 ASCII 字元)
description – DAG 的描述,例如用於在 webserver 上顯示
schedule – 如果提供,這將定義 DAG 執行的排程規則。可能的值包括 cron 表示式字串、timedelta 物件、Timetable 或 Asset 物件列表。另請參閱 使用 Timetables 定製 DAG 排程。
start_date – 排程器將嘗試回填的時間戳。如果未提供,則必須手動使用明確的時間範圍進行回填。
end_date – DAG 不會執行的日期,留空(None)表示開放式排程。
template_searchpath – 此資料夾列表(非相對路徑)定義了 jinja 查詢模板的位置。順序很重要。請注意,jinja/airflow 預設包含您的 DAG 檔案路徑
template_undefined – 模板未定義型別。
user_defined_macros – 在 jinja 模板中公開的宏字典。例如,將
dict(foo='bar')作為引數傳遞,可以使您在此 DAG 相關的所有 jinja 模板中使用{{ foo }}。請注意,您可以在此處傳遞任何型別的物件。user_defined_filters – 在 jinja 模板中公開的過濾器字典。例如,將
dict(hello=lambda name: 'Hello %s' % name)作為引數傳遞,可以使您在此 DAG 相關的所有 jinja 模板中使用{{ 'world' | hello }}。default_args – 用於在初始化運算子時作為建構函式關鍵字引數的預設引數字典。請注意,運算子具有相同的 hook,並且優先於此處定義的引數,這意味著如果您的字典中包含 ‘depends_on_past’: True,而運算子呼叫的 default_args 中包含 ‘depends_on_past’: False,則實際值將為 False。
params – DAG 級別的引數字典,可在模板中訪問,位於 params 名稱空間下。這些引數可以在任務級別被覆蓋。
max_active_tasks – 允許同時執行的任務例項數量
max_active_runs – 最大活躍 DAG 執行數,超過此數量處於執行狀態的 DAG 執行後,排程器將不會建立新的活躍 DAG 執行
max_consecutive_failed_dag_runs – (實驗性)最大連續失敗 DAG 執行數,超過此數量後,排程器將停用該 DAG
dagrun_timeout – 指定 DagRun 在超時或失敗前允許執行的持續時間。DagRun 超時時正在執行的任務例項將被標記為跳過。
sla_miss_callback – 已棄用 - SLA 功能在 Airflow 3.0 中移除,將在 3.1 中由新實現取代
catchup – 執行排程器追趕(或僅執行最新的)?預設為 False
on_failure_callback – 當此 DAG 的 DagRun 失敗時要呼叫的函式或函式列表。一個上下文字典作為單個引數傳遞給此函式。
on_success_callback – 與
on_failure_callback非常相似,只是它在 DAG 成功時執行。access_control – 指定可選的 DAG 級別操作,例如“{‘role1’: {‘can_read’}, ‘role2’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”,或者如果存在 DAG 執行資源,則可以指定資源名稱,例如“{‘role1’: {‘DAG Runs’: {‘can_create’}}, ‘role2’: {‘DAGs’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”
is_paused_upon_creation – 指定 DAG 在首次建立時是否暫停。如果 DAG 已存在,此標誌將被忽略。如果未指定此可選引數,將使用全域性配置設定。
jinja_environment_kwargs –
要傳遞給 Jinja
Environment以進行模板渲染的額外配置選項示例: 為了避免 Jinja 從模板字串中移除末尾的換行符
DAG( dag_id="my-dag", jinja_environment_kwargs={ "keep_trailing_newline": True, # some other jinja2 Environment options here }, )
render_template_as_native_obj – 如果為 True,使用 Jinja
NativeEnvironment將模板渲染為本地 Python 型別。如果為 False,使用 JinjaEnvironment將模板渲染為字串值。tags – 標籤列表,用於幫助在 UI 中過濾 DAG。
owner_links – 所有者及其連結的字典,可在 DAG 檢視 UI 上點選。可用作 HTTP 連結(例如指向您的 Slack 頻道),或 mailto 連結。例如: {“dag_owner”: “https://airflow.apache.tw/”}
auto_register – 當在
with塊中使用此 DAG 時自動註冊它fail_fast – 當 DAG 中的任務失敗時,使當前正在執行的任務失敗。警告: 快速失敗的 DAG 只能包含具有預設觸發規則(“all_success”)的任務。如果快速失敗的 DAG 中的任何任務具有非預設觸發規則,則會引發異常。
dag_display_name – 顯示在 UI 上的 DAG 顯示名稱。
- last_loaded: datetime.datetime | None[source]¶
- next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]¶
獲取此 DAG 在
date_last_automated_dagrun之後的下一次 DagRun 的資訊。此方法根據 DAG 的 timetable、start_date、end_date 等計算下一次 DagRun 應操作的時間間隔(其邏輯日期)以及何時可以排程。此方法不檢查最大活躍執行數或任何其他“max_active_tasks”型別的限制,僅根據此 DAG 及其任務的各種日期和間隔欄位執行計算。
- 引數:
last_automated_dagrun (None | airflow.timetables.base.DataInterval) – 此 DAG 現有“自動化” DagRun(排程或回填,非手動)的
max(logical_date)。restricted (bool) – 如果設定為 False(預設為 True),忽略 DAG 或任務上指定的
start_date、end_date和catchup。
- 返回:
下一次 dagrun 的 DagRunInfo,如果不會排程 dagrun 則返回 None。
- 返回型別:
- iter_dagrun_infos_between(earliest, latest, *, align=True)[source]¶
使用此 DAG 的 timetable 在給定間隔內生成 DagRunInfo。
如果 DagRunInfo 例項的
logical_date不早於earliest,也不晚於latest,則生成這些例項。例項按其logical_date從最早到最新排序。如果
align為False,則第一次執行將立即在earliest時發生,即使它不符合邏輯 timetable 排程。預設為True。示例: 一個 DAG 被排程為每午夜執行一次 (
0 0 * * *)。如果earliest為2021-06-03 23:00:00,則如果align=False,第一次 DagRunInfo 將是2021-06-03 23:00:00,如果align=True,則為2021-06-04 00:00:00。
- static fetch_callback(dag, run_id, success=True, reason=None, *, session=NEW_SESSION)[source]¶
根據 success 的值獲取相應的回撥。
此方法獲取屬於此 DAG 執行的單個任務例項的上下文,並連同回撥列表一起返回。
- 引數:
dag (DAG) – DAG 物件
run_id (str) – DAG 執行 ID
success (bool) – 指示是呼叫失敗回撥還是成功回撥的標誌
reason (str | None) – 完成原因
session (sqlalchemy.orm.session.Session) – 資料庫會話
- handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]¶
觸發相應的 on_failure_callback 或 on_success_callback。
此方法獲取屬於此 DAG 執行的單個任務例項的上下文,並將其連同“reason”(主要用於區分 DAG 執行失敗)一起傳遞給可呼叫物件。
- 引數:
dagrun (airflow.models.dagrun.DagRun) – DagRun 物件
success – 指示是呼叫失敗回撥還是成功回撥的標誌
reason – 完成原因
session – 資料庫會話
- static fetch_dagrun(dag_id, run_id, session=NEW_SESSION)[source]¶
返回給定 run_id 的 DAG 執行(如果存在),否則返回 None。
- 引數:
dag_id (str) – 要查詢的 DAG 的 dag_id。
run_id (str) – 要查詢的 DagRun 的 run_id。
session (sqlalchemy.orm.session.Session)
- 返回:
如果找到,則為 DagRun;否則為 None。
- 返回型別:
- get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]¶
返回 start_date(包含)和 end_date(包含)之間的 DAG 執行列表。
- 引數:
start_date – 要查詢的 DagRun 的起始邏輯日期。
end_date – 要查詢的 DagRun 的結束邏輯日期。
session
- 返回:
找到的 DagRun 列表。
- get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]¶
獲取
base_date之前(含)的num個任務例項。返回的列表可能包含與任何 DagRunType 對應的恰好
num個任務例項。如果在base_date之前計劃的 DAG 執行少於num個,則返回的數量可能會少於num。
- set_task_instance_state(*, task_id, map_indexes=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
設定任務例項的狀態,並清除處於失敗或上游失敗狀態的下游任務。
- 引數:
task_id (str) – 任務例項的任務 ID
map_indexes (collections.abc.Collection[int] | None) – 僅在 map_index 匹配時設定任務例項。如果為 None(預設),則設定該任務的所有對映任務例項。
run_id (str | None) – 任務例項的 run_id
state (airflow.utils.state.TaskInstanceState) – 要將任務例項設定為的狀態
upstream (bool) – 包含給定 task_id 的所有上游任務
downstream (bool) – 包含給定 task_id 的所有下游任務
future (bool) – 包含給定 task_id 的所有未來的任務例項
commit (bool) – 提交更改
past (bool) – 包含給定 task_id 的所有過去的任務例項
- set_task_group_state(*, group_id, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
將任務組設定為給定狀態,並清除處於失敗或上游失敗狀態的下游任務。
- 引數:
group_id (str) – 任務組的 group_id
run_id (str | None) – 任務例項的 run_id
state (airflow.utils.state.TaskInstanceState) – 要將任務例項設定為的狀態
upstream (bool) – 包含給定 task_id 的所有上游任務
downstream (bool) – 包含給定 task_id 的所有下游任務
future (bool) – 包含給定 task_id 的所有未來的任務例項
commit (bool) – 提交更改
past (bool) – 包含給定 task_id 的所有過去的任務例項
session (sqlalchemy.orm.session.Session) – 新會話
- clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance][source]¶
- clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int
- clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance]
- clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int
在指定日期範圍內清除與當前 DAG 關聯的一組任務例項。
- 引數:
task_ids – 要清除的任務 ID 列表或 (
task_id,map_index) 元組run_id – 要清除任務的執行 ID
start_date – 要清除的最小邏輯日期
end_date – 要清除的最大邏輯日期
only_failed – 僅清除失敗的任務
only_running – 僅清除正在執行的任務。
confirm_prompt – 請求確認
dag_run_state – 要將 DagRun 設定成的狀態。如果設定為 False,則不會更改 dagrun 狀態。
dry_run – 查詢要清除的任務,但不實際清除它們。
session – 要使用的 SQLAlchemy 會話
dag_bag – 用於查詢 DAG 的 DagBag(可選)
exclude_task_ids – 一組不應清除的
task_id或 (task_id,map_index) 元組exclude_run_ids – 一組
run_id或 (run_id)
- classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]¶
- test(run_after=None, logical_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[source]¶
為給定的 DAG 和邏輯日期執行單個 DagRun。
- 引數:
run_after (datetime.datetime | None) – 在此日期時間之前 DAG 不能執行。
logical_date (datetime.datetime | None) – 此 DAG 執行的邏輯日期
conn_file_path (str | None) – YAML 或 JSON 格式的連線檔案的檔案路徑
variable_file_path (str | None) – YAML 或 JSON 格式的變數檔案的檔案路徑
use_executor (bool) – 如果設定,則使用執行器測試此 DAG
mark_success_pattern (re.Pattern | str | None) – 要標記為成功(而非執行中)的 task_id 的正則表示式
session (sqlalchemy.orm.session.Session) – 資料庫連線(可選)
- classmethod bulk_write_to_db(bundle_name, bundle_version, dags, session=NEW_SESSION)[source]¶
確保資料庫中 dag 表中給定 DAG 的 DagModel 行是最新的。
- 引數:
dags (collections.abc.Collection[airflow.serialization.serialized_objects.MaybeSerializedDAG]) – 要儲存到資料庫的 DAG 物件
- 返回:
無
- static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]¶
給定一個已知 DAG 列表,停用 ORM 中被標記為活動的任何其他 DAG。
- 引數:
active_dag_ids – 處於活動狀態的 DAG ID 列表
- 返回:
無
- static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]¶
停用在過期日期之前最後一次被排程器觸碰的任何 DAG。
這些 DAG 可能已被刪除。
- 引數:
expiration_date – 設定在此時間之前被觸碰的非活動狀態的 DAG
- 返回:
無
- class airflow.models.dag.DagOwnerAttributes[source]¶
基類:
airflow.models.base.Base定義不同所有者屬性的表。
例如,一個所有者的連結,將作為超連結傳遞到“DAGs”檢視。
- class airflow.models.dag.DagModel(**kwargs)[source]¶
基類:
airflow.models.base.Base包含 DAG 屬性的表。
- property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[source]¶
- static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[source]¶
給定一個 dag_id 列表,獲取一組暫停的 Dag Id。
- 引數:
session (sqlalchemy.orm.session.Session) – ORM 會話
- 返回:
暫停的 Dag Id
- 返回型別:
- set_is_paused(is_paused, session=NEW_SESSION)[source]¶
暫停/取消暫停一個 DAG。
- 引數:
is_paused (bool) – DAG 是否暫停
session – 會話
- classmethod deactivate_deleted_dags(bundle_name, rel_filelocs, session=NEW_SESSION)[source]¶
對於已移除 DAG 檔案的 DAG,將其
is_active=False設定為 False。- 引數:
bundle_name (str) – 檔案位置的捆綁包
session (sqlalchemy.orm.session.Session) – ORM 會話
- classmethod dags_needing_dagruns(session)[source]¶
返回(並鎖定)一個 Dag 物件列表,這些物件已到期建立新的 DagRun。
這將返回一個行級別鎖定的行結果集,使用“SELECT … FOR UPDATE”查詢。您應確保在單個事務中做出所有排程決策——一旦事務提交,鎖定將被釋放。
- calculate_dagrun_date_fields(dag, last_automated_dag_run)[source]¶
計算
next_dagrun和 next_dagrun_create_after`。- 引數:
dag (DAG) – DAG 物件
last_automated_dag_run (None | airflow.timetables.base.DataInterval) – 此 DAG 最近一次執行的資料間隔(或 datetime),如果尚未排程,則為 None。