airflow.providers.standard.sensors.external_task¶
類¶
用於 ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。 |
|
等待其他 DAG、任務組或任務在特定邏輯日期完成。 |
|
使用此 Operator 指示其他 DAG 上的任務依賴於此任務。 |
模組內容¶
- class airflow.providers.standard.sensors.external_task.ExternalDagLink[source]¶
基類:
airflow.sdk.BaseOperatorLink用於 ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。
它允許使用者訪問由 ExternalTaskSensor 等待或由 ExternalTaskMarker 清除的 DAG。
- get_link(operator, *, ti_key)[source]¶
外部系統連結。
- 引數:
operator (airflow.sdk.BaseOperator) – 與此連結關聯的 Airflow Operator 物件。
ti_key (airflow.models.taskinstancekey.TaskInstanceKey) – 要返回連結的任務例項 ID。
- 返回:
外部系統連結
- 返回型別:
- class airflow.providers.standard.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基類:
airflow.sdk.bases.sensor.BaseSensorOperator等待其他 DAG、任務組或任務在特定邏輯日期完成。
如果 external_task_group_id 和 external_task_id 都為
None(預設),則 Sensor 會等待 DAG。無法同時設定 external_task_group_id 和 external_task_id 的值。預設情況下,ExternalTaskSensor 將等待外部任務成功,此時它也會成功。但是,預設情況下,如果外部任務失敗,它**不會**失敗,而是會繼續檢查狀態直到 Sensor 超時(這樣您就有時間重試外部任務,而無需同時清除 Sensor)。
預設情況下,如果外部任務跳過,ExternalTaskSensor 不會跳過。要更改此行為,只需設定
skipped_states=[TaskInstanceState.SKIPPED]。請注意,如果您正在監控多個任務,其中一個進入錯誤狀態而另一個進入跳過狀態,則 ExternalTaskSensor 將對首先看到的狀態做出反應。如果同時發生,失敗狀態優先。可以透過設定導致 Sensor 失敗的狀態來更改預設行為,例如,透過設定
allowed_states=[DagRunState.FAILED]和failed_states=[DagRunState.SUCCESS],您將顛倒行為,得到一個在外部任務**失敗**時變為綠色並在外部任務**成功**時立即變為紅色的 Sensor!請注意,檢查 failed_states 時會遵守
soft_fail。因此,如果外部任務進入失敗狀態並且soft_fail == True,Sensor 將會**跳過**而不是失敗。因此,設定soft_fail=True和failed_states=[DagRunState.SKIPPED]將導致 Sensor 在外部任務跳過時跳過。然而,這是一個牽強的例子——如果您想要這種行為,請考慮使用skipped_states。使用skipped_states允許 Sensor 在目標失敗時跳過,但在超時時仍進入失敗狀態。如上所述使用soft_fail == True將導致 Sensor 在目標失敗時跳過,並且在超時時也跳過。- 引數:
external_dag_id (str) – 包含您要等待的任務的 dag_id。(模板化)
external_task_id (str | None) – 包含您要等待的任務的 task_id。(模板化)
external_task_ids (collections.abc.Collection[str] | None) – 您要等待的任務 task_id 列表。(模板化)如果為
None(預設值),則 Sensor 等待 DAG。ExternalTaskSensor 可以傳遞 external_task_id 或 external_task_ids,但不能同時傳遞兩者。external_task_group_id (str | None) – 包含您要等待的任務組的 task_group_id。(模板化)
allowed_states (collections.abc.Iterable[str] | None) – 允許的狀態的可迭代物件,預設是
['success']skipped_states (collections.abc.Iterable[str] | None) – 將此任務標記為跳過的狀態的可迭代物件,預設是
Nonefailed_states (collections.abc.Iterable[str] | None) – 失敗或不允許的狀態的可迭代物件,預設是
Noneexecution_delta (datetime.timedelta | None) – 與上次執行時間差,用於查詢,預設是當前任務或 DAG 的相同邏輯日期。對於昨天,使用 [正的!] datetime.timedelta(days=1)。ExternalTaskSensor 可以傳遞 execution_delta 或 execution_date_fn,但不能同時傳遞兩者。
execution_date_fn (Callable | None) – 一個函式,接收當前執行的邏輯日期作為第一個位置引數,並可選地接收 context 字典中可用的任何數量的關鍵字引數,然後返回要查詢的所需邏輯日期。ExternalTaskSensor 可以傳遞 execution_delta 或 execution_date_fn,但不能同時傳遞兩者。
check_existence (bool) – 設定為 True 以檢查外部任務是否存在(當 external_task_id 不為 None 時)或檢查要等待的 DAG 是否存在(當 external_task_id 為 None 時),並在外部任務或 DAG 不存在時立即停止等待(預設值:False)。
poll_interval (float) – 輪詢間隔(秒),用於檢查狀態
deferrable (bool) – 在可延遲模式下執行 Sensor
- template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][source]¶
- class airflow.providers.standard.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, logical_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[source]¶
基類:
airflow.providers.standard.operators.empty.EmptyOperator使用此 Operator 指示其他 DAG 上的任務依賴於此任務。
當此任務在選中“遞迴”的情況下被清除時,Airflow 將遞迴清除其他 DAG 上的該任務及其下游任務。傳遞依賴關係會一直追蹤到達到 recursion_depth。
- 引數:
external_dag_id (str) – 包含需要被清除的依賴任務的 dag_id。
external_task_id (str) – 需要被清除的依賴任務的 task_id。
logical_date (str | datetime.datetime | None) – 需要被清除的依賴任務執行的邏輯日期。
recursion_depth (int) – 允許的最大傳遞依賴級別。預設值為 10。這主要用於防止迴圈依賴。如有必要,可以增加此數字。但是,太多級別的傳遞依賴會使在 Web UI 中清除任務變慢。