Airflow Summit 2025 即將於 10 月 07-09 日舉行。立即註冊,獲取早鳥票!

airflow.models.dagrun

屬性

CreatedTasks

RUN_ID_REGEX

TISchedulingDecision

DagRun.task_instance_scheduling_decisions 的返回型別。

DagRun

DAG 的呼叫例項。

DagRunNote

用於儲存關於 DagRun 例項的任意備註。

模組內容

airflow.models.dagrun.CreatedTasks[source]
airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|asset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]
class airflow.models.dagrun.TISchedulingDecision[source]

基類: NamedTuple

DagRun.task_instance_scheduling_decisions 的返回型別。

tis: list[airflow.models.taskinstance.TaskInstance][source]
schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]
changed_tis: bool[source]
unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]
finished_tis: list[airflow.models.taskinstance.TaskInstance][source]
class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, *, queued_at=NOTSET, logical_date=None, run_after=None, start_date=None, conf=None, state=None, run_type=None, creating_job_id=None, data_interval=None, triggered_by=None, backfill_id=None, bundle_version=None)[source]

基類: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

DAG 的呼叫例項。

DagRun 可以由排程器建立(即排程執行),也可以由外部觸發器建立(即手動執行)。

active_spans[source]
__tablename__ = 'dag_run'[source]
id[source]
dag_id[source]
queued_at[source]
logical_date[source]
start_date[source]
end_date[source]
run_id[source]
creating_job_id[source]
run_type[source]
triggered_by[source]
conf[source]
data_interval_start[source]
data_interval_end[source]
run_after[source]
last_scheduling_decision[source]
log_template_id[source]
updated_at[source]
clear_number[source]
backfill_id[source]

此 DagRun 當前關聯的 backfill。

如果例如 DagRun 被清除以便重新執行,或者可能重新 backfill,則此關聯可能會改變。

bundle_version[source]
scheduled_by_job_id[source]
context_carrier[source]
span_status[source]
dag: airflow.models.dag.DAG | None[source]
__table_args__[source]
task_instances[source]
task_instances_histories[source]
dag_model[source]
dag_run_note[source]
backfill[source]
backfill_max_active_runs[source]
max_active_runs[source]
note[source]
DEFAULT_DAGRUNS_TO_EXAMINE[source]
__repr__()[source]
validate_run_id(key, run_id)[source]
屬性 dag_versions: list[airflow.models.dag_version.DagVersion][source]

返回與此 DagRun 的任務例項 (TI) 關聯的 DAG 版本。

屬性 version_number: int | None[source]

返回與此 DagRun 最新任務例項 (TI) 關聯的 DAG 版本號。

check_version_id_exists_in_dr(dag_version_id, session=NEW_SESSION)[source]
屬性 stats_tags: dict[str, str][source]
類方法 set_active_spans(active_spans)[source]
get_state()[source]
set_state(state)[source]

改變 DagRun 的狀態。

根據下表實現屬性更改(行代表舊狀態,列代表新狀態)

狀態轉換矩陣

QUEUED

RUNNING

SUCCESS

FAILED

None

queued_at = timezone.utcnow()

if empty: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

QUEUED

queued_at = timezone.utcnow()

if empty: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

RUNNING

queued_at = timezone.utcnow() start_date = None end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

SUCCESS

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

FAILED

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

屬性 state[source]
refresh_from_db(session=NEW_SESSION)[source]

從資料庫重新載入當前的 dagrun。

引數:

session (sqlalchemy.orm.Session) – 資料庫會話

類方法 find(dag_id=None, run_id=None, logical_date=None, state=None, no_backfills=False, run_type=None, session=NEW_SESSION, logical_start_date=None, logical_end_date=None)[source]

返回符合給定搜尋條件的 DagRun 集合。

引數:
classmethod find_duplicate(dag_id, run_id, *, session=NEW_SESSION)[source]

返回指定 run_id 的 DAG 現有執行。

如果未找到此類 DAG 執行,則返回 None

引數:
  • dag_id (str) – 用於查詢重複項的 dag_id

  • run_id (str) – 定義此 DAG 執行的執行 ID

  • session (sqlalchemy.orm.Session) – 資料庫會話

static generate_run_id(*, run_type, logical_date=None, run_after)[source]

基於執行型別、run_after 和邏輯日期生成執行 ID。

引數:
  • run_type (airflow.utils.types.DagRunType) – DAG 執行型別

  • logical_date (datetime.datetime | None) – 邏輯日期

  • run_after (datetime.datetime) – DAG 執行不會在此日期之前啟動的日期。

static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]

返回此 DAG 執行的任務例項。

get_task_instances(state=None, session=NEW_SESSION)[source]

返回此 DAG 執行的任務例項。

重定向到 DagRun.fetch_task_instances 方法。保留此方法是因為它在程式碼中廣泛使用。

get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]

返回此 DAG 執行中由 task_id 指定的任務例項。

引數:
static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]

返回此 DAG 執行中由 task_id 指定的任務例項。

引數:
get_dag()[source]

返回與此 DagRun 相關聯的 Dag。

返回:

DAG

返回型別:

airflow.models.dag.DAG

static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]

返回上一個 DagRun,如果存在。

引數:
static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]

返回上一個 SCHEDULED DagRun,如果存在。

引數:
set_dagrun_span_attrs(span)[source]
start_dr_spans_if_needed(tis)[source]
end_dr_span_if_needed()[source]
update_state(session=NEW_SESSION, execute_callbacks=True)[source]

根據其任務例項的狀態確定 DagRun 的總體狀態。

引數:
  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

  • execute_callbacks (bool) – 是否應直接呼叫 DAG 回撥(成功/失敗、SLA 等)(預設: true),或將其記錄為 returned_callback 屬性中的待處理請求

返回:

包含可在當前迴圈中排程的 tis 以及需要執行的 returned_callback 的元組

返回型別:

tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]

task_instance_scheduling_decisions(session=NEW_SESSION)[source]
notify_dagrun_state_changed(msg='')[source]
verify_integrity(*, session=NEW_SESSION, dag_version_id=None)[source]

透過檢查已移除的任務或尚未在資料庫中的任務來驗證 DagRun。

它將把狀態設定為 removed 或在需要時新增任務。

引數:
  • dag_version_id (sqlalchemy_utils.UUIDType | None) – DAG 版本 ID

  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

classmethod get_latest_runs(session=NEW_SESSION)[source]

返回每個 DAG 的最新 DagRun。

schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[source]

將給定的任務例項設定為 scheduled 狀態。

schedulable_tis 的每個元素都應該已經設定了其 task 屬性。

任何沒有回撥或輸出埠的 EmptyOperator 將直接設定為 success 狀態。

所有 TIs 都應屬於此 DagRun,但這段程式碼位於熱路徑中,不會對此進行檢查——呼叫者有責任只使用來自單個 DAG 執行的 TIs 呼叫此函式。

get_log_template(*, session=NEW_SESSION)[source]
class airflow.models.dagrun.DagRunNote(content, user_id=None)[source]

基類: airflow.models.base.Base

用於儲存關於 DagRun 例項的任意備註。

__tablename__ = 'dag_run_note'[source]
user_id[source]
dag_run_id[source]
content[source]
created_at[source]
updated_at[source]
dag_run[source]
__table_args__[source]
__repr__()[source]

此條目有幫助嗎?