airflow.models.taskinstance¶
屬性¶
類¶
任務例項儲存任務例項的狀態。 |
|
簡化任務例項。 |
|
用於儲存與任務例項相關的任意備註。 |
函式¶
|
將當前執行上下文設定為提供的上下文物件。 |
|
清除一組任務例項,但確保正在執行的任務被殺死。 |
|
生成新的 UUID7 字串。 |
模組內容¶
- airflow.models.taskinstance.set_current_context(context)[source]¶
將當前執行上下文設定為提供的上下文物件。
此方法應在每次任務執行之前呼叫一次 operator.execute。
- airflow.models.taskinstance.clear_task_instances(tis, session, dag=None, dag_run_state=DagRunState.QUEUED)[source]¶
清除一組任務例項,但確保正在執行的任務被殺死。
同時將 Dagrun 的 state 設定為 QUEUED,start_date 設定為執行時間。但僅針對已完成的 DR(SUCCESS 和 FAILED)。對於正在執行的 DR(QUEUED 和 RUNNING),不會清除 DR 的 state 和 start_date`,因為清除已在執行的 DR 的狀態是多餘的,並且清除 `start_date` 會影響 DR 的持續時間。
- 引數::
tis (list[TaskInstance]) – 任務例項列表
session (sqlalchemy.orm.session.Session) – 當前會話
dag_run_state (airflow.utils.state.DagRunState | airflow.typing_compat.Literal[False]) – 要設定已完成 DagRuns 的狀態。如果設定為 False,則不會更改 DagRuns 的狀態。
dag (airflow.sdk.definitions.dag.DAG | None) – DAG 物件
- class airflow.models.taskinstance.TaskInstance(task, run_id=None, state=None, map_index=-1, dag_version_id=None)[source]¶
基類:
airflow.models.base.Base,airflow.utils.log.logging_mixin.LoggingMixin任務例項儲存任務例項的狀態。
此表是關於哪些任務已執行及其所處狀態的權威和單一事實來源。
SqlAlchemy 模型故意沒有指向 task 或 dag 模型的 SqlAlchemy 外部索引鍵,以便更好地控制事務。
對此表的資料庫事務應確保不會發生重複觸發,並避免在多個排程程式可能正在觸發任務例項時對哪些任務例項已準備好執行產生混淆。
map_index 中的值 -1 表示以下任何一種情況:沒有對映任務的任務例項;帶有對映任務但尚未展開(state=pending)的任務例項;帶有對映任務但展開為空列表(state=skipped)的任務例項。
- dag_model: airflow.models.dag.DagModel[source]¶
- command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, raw=False, pool=None, cfg_path=None)[source]¶
返回一個可以在任何安裝了 airflow 的地方執行的命令。
此命令是排程器傳送給執行器的訊息的一部分。
- static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, file_path=None, raw=False, pool=None, cfg_path=None, map_index=-1)[source]¶
生成執行此任務例項所需的 shell 命令。
- 引數::
dag_id (str) – DAG ID
task_id (str) – Task ID
run_id (str) – 此任務的 DagRun 的 run_id
mark_success (bool) – 是否將任務標記為成功
ignore_all_deps (bool) – 忽略所有可忽略的依賴項。覆蓋其他 ignore_* 引數。
ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 引數(例如,用於 Backfills)
wait_for_past_depends_before_skipping (bool) – 在將任務例項標記為跳過之前等待過去的依賴項
ignore_task_deps (bool) – 忽略任務特定的依賴項,例如 depends_on_past 和 trigger rule
ignore_ti_state (bool) – 忽略任務例項之前的失敗/成功狀態
local (bool) – 是否在本地執行任務
file_path (pathlib.PurePath | str | None) – 包含 DAG 定義的檔案的路徑
raw (bool) – raw 模式(需要更多詳細資訊)
pool (str | None) – 任務應該執行所在的 Airflow pool
cfg_path (str | None) – 配置檔案的路徑
- 返回:
可用於執行任務例項的 shell 命令
- 返回型別:
- error(session=NEW_SESSION)[source]¶
在資料庫中將任務例項的狀態強制設定為 FAILED。
- 引數::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]¶
- refresh_from_db(session=NEW_SESSION, lock_for_update=False, keep_local_changes=False)[source]¶
根據主鍵從資料庫重新整理任務例項。
- 引數::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
lock_for_update (bool) – 如果為 True,表示資料庫應該鎖定任務例項(發出 FOR UPDATE 子句),直到 session 提交。
keep_local_changes (bool) – 如果為 False(預設值),則強制所有屬性使用資料庫中的值,如果為 True,則不覆蓋本地設定的屬性
- refresh_from_task(task, pool_override=None)[source]¶
從給定的任務中複製通用屬性。
- 引數::
task (airflow.sdk.definitions._internal.abstractoperator.Operator) – 要從中複製的任務物件
pool_override (str | None) – 使用 pool_override 而不是任務的 pool
- property key: airflow.models.taskinstancekey.TaskInstanceKey[source]¶
返回唯一標識任務例項的 tuple。
- set_state(state, session=NEW_SESSION)[source]¶
設定任務例項狀態。
- 引數::
state (str | None) – 要為任務例項設定的狀態
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- 返回:
狀態是否已更改
- 返回型別:
- are_dependents_done(session=NEW_SESSION)[source]¶
檢查此任務例項的直接下游任務是否已成功或已被跳過。
這旨在供 wait_for_downstream 使用。
當您不想在下游任務完成之前開始處理任務的下一個排程時,這很有用。例如,如果任務 DROPs 並重新建立表。
- 引數::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- get_previous_dagrun(state=None, session=None)[source]¶
返回在此任務例項的 DagRun 之前執行的 DagRun。
- 引數::
state (airflow.utils.state.DagRunState | None) – 如果傳入,則僅考慮特定狀態的例項。
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM Session。
- get_previous_ti(state=None, session=NEW_SESSION)[source]¶
返回在此任務例項之前執行的任務的任務例項。
- 引數::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
state (airflow.utils.state.DagRunState | None) – 如果傳入,則僅考慮特定狀態的例項。
- are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]¶
給定依賴關係的上下文,是否滿足此任務例項執行的所有條件。
(例如,從 UI 強制執行的任務例項將忽略某些依賴項)。
- 引數::
dep_context (airflow.ti_deps.dep_context.DepContext | None) – 確定應評估的依賴項的執行上下文。
session (sqlalchemy.orm.session.Session) – 資料庫 session
verbose (bool) – 是否在 info 或 debug 日誌級別記錄失敗依賴項的詳細資訊
- get_dagrun(session=NEW_SESSION)[source]¶
返回此任務例項的 DagRun。
- 引數::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- 返回:
DagRun
- 返回型別:
- check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, external_executor_id=None, session=NEW_SESSION)[source]¶
- emit_state_change_metric(new_state)[source]¶
傳送一個時間指標,表示給定狀態轉換花費了多少時間。
根據任務所處的狀態推斷出前一個狀態和指標名稱。
- 引數::
new_state (airflow.utils.state.TaskInstanceState) – 剛為此任務設定的狀態。我們不使用 self.state,因為有時狀態會直接在資料庫中更新,而不是在本地任務例項物件中更新。支援的狀態:QUEUED 和 RUNNING
- defer_task(exception, session=NEW_SESSION)[source]¶
將任務標記為延遲狀態,並設定在 TaskDeferred 被引發時恢復它所需的 trigger。
- run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]¶
執行 TaskInstance。
- classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_fast=False)[source]¶
獲取處理失敗所需的上下文。
- 引數::
ti (TaskInstance) – TaskInstance
error (None | str | BaseException) – 如果指定,記錄丟擲的特定異常
test_mode (bool | None) – 如果為 True,則不記錄資料庫中的成功或失敗
context (airflow.utils.context.Context | None) – Jinja2 上下文
force_fail (bool) – 如果為 True,任務不會重試
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
fail_fast (bool) – 如果為 True,所有下游任務都將失敗
- handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]¶
處理任務例項的失敗。
- 引數::
error (None | str | BaseException) – 如果指定,記錄丟擲的特定異常
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
test_mode (bool | None) – 如果為 True,則不記錄資料庫中的成功或失敗
context (airflow.utils.context.Context | None) – Jinja2 上下文
force_fail (bool) – 如果為 True,任務不會重試
- get_template_context(session=None, ignore_param_exceptions=True)[source]¶
返回 Task Instance 上下文。
- 引數::
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM 會話
ignore_param_exceptions (bool) – 初始化 ParamsDict 時抑制值異常的標誌
- get_rendered_template_fields(session=NEW_SESSION)[source]¶
使用渲染後的模板欄位更新任務,以便在 UI 中顯示。
如果任務已執行,將從資料庫中獲取;否則將進行渲染。
- render_templates(context=None, jinja_env=None)[source]¶
渲染 Operator 欄位中的模板。
如果任務最初是對映的,這可能會將
self.task替換為未對映的、完全渲染的 BaseOperator。返回替換前的原始self.task。
- get_email_subject_content(exception, task=None)[source]¶
獲取異常電子郵件的主題內容。
- 引數::
exception (BaseException) – 電子郵件中傳送的異常
task (airflow.models.baseoperator.BaseOperator | None)
- email_alert(exception, task)[source]¶
傳送包含異常資訊的警報郵件。
- 引數::
exception – 異常
task (airflow.models.baseoperator.BaseOperator) – 與異常相關的任務
- xcom_push(key, value, session=NEW_SESSION)[source]¶
使 XCom 可供任務拉取。
- 引數::
key (str) – 儲存值的鍵。
value (Any) – 要儲存的值。否則只能使用 JSON 可序列化的值。
- get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]¶
推斷與此任務例項“相關”的上游的 map indexes。
大部分邏輯主要用於解決以下示例描述的問題,其中 ‘val’ 必須根據引用的使用位置解析為不同的值
@task def this_task(v): # This is self.task. return v * 2 @task_group def tg1(inp): val = upstream(inp) # This is the upstream task. this_task(val) # When inp is 1, val here should resolve to 2. return val # This val is the same object returned by tg1. val = tg1.expand(inp=[1, 2, 3]) @task_group def tg2(inp): another_task(inp, val) # val here should resolve to [2, 4, 6]. tg2.expand(inp=["a", "b"])
檢查 upstream 和
self.task周圍的 mapped task groups 以查詢共同的“祖先”。如果找到此類祖先,我們需要返回特定的 map indexes 以從上游 XCom 拉取部分值。
- class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, queued_dttm, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None, parent_context_carrier=None, context_carrier=None, span_status=None)[source]¶
簡化任務例項。
用於透過佇列在程序間傳送資料。