Airflow 峰會 2025 將於 10 月 07-09 日舉行。立即註冊獲取早鳥票!

運算子

運算子在概念上是一個預定義任務的模板,你可以在 DAG 中宣告式地定義它

with DAG("my-dag") as dag:
    ping = HttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")

    ping >> email

Airflow 提供了非常豐富的運算子集,其中一些是內置於核心或預安裝的 Provider 中的。核心模組中一些常用的運算子包括

  • BashOperator - 執行一個 bash 命令

  • PythonOperator - 呼叫一個任意的 Python 函式

  • 使用 @task 裝飾器執行任意 Python 函式。它不支援渲染作為引數傳遞的 Jinja 模板。

注意

建議使用 @task 裝飾器而非傳統的 PythonOperator 來執行不帶引數模板渲染的 Python 可呼叫物件。

有關所有核心運算子的列表,請參閱:核心運算子和鉤子參考

如果你需要的運算子預設未隨 Airflow 安裝,你很可能在我們龐大的社群 provider 集中找到它。這裡的一些常用運算子包括

但還有非常非常多 - 你可以在我們的 provider 包 文件中看到所有社群管理的運算子、鉤子、感測器和傳輸器的完整列表。

注意

在 Airflow 程式碼內部,我們經常混用任務和運算子的概念,並且它們大部分是可以互換的。然而,當我們談論*任務*時,我們指的是 DAG 的通用“執行單元”;當我們談論*運算子*時,我們指的是一個可重用、預製的任務模板,其邏輯已為你完成,只需傳入一些引數。

Jinja 模板

Airflow 利用了 Jinja 模板 的強大功能,結合 使用時,這會成為一個強大的工具。

例如,假設你想使用 BashOperator 將資料區間的開始時間作為環境變數傳遞給 Bash 指令碼

# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

在這裡,{{ ds }} 是一個模板變數,並且由於 BashOperatorenv 引數使用了 Jinja 模板,資料區間的開始日期將作為名為 DATA_INTERVAL_START 的環境變數在你的 Bash 指令碼中可用。

當 Python 比 Jinja 模板更易讀時,你也可以傳入一個可呼叫物件。該可呼叫物件必須接受兩個命名引數 contextjinja_env

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        return do_complex_things(f)


t = BashOperator(
    task_id="complex_templated_echo",
    bash_command=build_complex_command,
    dag=dag,
)

由於每個模板欄位只渲染一次,可呼叫物件的返回值不會再次透過渲染。因此,可呼叫物件必須手動渲染任何模板。這可以透過在當前任務上呼叫 render_template() 來完成,如下所示

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        data = do_complex_things(f)
    return context["task"].render_template(data, context, jinja_env)

你可以對文件中標註為“模板化”的每個引數使用模板。模板替換髮生在呼叫運算子的 pre_execute 函式之前。

你還可以對巢狀欄位使用模板,只要這些巢狀欄位在其所屬結構中被標記為模板化:在 template_fields 屬性中註冊的欄位將進行模板替換,如下例中的 path 欄位。

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

注意

template_fields 屬性是一個類變數,並保證是 Sequence[str] 型別(即字串的列表或元組)。

深度巢狀欄位也可以被替換,只要所有中間欄位都標記為模板欄位。

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

你可以在建立 DAG 時向 Jinja 的 Environment 傳入自定義選項。一個常用例子是避免 Jinja 刪除模板字串中的尾隨換行符。

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # some other jinja2 Environment options here
    },
)

請參閱 Jinja 文件 以找到所有可用選項。

一些運算子在渲染欄位時,也會將以特定字尾(在 template_ext 中定義)結尾的字串視為對檔案的引用。這對於直接從檔案載入指令碼或查詢非常有用,而無需將它們包含在 DAG 程式碼中。

例如,考慮一個執行多行 bash 指令碼的 BashOperator,它將載入檔案 script.sh 並將其內容用作 bash_command 的值。

run_script = BashOperator(
    task_id="run_script",
    bash_command="script.sh",
)

