Airflow Summit 2025 將於 10 月 7 日至 9 日舉行。立即註冊享早鳥票!

XComs

XComs(“cross-communications” 的縮寫,意為跨任務通訊)是一種機制,允許任務之間相互通訊,因為預設情況下任務是完全隔離的,並且可能在完全不同的機器上執行。

XCom 透過一個 key(本質上是它的名稱)以及它來自的 task_iddag_id 進行標識。它們可以擁有任何可序列化的值(包括使用 @dataclass@attr.define 裝飾的物件,參閱TaskFlow 引數),但它們僅設計用於傳遞少量資料;不要使用它們來傳遞大量值,例如資料框。

XCom 透過 Task 例項上的 xcom_pushxcom_pull 方法被明確地“推送”(push)和“拉取”(pull)到/從其儲存中。

在名為 “task-1” 的任務中推送一個將被另一個任務使用的值

# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as a string", value=any_serializable_value)

在不同的任務中拉取上述程式碼中推送的值

# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")

如果 do_xcom_push 引數設定為 True(預設即如此),許多運算子會自動將其結果推送到名為 return_value 的 XCom 鍵中,@task 函式也是如此。xcom_pull 在沒有傳入鍵時,預設使用 return_value 作為鍵,這意味著可以像這樣編寫程式碼

# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')

您也可以在模板中使用 XComs

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

XComs 與變數相關,主要區別在於 XComs 是每個 Task 例項的,設計用於在 DAG 執行內部進行通訊,而變數是全域性的,設計用於整體配置和值共享。

如果您想一次推送多個 XComs,可以將 do_xcom_pushmultiple_outputs 引數設定為 True,然後返回一個包含多個值的字典。

推送多個 XComs 並單獨拉取的示例

# A task returning a dictionary
@task(do_xcom_push=True, multiple_outputs=True)
def push_multiple(**context):
    return {"key1": "value1", "key2": "value2"}


@task
def xcom_pull_with_multiple_outputs(**context):
    # Pulling a specific key from the multiple outputs
    key1 = context["ti"].xcom_pull(task_ids="push_multiple", key="key1")  # to pull key1
    key2 = context["ti"].xcom_pull(task_ids="push_multiple", key="key2")  # to pull key2

    # Pulling entire xcom data from push_multiple task
    data = context["ti"].xcom_pull(task_ids="push_multiple", key="return_value")

注意

如果第一次任務執行未成功,那麼在每次重試任務時,XComs 將被清除,以使任務執行具有冪等性。

物件儲存 XCom 後端

預設的 XCom 後端 BaseXCom 將 XCom 儲存在 Airflow 資料庫中,這對於小值效果很好,但處理大值或大量 XCom 時可能導致問題。為克服此限制,推薦使用物件儲存來高效處理更大規模資料。詳細概述請參考文件

自定義 XCom 後端

XCom 系統具有可互換的後端,您可以透過 xcom_backend 配置選項來設定使用哪個後端。

如果您想實現自己的後端,應該繼承 BaseXCom 類,並重寫 serialize_valuedeserialize_value 方法。

您可以重寫 BaseXCom 類中的 purge 方法,以便控制從自定義後端清除 XCom 資料。此方法會在 delete 過程中被呼叫。

在容器中驗證自定義 XCom 後端的使用

根據 Airflow 的部署環境,例如本地、Docker、K8s 等,確認自定義 XCom 後端是否實際初始化非常有用。例如,容器環境的複雜性使得在容器部署期間更難確定後端是否正確載入。幸運的是,可以使用以下指導來幫助您增強對自定義 XCom 實現的信心。

如果您可以 exec 進入 Airflow 容器的終端,您就可以打印出正在使用的實際 XCom 類

from airflow.models.xcom import XCom

print(XCom.__name__)

此條目有幫助嗎?