XComs¶
XComs(“cross-communications” 的縮寫,意為跨任務通訊)是一種機制,允許任務之間相互通訊,因為預設情況下任務是完全隔離的,並且可能在完全不同的機器上執行。
XCom 透過一個 key(本質上是它的名稱)以及它來自的 task_id 和 dag_id 進行標識。它們可以擁有任何可序列化的值(包括使用 @dataclass 或 @attr.define 裝飾的物件,參閱TaskFlow 引數),但它們僅設計用於傳遞少量資料;不要使用它們來傳遞大量值,例如資料框。
XCom 透過 Task 例項上的 xcom_push 和 xcom_pull 方法被明確地“推送”(push)和“拉取”(pull)到/從其儲存中。
在名為 “task-1” 的任務中推送一個將被另一個任務使用的值
# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)
在不同的任務中拉取上述程式碼中推送的值
# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
如果 do_xcom_push 引數設定為 True(預設即如此),許多運算子會自動將其結果推送到名為 return_value 的 XCom 鍵中,@task 函式也是如此。xcom_pull 在沒有傳入鍵時,預設使用 return_value 作為鍵,這意味著可以像這樣編寫程式碼
# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')
您也可以在模板中使用 XComs
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XComs 與變數相關,主要區別在於 XComs 是每個 Task 例項的,設計用於在 DAG 執行內部進行通訊,而變數是全域性的,設計用於整體配置和值共享。
如果您想一次推送多個 XComs,可以將 do_xcom_push 和 multiple_outputs 引數設定為 True,然後返回一個包含多個值的字典。
推送多個 XComs 並單獨拉取的示例
# A task returning a dictionary
@task(do_xcom_push=True, multiple_outputs=True)
def push_multiple(**context):
return {"key1": "value1", "key2": "value2"}
@task
def xcom_pull_with_multiple_outputs(**context):
# Pulling a specific key from the multiple outputs
key1 = context["ti"].xcom_pull(task_ids="push_multiple", key="key1") # to pull key1
key2 = context["ti"].xcom_pull(task_ids="push_multiple", key="key2") # to pull key2
# Pulling entire xcom data from push_multiple task
data = context["ti"].xcom_pull(task_ids="push_multiple", key="return_value")
注意
如果第一次任務執行未成功,那麼在每次重試任務時,XComs 將被清除,以使任務執行具有冪等性。
物件儲存 XCom 後端¶
預設的 XCom 後端 BaseXCom 將 XCom 儲存在 Airflow 資料庫中,這對於小值效果很好,但處理大值或大量 XCom 時可能導致問題。為克服此限制,推薦使用物件儲存來高效處理更大規模資料。詳細概述請參考文件。
自定義 XCom 後端¶
XCom 系統具有可互換的後端,您可以透過 xcom_backend 配置選項來設定使用哪個後端。
如果您想實現自己的後端,應該繼承 BaseXCom 類,並重寫 serialize_value 和 deserialize_value 方法。
您可以重寫 BaseXCom 類中的 purge 方法,以便控制從自定義後端清除 XCom 資料。此方法會在 delete 過程中被呼叫。
在容器中驗證自定義 XCom 後端的使用¶
根據 Airflow 的部署環境,例如本地、Docker、K8s 等,確認自定義 XCom 後端是否實際初始化非常有用。例如,容器環境的複雜性使得在容器部署期間更難確定後端是否正確載入。幸運的是,可以使用以下指導來幫助您增強對自定義 XCom 實現的信心。
如果您可以 exec 進入 Airflow 容器的終端,您就可以打印出正在使用的實際 XCom 類
from airflow.models.xcom import XCom
print(XCom.__name__)