airflow.models.dagrun¶
屬性¶
類¶
DagRun.task_instance_scheduling_decisions 的返回型別。 |
|
DAG 的呼叫例項。 |
|
用於儲存關於 DagRun 例項的任意備註。 |
模組內容¶
- airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|asset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]¶
- class airflow.models.dagrun.TISchedulingDecision[source]¶
基類:
NamedTupleDagRun.task_instance_scheduling_decisions 的返回型別。
- schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- finished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, *, queued_at=NOTSET, logical_date=None, run_after=None, start_date=None, conf=None, state=None, run_type=None, creating_job_id=None, data_interval=None, triggered_by=None, backfill_id=None, bundle_version=None)[source]¶
基類:
airflow.models.base.Base,airflow.utils.log.logging_mixin.LoggingMixinDAG 的呼叫例項。
DagRun 可以由排程器建立(即排程執行),也可以由外部觸發器建立(即手動執行)。
- 屬性 dag_versions: list[airflow.models.dag_version.DagVersion][source]¶
返回與此 DagRun 的任務例項 (TI) 關聯的 DAG 版本。
- set_state(state)[source]¶
改變 DagRun 的狀態。
根據下表實現屬性更改(行代表舊狀態,列代表新狀態)
狀態轉換矩陣¶ QUEUED
RUNNING
SUCCESS
FAILED
None
queued_at = timezone.utcnow()
if empty: start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
QUEUED
queued_at = timezone.utcnow()
if empty: start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
RUNNING
queued_at = timezone.utcnow() start_date = None end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
SUCCESS
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
FAILED
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
- refresh_from_db(session=NEW_SESSION)[source]¶
從資料庫重新載入當前的 dagrun。
- 引數:
session (sqlalchemy.orm.Session) – 資料庫會話
- 類方法 find(dag_id=None, run_id=None, logical_date=None, state=None, no_backfills=False, run_type=None, session=NEW_SESSION, logical_start_date=None, logical_end_date=None)[source]¶
返回符合給定搜尋條件的 DagRun 集合。
- 引數:
dag_id (str | list[str] | None) – 用於查詢 DAG 執行的 dag_id 或 dag_id 列表
run_id (collections.abc.Iterable[str] | None) – 定義此 DAG 執行的執行 ID
run_type (airflow.utils.types.DagRunType | None) – DAG 執行的型別
logical_date (datetime.datetime | collections.abc.Iterable[datetime.datetime] | None) – 邏輯日期
state (airflow.utils.state.DagRunState | None) – DAG 執行的狀態
no_backfills (bool) – 不返回回填(True),返回所有(False)。預設為 False
session (sqlalchemy.orm.Session) – 資料庫會話
logical_start_date (datetime.datetime | None) – 從此日期開始執行的 DAG 執行
logical_end_date (datetime.datetime | None) – 在此日期之前執行的 DAG 執行
- classmethod find_duplicate(dag_id, run_id, *, session=NEW_SESSION)[source]¶
返回指定 run_id 的 DAG 現有執行。
如果未找到此類 DAG 執行,則返回 None。
- 引數:
dag_id (str) – 用於查詢重複項的 dag_id
run_id (str) – 定義此 DAG 執行的執行 ID
session (sqlalchemy.orm.Session) – 資料庫會話
- static generate_run_id(*, run_type, logical_date=None, run_after)[source]¶
基於執行型別、run_after 和邏輯日期生成執行 ID。
- 引數:
run_type (airflow.utils.types.DagRunType) – DAG 執行型別
logical_date (datetime.datetime | None) – 邏輯日期
run_after (datetime.datetime) – DAG 執行不會在此日期之前啟動的日期。
- static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]¶
返回此 DAG 執行的任務例項。
- get_task_instances(state=None, session=NEW_SESSION)[source]¶
返回此 DAG 執行的任務例項。
重定向到 DagRun.fetch_task_instances 方法。保留此方法是因為它在程式碼中廣泛使用。
- get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]¶
返回此 DAG 執行中由 task_id 指定的任務例項。
- 引數:
task_id (str) – 任務 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]¶
返回此 DAG 執行中由 task_id 指定的任務例項。
- 引數:
dag_id (str) – DAG ID
dag_run_id (str) – DAG 執行 ID
task_id (str) – 任務 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]¶
返回上一個 DagRun,如果存在。
- 引數:
dag_run (DagRun) – Dag 執行
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
state (airflow.utils.state.DagRunState | None) – Dag 執行狀態
- static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]¶
返回上一個 SCHEDULED DagRun,如果存在。
- 引數:
dag_run_id (int) – DAG 執行 ID
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
- update_state(session=NEW_SESSION, execute_callbacks=True)[source]¶
根據其任務例項的狀態確定 DagRun 的總體狀態。
- 引數:
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
execute_callbacks (bool) – 是否應直接呼叫 DAG 回撥(成功/失敗、SLA 等)(預設: true),或將其記錄為
returned_callback屬性中的待處理請求
- 返回:
包含可在當前迴圈中排程的 tis 以及需要執行的 returned_callback 的元組
- 返回型別:
tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]
- verify_integrity(*, session=NEW_SESSION, dag_version_id=None)[source]¶
透過檢查已移除的任務或尚未在資料庫中的任務來驗證 DagRun。
它將把狀態設定為 removed 或在需要時新增任務。
- 引數:
dag_version_id (sqlalchemy_utils.UUIDType | None) – DAG 版本 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session