Airflow Summit 2025 將於 10 月 07 日至 09 日舉行。立即註冊獲取早鳥票!

airflow.models.taskinstance

屬性

TR

log

PAST_DEPENDS_MET

TaskInstanceStateType

TaskInstance

任務例項儲存任務例項的狀態。

SimpleTaskInstance

簡化任務例項。

TaskInstanceNote

用於儲存與任務例項相關的任意備註。

函式

set_current_context(context)

將當前執行上下文設定為提供的上下文物件。

clear_task_instances(tis, session[, dag, dag_run_state])

清除一組任務例項,但確保正在執行的任務被殺死。

uuid7()

生成新的 UUID7 字串。

模組內容

airflow.models.taskinstance.TR[source]
airflow.models.taskinstance.log[source]
airflow.models.taskinstance.PAST_DEPENDS_MET = 'past_depends_met'[source]
airflow.models.taskinstance.set_current_context(context)[source]

將當前執行上下文設定為提供的上下文物件。

此方法應在每次任務執行之前呼叫一次 operator.execute。

airflow.models.taskinstance.clear_task_instances(tis, session, dag=None, dag_run_state=DagRunState.QUEUED)[source]

清除一組任務例項,但確保正在執行的任務被殺死。

同時將 Dagrun 的 state 設定為 QUEUED,start_date 設定為執行時間。但僅針對已完成的 DR(SUCCESS 和 FAILED)。對於正在執行的 DR(QUEUED 和 RUNNING),不會清除 DR 的 state 和 start_date`,因為清除已在執行的 DR 的狀態是多餘的,並且清除 `start_date` 會影響 DR 的持續時間。

引數::
airflow.models.taskinstance.uuid7()[source]

生成新的 UUID7 字串。

class airflow.models.taskinstance.TaskInstance(task, run_id=None, state=None, map_index=-1, dag_version_id=None)[source]

基類:airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

任務例項儲存任務例項的狀態。

此表是關於哪些任務已執行及其所處狀態的權威和單一事實來源。

SqlAlchemy 模型故意沒有指向 task 或 dag 模型的 SqlAlchemy 外部索引鍵,以便更好地控制事務。

對此表的資料庫事務應確保不會發生重複觸發,並避免在多個排程程式可能正在觸發任務例項時對哪些任務例項已準備好執行產生混淆。

map_index 中的值 -1 表示以下任何一種情況:沒有對映任務的任務例項;帶有對映任務但尚未展開(state=pending)的任務例項;帶有對映任務但展開為空列表(state=skipped)的任務例項。

__tablename__ = 'task_instance'[source]
id[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
start_date[source]
end_date[source]
duration[source]
state[source]
try_number[source]
max_tries[source]
hostname[source]
unixname[source]
pool[source]
pool_slots[source]
queue[source]
priority_weight[source]
operator[source]
custom_operator_name[source]
queued_dttm[source]
scheduled_dttm[source]
queued_by_job_id[source]
last_heartbeat_at[source]
pid[source]
executor[source]
executor_config[source]
updated_at[source]
context_carrier[source]
span_status[source]
external_executor_id[source]
trigger_id[source]
trigger_timeout[source]
next_method[source]
next_kwargs[source]
dag_version_id[source]
dag_version[source]
__table_args__[source]
dag_model: airflow.models.dag.DagModel[source]
trigger[source]
triggerer_job[source]
dag_run[source]
rendered_task_instance_fields[source]
run_after[source]
logical_date[source]
task_instance_note[source]
note[source]
task: airflow.sdk.definitions._internal.abstractoperator.Operator | None = None[source]
test_mode: bool = False[source]
is_trigger_log_context: bool = False[source]
run_as_user: str | None = None[source]
__hash__()[source]
property stats_tags: dict[str, str][source]

返回任務例項標籤。

init_on_load()[source]

初始化未儲存在資料庫中的屬性。

property operator_name: str | None[source]

@property: 如果設定,為 operator 使用一個更友好的顯示名稱。

task_display_name()[source]
rendered_map_index()[source]
classmethod from_runtime_ti(runtime_ti)[source]
to_runtime_ti(context_from_server)[source]
command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, raw=False, pool=None, cfg_path=None)[source]

返回一個可以在任何安裝了 airflow 的地方執行的命令。

此命令是排程器傳送給執行器的訊息的一部分。

static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, file_path=None, raw=False, pool=None, cfg_path=None, map_index=-1)[source]

生成執行此任務例項所需的 shell 命令。

引數::
  • dag_id (str) – DAG ID

  • task_id (str) – Task ID

  • run_id (str) – 此任務的 DagRun 的 run_id

  • mark_success (bool) – 是否將任務標記為成功

  • ignore_all_deps (bool) – 忽略所有可忽略的依賴項。覆蓋其他 ignore_* 引數。

  • ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 引數(例如,用於 Backfills)

  • wait_for_past_depends_before_skipping (bool) – 在將任務例項標記為跳過之前等待過去的依賴項

  • ignore_task_deps (bool) – 忽略任務特定的依賴項,例如 depends_on_past 和 trigger rule

  • ignore_ti_state (bool) – 忽略任務例項之前的失敗/成功狀態

  • local (bool) – 是否在本地執行任務

  • file_path (pathlib.PurePath | str | None) – 包含 DAG 定義的檔案的路徑

  • raw (bool) – raw 模式(需要更多詳細資訊)

  • pool (str | None) – 任務應該執行所在的 Airflow pool

  • cfg_path (str | None) – 配置檔案的路徑

返回:

可用於執行任務例項的 shell 命令

返回型別:

list[str]

property log_url: str[source]

任務例項的日誌 URL。

property mark_success_url: str[source]

用於將任務例項標記為成功的 URL。

error(session=NEW_SESSION)[source]

在資料庫中將任務例項的狀態強制設定為 FAILED。

引數::

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]
refresh_from_db(session=NEW_SESSION, lock_for_update=False, keep_local_changes=False)[source]

根據主鍵從資料庫重新整理任務例項。

引數::
  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • lock_for_update (bool) – 如果為 True,表示資料庫應該鎖定任務例項(發出 FOR UPDATE 子句),直到 session 提交。

  • keep_local_changes (bool) – 如果為 False(預設值),則強制所有屬性使用資料庫中的值,如果為 True,則不覆蓋本地設定的屬性

refresh_from_task(task, pool_override=None)[source]

從給定的任務中複製通用屬性。

引數::
  • task (airflow.sdk.definitions._internal.abstractoperator.Operator) – 要從中複製的任務物件

  • pool_override (str | None) – 使用 pool_override 而不是任務的 pool

clear_xcom_data(session=NEW_SESSION)[source]
property key: airflow.models.taskinstancekey.TaskInstanceKey[source]

返回唯一標識任務例項的 tuple。

set_state(state, session=NEW_SESSION)[source]

設定任務例項狀態。

引數::
返回:

狀態是否已更改

返回型別:

bool

property is_premature: bool[source]

返回任務是否處於 UP_FOR_RETRY 狀態且其重試間隔已過去。

prepare_db_for_next_try(session)[source]

使用將此任務例項排隊等待下一次重試所需的所有記錄更新元資料。

are_dependents_done(session=NEW_SESSION)[source]

檢查此任務例項的直接下游任務是否已成功或已被跳過。

這旨在供 wait_for_downstream 使用。

當您不想在下游任務完成之前開始處理任務的下一個排程時,這很有用。例如,如果任務 DROPs 並重新建立表。

引數::

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

get_previous_dagrun(state=None, session=None)[source]

返回在此任務例項的 DagRun 之前執行的 DagRun。

引數::
get_previous_ti(state=None, session=NEW_SESSION)[source]

返回在此任務例項之前執行的任務的任務例項。

引數::
are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]

給定依賴關係的上下文,是否滿足此任務例項執行的所有條件。

(例如,從 UI 強制執行的任務例項將忽略某些依賴項)。

引數::
  • dep_context (airflow.ti_deps.dep_context.DepContext | None) – 確定應評估的依賴項的執行上下文。

  • session (sqlalchemy.orm.session.Session) – 資料庫 session

  • verbose (bool) – 是否在 info 或 debug 日誌級別記錄失敗依賴項的詳細資訊

get_failed_dep_statuses(dep_context=None, session=NEW_SESSION)[source]

獲取失敗的依賴項。

__repr__()[source]
next_retry_datetime()[source]

如果任務例項失敗,獲取下次重試的日期時間。

對於指數退避,retry_delay 用作基數,並將轉換為秒。

ready_for_retry()[source]

檢查任務例項是否處於正確的狀態和時間範圍以進行重試。

get_dagrun(session=NEW_SESSION)[source]

返回此任務例項的 DagRun。

引數::

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

返回:

DagRun

返回型別:

airflow.models.dagrun.DagRun

check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, external_executor_id=None, session=NEW_SESSION)[source]
emit_state_change_metric(new_state)[source]

傳送一個時間指標,表示給定狀態轉換花費了多少時間。

根據任務所處的狀態推斷出前一個狀態和指標名稱。

引數::

new_state (airflow.utils.state.TaskInstanceState) – 剛為此任務設定的狀態。我們不使用 self.state,因為有時狀態會直接在資料庫中更新,而不是在本地任務例項物件中更新。支援的狀態:QUEUED 和 RUNNING

clear_next_method_args()[source]

確保我們取消設定 next_method 和 next_kwargs,以確保任何重試都不會重複使用它們。

static register_asset_changes_in_db(ti, task_outlets, outlet_events, session=NEW_SESSION)[source]
update_rtif(rendered_fields, session=NEW_SESSION)[source]
update_heartbeat()[source]
defer_task(exception, session=NEW_SESSION)[source]

將任務標記為延遲狀態,並設定在 TaskDeferred 被引發時恢復它所需的 trigger。

run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]

執行 TaskInstance。

dry_run()[source]

僅為 TaskInstance 渲染模板。

classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_fast=False)[source]

獲取處理失敗所需的上下文。

引數::
  • ti (TaskInstance) – TaskInstance

  • error (None | str | BaseException) – 如果指定,記錄丟擲的特定異常

  • test_mode (bool | None) – 如果為 True,則不記錄資料庫中的成功或失敗

  • context (airflow.utils.context.Context | None) – Jinja2 上下文

  • force_fail (bool) – 如果為 True,任務不會重試

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • fail_fast (bool) – 如果為 True,所有下游任務都將失敗

static save_to_db(ti, session=NEW_SESSION)[source]
handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]

處理任務例項的失敗。

引數::
  • error (None | str | BaseException) – 如果指定,記錄丟擲的特定異常

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • test_mode (bool | None) – 如果為 True,則不記錄資料庫中的成功或失敗

  • context (airflow.utils.context.Context | None) – Jinja2 上下文

  • force_fail (bool) – 如果為 True,任務不會重試

is_eligible_to_retry()[source]

任務例項是否符合重試條件。

get_template_context(session=None, ignore_param_exceptions=True)[source]

返回 Task Instance 上下文。

引數::
get_rendered_template_fields(session=NEW_SESSION)[source]

使用渲染後的模板欄位更新任務,以便在 UI 中顯示。

如果任務已執行,將從資料庫中獲取;否則將進行渲染。

overwrite_params_with_dag_run_conf(params, dag_run)[source]

使用 DagRun.conf 覆蓋任務引數 (Task Params)。

render_templates(context=None, jinja_env=None)[source]

渲染 Operator 欄位中的模板。

如果任務最初是對映的,這可能會將 self.task 替換為未對映的、完全渲染的 BaseOperator。返回替換前的原始 self.task

get_email_subject_content(exception, task=None)[source]

獲取異常電子郵件的主題內容。

引數::
  • exception (BaseException) – 電子郵件中傳送的異常

  • task (airflow.models.baseoperator.BaseOperator | None)

email_alert(exception, task)[source]

傳送包含異常資訊的警報郵件。

引數::
  • exception – 異常

  • task (airflow.models.baseoperator.BaseOperator) – 與異常相關的任務

set_duration()[source]

設定任務例項時長。

xcom_push(key, value, session=NEW_SESSION)[source]

使 XCom 可供任務拉取。

引數::
  • key (str) – 儲存值的鍵。

  • value (Any) – 要儲存的值。否則只能使用 JSON 可序列化的值。

get_num_running_task_instances(session, same_dagrun=False)[source]

從資料庫返回正在執行的任務例項數量。

static filter_for_tis(tis)[source]

返回 SQLAlchemy 過濾器,用於查詢選定的任務例項。

get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]

推斷與此任務例項“相關”的上游的 map indexes。

大部分邏輯主要用於解決以下示例描述的問題,其中 ‘val’ 必須根據引用的使用位置解析為不同的值

@task
def this_task(v):  # This is self.task.
    return v * 2


@task_group
def tg1(inp):
    val = upstream(inp)  # This is the upstream task.
    this_task(val)  # When inp is 1, val here should resolve to 2.
    return val


# This val is the same object returned by tg1.
val = tg1.expand(inp=[1, 2, 3])


@task_group
def tg2(inp):
    another_task(inp, val)  # val here should resolve to [2, 4, 6].


tg2.expand(inp=["a", "b"])

檢查 upstream 和 self.task 周圍的 mapped task groups 以查詢共同的“祖先”。如果找到此類祖先,我們需要返回特定的 map indexes 以從上游 XCom 拉取部分值。

引數::
  • upstream (airflow.sdk.definitions._internal.abstractoperator.Operator) – 引用的上游任務。

  • ti_count (int | None) – 排程器擴充套件此任務的任務例項總數,即模板上下文中的 expanded_ti_count

返回:

要拉取的特定 map index 或 map indexes,如果需要“完整”返回值(即不涉及 mapped task groups),則為 None。

返回型別:

int | range | None

classmethod duration_expression_update(end_date, query, bind)[source]

返回一個 SQL 表示式,用於根據開始和結束日期列計算此任務例項的時長。

static validate_inlet_outlet_assets_activeness(inlets, outlets, session)[source]
get_first_reschedule_date(context)[source]

獲取任務例項的第一次重新排程日期。

airflow.models.taskinstance.TaskInstanceStateType[source]
class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, queued_dttm, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None, parent_context_carrier=None, context_carrier=None, span_status=None)[source]

簡化任務例項。

用於透過佇列在程序間傳送資料。

dag_id[source]
task_id[source]
run_id[source]
map_index[source]
queued_dttm[source]
start_date[source]
end_date[source]
try_number[source]
state[source]
executor[source]
executor_config[source]
run_as_user = None[source]
pool[source]
priority_weight = None[source]
queue[source]
key[source]
parent_context_carrier = None[source]
context_carrier = None[source]
span_status = None[source]
__repr__()[source]
__eq__(other)[source]
classmethod from_ti(ti)[source]
class airflow.models.taskinstance.TaskInstanceNote(content, user_id=None)[source]

Bases: airflow.models.base.Base

用於儲存與任務例項相關的任意備註。

__tablename__ = 'task_instance_note'[source]
ti_id[source]
user_id[source]
content[source]
created_at[source]
updated_at[source]
task_instance[source]
__table_args__[source]
__repr__()[source]

此條目有幫助嗎?