預設情況下,以這種方式提供的路徑應相對於 DAG 資料夾(因為這是預設的 Jinja 模板搜尋路徑),但可以透過在 DAG 上設定 template_searchpath 引數來新增額外路徑。

在某些情況下,你可能希望將字串排除在模板化之外並直接使用它。考慮以下任務

print_script = BashOperator(
    task_id="print_script",
    bash_command="cat script.sh",
)

這將失敗,並出現 TemplateNotFound: cat script.sh 錯誤,因為 Airflow 會將該字串視為檔案路徑而非命令。我們可以透過將其包裝在 literal() 中來阻止 Airflow 將此值視為對檔案的引用。此方法停用宏和檔案的渲染,可應用於選定的巢狀欄位,同時保留其餘內容的預設模板化規則。

from airflow.sdk import literal


fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command=literal("cat script.sh"),
)

自 2.8 版新增:已新增 literal()

或者,如果你想阻止 Airflow 將某個值視為對檔案的引用,可以覆蓋 template_ext

fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()

將欄位渲染為原生 Python 物件

預設情況下,template_fields 中的所有 Jinja 模板都渲染為字串。然而,這並非總是期望的行為。例如,假設一個 extract 任務將字典 {"1001": 301.27, "1002": 433.21, "1003": 502.22} 推送到 XCom

@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

如果一個任務依賴於 extractorder_data 引數將傳入字串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"

def transform(order_data):
    total_order_value = sum(order_data.values())  # Fails because order_data is a str :(
    return {"total_order_value": total_order_value}


transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
    python_callable=transform,
)

extract() >> transform

如果想獲得實際的字典物件,有兩種解決方案。第一種是使用可呼叫物件

def render_transform_op_kwargs(context, jinja_env):
    order_data = context["ti"].xcom_pull("extract")
    return {"order_data": order_data}


transform = PythonOperator(
    task_id="transform",
    op_kwargs=render_transform_op_kwargs,
    python_callable=transform,
)

或者,也可以指示 Jinja 渲染原生 Python 物件。這可以透過向 DAG 傳遞 render_template_as_native_obj=True 來實現。這將使 Airflow 使用 NativeEnvironment 而非預設的 SandboxedEnvironment

with DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
):
    transform = PythonOperator(
        task_id="transform",
        op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
        python_callable=transform,
    )

保留的 params 關鍵字

在 Apache Airflow 2.2.0 中,params 變數在 DAG 序列化期間使用。請不要在第三方運算子中使用該名稱。如果你升級環境並遇到以下錯誤

AttributeError: 'str' object has no attribute '__module__'

請更改運算子中 params 的名稱。

模板與 f-string 的衝突

使用 Python f-string 構造模板化欄位(例如 BashOperator 中的 bash_command)的字串時,請注意 f-string 插值與 Jinja 模板語法的互動。兩者都使用大括號 ({})。

Python f-string 將雙大括號 ({{}}`) 解釋為文字單大括號 ({}`) 的轉義序列。然而,Jinja 使用雙大括號 ({{ variable }}`) 來表示模板化變數。

如果你需要在用 f-string 定義的字串中字面包含一個 Jinja 模板表示式(例如,{{ ds }}`),以便 Airflow 的 Jinja 引擎稍後可以處理它,你必須透過將 f-string 的大括號*再次*加倍來轉義它們。這意味著使用四個大括號

t1 = BashOperator(
    task_id="fstring_templating_correct",
    bash_command=f"echo Data interval start: {{{{ ds }}}}",
    dag=dag,
)

python_var = "echo Data interval start:"

t2 = BashOperator(
    task_id="fstring_templating_simple",
    bash_command=f"{python_var} {{{{ ds }}}}",
    dag=dag,
)

這確保 f-string 處理的結果字串包含 Jinja 所需的字面雙大括號,Airflow 隨後可以在執行前正確地對其進行模板化。未能做到這一點是初學者常犯的錯誤,可能導致 DAG 解析期間的錯誤,或在模板化未按預期發生時導致執行時出現意外行為。

此條目是否有幫助?