任務¶
任務 (Task) 是 Airflow 中的基本執行單元。任務被組織到 DAGs 中,並透過設定上游和下游依賴關係來表達它們的執行順序。
任務有三種基本型別
運算子 (Operators),是預定義的任務模板,您可以快速組合它們來構建 DAG 的大部分。
感測器 (Sensors),是 Operator 的特殊子類,完全用於等待外部事件發生。
TaskFlow 修飾的
@task,是打包成任務的自定義 Python 函式。
內部來說,這些實際上都是 Airflow BaseOperator 的子類,任務(Task)和運算子(Operator)的概念在某種程度上可以互換,但將它們視為獨立概念很有用——本質上,運算子和感測器是*模板*,當您在 DAG 檔案中呼叫它們時,您正在建立一個任務。
關係¶
使用任務的關鍵部分在於定義它們之間的關係——它們的*依賴關係*,或者正如我們在 Airflow 中所說的,它們的*上游*任務和*下游*任務。您首先宣告任務,然後宣告它們的依賴關係。
注意
我們將*上游*任務稱為直接位於另一個任務之前的任務。我們以前稱之為父任務。請注意,這個概念並不描述任務層級結構中更高的任務(即它們不是任務的直接父級)。*下游*任務也適用同樣的定義,它需要是另一個任務的直接子級。
有兩種宣告依賴關係的方法——使用 >> 和 <<(位移)運算子
first_task >> second_task >> [third_task, fourth_task]
或者更顯式的 set_upstream 和 set_downstream 方法
first_task.set_downstream(second_task)
third_task.set_upstream(second_task)
這兩種方法的功能完全相同,但總的來說,我們建議您使用位移運算子,因為在大多數情況下它們更易讀。
預設情況下,當一個任務的所有上游(父)任務都成功時,它才會執行,但有很多方法可以修改此行為,例如新增分支、只等待部分上游任務,或根據當前執行在歷史中的位置改變行為。更多資訊請參見 控制流。
任務預設情況下不相互傳遞資訊,並完全獨立執行。如果您想從一個任務向另一個任務傳遞資訊,您應該使用 XComs。
任務例項¶
就像 DAG 每次執行時會被例項化為 DAG Run 一樣,一個 DAG 下的任務會被例項化為*任務例項*。
任務例項是該任務在給定 DAG(以及給定資料間隔)下的特定執行。它們也是具有*狀態*的任務的表現,表示它所處的生命週期階段。
任務例項可能的狀態有
none: 任務尚未排隊執行(其依賴項尚未滿足)scheduled: 排程器已確定任務的依賴項已滿足並應執行queued: 任務已被分配給執行器,正在等待工作程序running: 任務正在工作程序上執行(或在本地/同步執行器上)success: 任務執行完成無錯誤restarting: 任務在執行時被外部請求重新啟動failed: 任務在執行期間發生錯誤並執行失敗skipped: 任務因分支、LatestOnly 或類似原因被跳過。upstream_failed: 一個上游任務失敗,且 觸發規則 (Trigger Rule) 要求它必須成功up_for_retry: 任務失敗,但還有剩餘重試次數,將被重新排程。up_for_reschedule: 任務是一個 感測器(Sensor),處於reschedule模式deferred: 任務已被延遲到觸發器 (trigger)removed: 自執行開始以來,任務已從 DAG 中消失
理想情況下,任務應從 none 依次流經 scheduled、queued、running,最終達到 success 狀態。
當任何自定義任務(Operator)執行時,它將獲得傳遞給它的任務例項副本;除了能夠檢查任務元資料外,它還包含諸如 XComs 之類的方法。
關係術語¶
對於任何給定的任務例項,它與其他例項有兩種型別的關係。
首先,它可以有*上游*和*下游*任務
task1 >> task2 >> task3
當 DAG 執行時,它會為相互之間存在上游/下游關係的這些任務建立例項,但所有這些例項都具有相同的資料間隔。
可能還有*同一任務*的例項,但針對不同的資料間隔——來自同一 DAG 的其他執行。我們將這些稱為*前一個*和*後一個*——這與*上游*和*下游*的關係不同!
注意
一些較舊的 Airflow 文件可能仍將“previous”用來表示“upstream”。如果您發現這種情況,請幫助我們修正!
超時¶
如果您希望任務有最大執行時間,請將其 execution_timeout 屬性設定為一個 datetime.timedelta 值,作為最大允許執行時間。這適用於所有 Airflow 任務,包括感測器。execution_timeout 控制每次執行允許的最大時間。如果 execution_timeout 被超出,任務將超時並丟擲 AirflowTaskTimeout 異常。
此外,感測器有一個 timeout 引數。這僅對處於 reschedule 模式的感測器重要。timeout 控制感測器成功允許的最大時間。如果 timeout 被超出,AirflowSensorTimeout 將被丟擲,感測器將立即失敗且不重試。
以下 SFTPSensor 示例對此進行了說明。該 sensor 處於 reschedule 模式,這意味著它會週期性地執行並重新排程,直到成功。
每次感測器探測 SFTP 伺服器時,允許的最大時間為
execution_timeout定義的 60 秒。如果感測器探測 SFTP 伺服器耗時超過 60 秒,
AirflowTaskTimeout將被丟擲。發生這種情況時,感測器可以重試。最多可以重試retries定義的 2 次。從第一次執行開始,直到最終成功(即檔案 ‘root/test’ 出現後),感測器允許的最大時間為
timeout定義的 3600 秒。換句話說,如果檔案在 3600 秒內沒有出現在 SFTP 伺服器上,感測器將丟擲AirflowSensorTimeout。丟擲此錯誤時不會重試。如果在 3600 秒間隔內,感測器由於網路中斷等其他原因失敗,最多可以重試
retries定義的 2 次。重試不會重置timeout。它總共有最多 3600 秒的時間來成功。
sensor = SFTPSensor(
task_id="sensor",
path="/root/test",
execution_timeout=timedelta(seconds=60),
timeout=3600,
retries=2,
mode="reschedule",
)
SLA¶
Airflow 2 中的 SLA 功能在 3.0 中已被移除,並將在 Airflow 3.1 中被新的實現替代。
特殊異常¶
如果您想從自定義任務/Operator 程式碼中控制任務的狀態,Airflow 提供了兩個您可以丟擲的特殊異常
AirflowSkipException將當前任務標記為已跳過AirflowFailException將當前任務標記為失敗,*忽略任何剩餘的重試嘗試*
如果您的程式碼對執行環境有額外瞭解並希望更快失敗/跳過,這些異常會很有用——例如,當知道沒有可用資料時跳過,或者當檢測到 API 金鑰無效時快速失敗(因為重試無法解決此問題)。
任務例項心跳超時¶
沒有系統能完美執行,任務例項偶爾會發生故障。
任務例項(TaskInstances)即使其關聯的作業處於非活動狀態,也可能卡在 running 狀態(例如,如果任務例項的工作程序記憶體不足)。這類任務以前被稱為殭屍任務。Airflow 會定期查詢這些任務,清理它們,並將任務例項標記為失敗,如果還有可用重試次數則重試。任務例項的心跳可能因多種原因超時,包括
Airflow 工作程序記憶體不足並被 OOMKilled。
Airflow 工作程序的活躍度探測失敗,因此係統(例如 Kubernetes)重啟了工作程序。
系統(例如 Kubernetes)縮減規模並將 Airflow 工作程序從一個節點遷移到另一個節點。
在本地復現任務例項心跳超時¶
如果您希望在開發/測試過程中復現任務例項心跳超時,請遵循以下步驟
為您的本地 Airflow 設定以下環境變數(或者您可以修改 airflow.cfg 中的相應配置值)
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_SEC=600
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT=2
export AIRFLOW__SCHEDULER__TASK_INSTANCE_HEARTBEAT_TIMEOUT_DETECTION_INTERVAL=5
建立一個 DAG,其中包含一個需要大約 10 分鐘才能完成的任務(即一個長時間執行的任務)。例如,您可以使用以下 DAG
from airflow.sdk import dag
from airflow.providers.standard.operators.bash import BashOperator
from datetime import datetime
@dag(start_date=datetime(2021, 1, 1), schedule="@once", catchup=False)
def sleep_dag():
t1 = BashOperator(
task_id="sleep_10_minutes",
bash_command="sleep 600",
)
sleep_dag()
執行上面的 DAG 並等待一段時間。任務例項(TaskInstance)將在 <task_instance_heartbeat_timeout> 秒後被標記為失敗。
執行器配置¶
一些 執行器(Executors) 允許可選的按任務配置——例如 KubernetesExecutor,它允許您設定執行任務的映象。
這是透過向任務或 Operator 傳遞 executor_config 引數實現的。以下是為一個將在 KubernetesExecutor 上執行的任務設定 Docker 映象的示例
MyOperator(...,
executor_config={
"KubernetesExecutor":
{"image": "myCustomDockerImage"}
}
)
您可以傳遞給 executor_config 的設定因執行器而異,因此請閱讀各個執行器文件以瞭解您可以設定的內容。