airflow.providers.standard.sensors.external_task

ExternalDagLink

用於 ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。

ExternalTaskSensor

等待其他 DAG、任務組或任務在特定邏輯日期完成。

ExternalTaskMarker

使用此 Operator 指示其他 DAG 上的任務依賴於此任務。

模組內容

基類: airflow.sdk.BaseOperatorLink

用於 ExternalTaskSensor 和 ExternalTaskMarker 的 Operator 連結。

它允許使用者訪問由 ExternalTaskSensor 等待或由 ExternalTaskMarker 清除的 DAG。

name = 'External DAG'[source]

連結名稱。這將是任務 UI 上的按鈕名稱。

外部系統連結。

引數:
返回:

外部系統連結

返回型別:

str

class airflow.providers.standard.sensors.external_task.ExternalTaskSensor(*, external_dag_id, external_task_id=None, external_task_ids=None, external_task_group_id=None, allowed_states=None, skipped_states=None, failed_states=None, execution_delta=None, execution_date_fn=None, check_existence=False, poll_interval=2.0, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基類: airflow.sdk.bases.sensor.BaseSensorOperator

等待其他 DAG、任務組或任務在特定邏輯日期完成。

如果 external_task_group_idexternal_task_id 都為 None(預設),則 Sensor 會等待 DAG。無法同時設定 external_task_group_idexternal_task_id 的值。

預設情況下,ExternalTaskSensor 將等待外部任務成功,此時它也會成功。但是,預設情況下,如果外部任務失敗,它**不會**失敗,而是會繼續檢查狀態直到 Sensor 超時(這樣您就有時間重試外部任務,而無需同時清除 Sensor)。

預設情況下,如果外部任務跳過,ExternalTaskSensor 不會跳過。要更改此行為,只需設定 skipped_states=[TaskInstanceState.SKIPPED]。請注意,如果您正在監控多個任務,其中一個進入錯誤狀態而另一個進入跳過狀態,則 ExternalTaskSensor 將對首先看到的狀態做出反應。如果同時發生,失敗狀態優先。

可以透過設定導致 Sensor 失敗的狀態來更改預設行為,例如,透過設定 allowed_states=[DagRunState.FAILED]failed_states=[DagRunState.SUCCESS],您將顛倒行為,得到一個在外部任務**失敗**時變為綠色並在外部任務**成功**時立即變為紅色的 Sensor!

請注意,檢查 failed_states 時會遵守 soft_fail。因此,如果外部任務進入失敗狀態並且 soft_fail == True,Sensor 將會**跳過**而不是失敗。因此,設定 soft_fail=Truefailed_states=[DagRunState.SKIPPED] 將導致 Sensor 在外部任務跳過時跳過。然而,這是一個牽強的例子——如果您想要這種行為,請考慮使用 skipped_states。使用 skipped_states 允許 Sensor 在目標失敗時跳過,但在超時時仍進入失敗狀態。如上所述使用 soft_fail == True 將導致 Sensor 在目標失敗時跳過,並且在超時時也跳過。

引數:
  • external_dag_id (str) – 包含您要等待的任務的 dag_id。(模板化)

  • external_task_id (str | None) – 包含您要等待的任務的 task_id。(模板化)

  • external_task_ids (collections.abc.Collection[str] | None) – 您要等待的任務 task_id 列表。(模板化)如果為 None(預設值),則 Sensor 等待 DAG。ExternalTaskSensor 可以傳遞 external_task_id 或 external_task_ids,但不能同時傳遞兩者。

  • external_task_group_id (str | None) – 包含您要等待的任務組的 task_group_id。(模板化)

  • allowed_states (collections.abc.Iterable[str] | None) – 允許的狀態的可迭代物件,預設是 ['success']

  • skipped_states (collections.abc.Iterable[str] | None) – 將此任務標記為跳過的狀態的可迭代物件,預設是 None

  • failed_states (collections.abc.Iterable[str] | None) – 失敗或不允許的狀態的可迭代物件,預設是 None

  • execution_delta (datetime.timedelta | None) – 與上次執行時間差,用於查詢,預設是當前任務或 DAG 的相同邏輯日期。對於昨天,使用 [正的!] datetime.timedelta(days=1)。ExternalTaskSensor 可以傳遞 execution_delta 或 execution_date_fn,但不能同時傳遞兩者。

  • execution_date_fn (Callable | None) – 一個函式,接收當前執行的邏輯日期作為第一個位置引數,並可選地接收 context 字典中可用的任何數量的關鍵字引數,然後返回要查詢的所需邏輯日期。ExternalTaskSensor 可以傳遞 execution_delta 或 execution_date_fn,但不能同時傳遞兩者。

  • check_existence (bool) – 設定為 True 以檢查外部任務是否存在(當 external_task_id 不為 None 時)或檢查要等待的 DAG 是否存在(當 external_task_id 為 None 時),並在外部任務或 DAG 不存在時立即停止等待(預設值:False)。

  • poll_interval (float) – 輪詢間隔(秒),用於檢查狀態

  • deferrable (bool) – 在可延遲模式下執行 Sensor

template_fields = ['external_dag_id', 'external_task_id', 'external_task_ids', 'external_task_group_id'][source]
ui_color = '#4db7db'[source]
allowed_states[source]
skipped_states = [][source]
failed_states = [][source]
execution_delta = None[source]
execution_date_fn = None[source]
external_dag_id[source]
external_task_id = None[source]
external_task_ids = None[source]
external_task_group_id = None[source]
check_existence = False[source]
deferrable = True[source]
poll_interval = 2.0[source]
poke(context)[source]

派生此類時重寫。

execute(context)[source]

在 worker 上執行,如果 deferrable 設定為 True,則使用 trigger 進行延遲。

execute_complete(context, event=None)[source]

當 trigger 觸發時執行 - 立即返回。

get_count(dttm_filter, session, states)[source]

根據 dttm 過濾器和狀態獲取記錄計數。

引數:
  • dttm_filter – 邏輯日期的日期時間過濾器

  • session – airflow 會話物件

  • states – 任務或 DAG 狀態

返回:

符合過濾條件的記錄計數

返回型別:

int

get_external_task_group_task_ids(session, dttm_filter)[source]
class airflow.providers.standard.sensors.external_task.ExternalTaskMarker(*, external_dag_id, external_task_id, logical_date='{{ logical_date.isoformat() }}', recursion_depth=10, **kwargs)[source]

基類: airflow.providers.standard.operators.empty.EmptyOperator

使用此 Operator 指示其他 DAG 上的任務依賴於此任務。

當此任務在選中“遞迴”的情況下被清除時,Airflow 將遞迴清除其他 DAG 上的該任務及其下游任務。傳遞依賴關係會一直追蹤到達到 recursion_depth。

引數:
  • external_dag_id (str) – 包含需要被清除的依賴任務的 dag_id。

  • external_task_id (str) – 需要被清除的依賴任務的 task_id。

  • logical_date (str | datetime.datetime | None) – 需要被清除的依賴任務執行的邏輯日期。

  • recursion_depth (int) – 允許的最大傳遞依賴級別。預設值為 10。這主要用於防止迴圈依賴。如有必要,可以增加此數字。但是,太多級別的傳遞依賴會使在 Web UI 中清除任務變慢。

template_fields = ['external_dag_id', 'external_task_id', 'logical_date'][source]
ui_color = '#4db7db'[source]
external_dag_id[source]
external_task_id[source]
recursion_depth = 10[source]
classmethod get_serialized_fields()[source]

序列化 ExternalTaskMarker 以精確包含這些欄位 + 模板化欄位。

此條目是否有幫助?