運算子¶
運算子在概念上是一個預定義任務的模板,你可以在 DAG 中宣告式地定義它
with DAG("my-dag") as dag:
ping = HttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="admin@example.com", subject="Update complete")
ping >> email
Airflow 提供了非常豐富的運算子集,其中一些是內置於核心或預安裝的 Provider 中的。核心模組中一些常用的運算子包括
BashOperator- 執行一個 bash 命令PythonOperator- 呼叫一個任意的 Python 函式使用
@task裝飾器執行任意 Python 函式。它不支援渲染作為引數傳遞的 Jinja 模板。
注意
建議使用 @task 裝飾器而非傳統的 PythonOperator 來執行不帶引數模板渲染的 Python 可呼叫物件。
有關所有核心運算子的列表,請參閱:核心運算子和鉤子參考。
如果你需要的運算子預設未隨 Airflow 安裝,你很可能在我們龐大的社群 provider 集中找到它。這裡的一些常用運算子包括
但還有非常非常多 - 你可以在我們的 provider 包 文件中看到所有社群管理的運算子、鉤子、感測器和傳輸器的完整列表。
注意
在 Airflow 程式碼內部,我們經常混用任務和運算子的概念,並且它們大部分是可以互換的。然而,當我們談論*任務*時,我們指的是 DAG 的通用“執行單元”;當我們談論*運算子*時,我們指的是一個可重用、預製的任務模板,其邏輯已為你完成,只需傳入一些引數。
Jinja 模板¶
Airflow 利用了 Jinja 模板 的強大功能,結合 宏 使用時,這會成為一個強大的工具。
例如,假設你想使用 BashOperator 將資料區間的開始時間作為環境變數傳遞給 Bash 指令碼
# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id="test_env",
bash_command="/tmp/test.sh ",
dag=dag,
env={"DATA_INTERVAL_START": date},
)
在這裡,{{ ds }} 是一個模板變數,並且由於 BashOperator 的 env 引數使用了 Jinja 模板,資料區間的開始日期將作為名為 DATA_INTERVAL_START 的環境變數在你的 Bash 指令碼中可用。
當 Python 比 Jinja 模板更易讀時,你也可以傳入一個可呼叫物件。該可呼叫物件必須接受兩個命名引數 context 和 jinja_env
def build_complex_command(context, jinja_env):
with open("file.csv") as f:
return do_complex_things(f)
t = BashOperator(
task_id="complex_templated_echo",
bash_command=build_complex_command,
dag=dag,
)
由於每個模板欄位只渲染一次,可呼叫物件的返回值不會再次透過渲染。因此,可呼叫物件必須手動渲染任何模板。這可以透過在當前任務上呼叫 render_template() 來完成,如下所示
def build_complex_command(context, jinja_env):
with open("file.csv") as f:
data = do_complex_things(f)
return context["task"].render_template(data, context, jinja_env)
你可以對文件中標註為“模板化”的每個引數使用模板。模板替換髮生在呼叫運算子的 pre_execute 函式之前。
你還可以對巢狀欄位使用模板,只要這些巢狀欄位在其所屬結構中被標記為模板化:在 template_fields 屬性中註冊的欄位將進行模板替換,如下例中的 path 欄位。
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
dag=dag,
)
注意
template_fields 屬性是一個類變數,並保證是 Sequence[str] 型別(即字串的列表或元組)。
深度巢狀欄位也可以被替換,只要所有中間欄位都標記為模板欄位。
class MyDataTransformer:
template_fields: Sequence[str] = ("reader",)
def __init__(self, my_reader):
self.reader = my_reader
# [additional code here...]
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
dag=dag,
)
你可以在建立 DAG 時向 Jinja 的 Environment 傳入自定義選項。一個常用例子是避免 Jinja 刪除模板字串中的尾隨換行符。
my_dag = DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# some other jinja2 Environment options here
},
)
請參閱 Jinja 文件 以找到所有可用選項。
一些運算子在渲染欄位時,也會將以特定字尾(在 template_ext 中定義)結尾的字串視為對檔案的引用。這對於直接從檔案載入指令碼或查詢非常有用,而無需將它們包含在 DAG 程式碼中。
例如,考慮一個執行多行 bash 指令碼的 BashOperator,它將載入檔案 script.sh 並將其內容用作 bash_command 的值。
run_script = BashOperator(
task_id="run_script",
bash_command="script.sh",
)
預設情況下,以這種方式提供的路徑應相對於 DAG 資料夾(因為這是預設的 Jinja 模板搜尋路徑),但可以透過在 DAG 上設定 template_searchpath 引數來新增額外路徑。
在某些情況下,你可能希望將字串排除在模板化之外並直接使用它。考慮以下任務
print_script = BashOperator(
task_id="print_script",
bash_command="cat script.sh",
)
這將失敗,並出現 TemplateNotFound: cat script.sh 錯誤,因為 Airflow 會將該字串視為檔案路徑而非命令。我們可以透過將其包裝在 literal() 中來阻止 Airflow 將此值視為對檔案的引用。此方法停用宏和檔案的渲染,可應用於選定的巢狀欄位,同時保留其餘內容的預設模板化規則。
from airflow.sdk import literal
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command=literal("cat script.sh"),
)
自 2.8 版新增:已新增 literal()。
或者,如果你想阻止 Airflow 將某個值視為對檔案的引用,可以覆蓋 template_ext
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()
將欄位渲染為原生 Python 物件¶
預設情況下,template_fields 中的所有 Jinja 模板都渲染為字串。然而,這並非總是期望的行為。例如,假設一個 extract 任務將字典 {"1001": 301.27, "1002": 433.21, "1003": 502.22} 推送到 XCom
@task(task_id="extract")
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
如果一個任務依賴於 extract,order_data 引數將傳入字串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"
def transform(order_data):
total_order_value = sum(order_data.values()) # Fails because order_data is a str :(
return {"total_order_value": total_order_value}
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
extract() >> transform
如果想獲得實際的字典物件,有兩種解決方案。第一種是使用可呼叫物件
def render_transform_op_kwargs(context, jinja_env):
order_data = context["ti"].xcom_pull("extract")
return {"order_data": order_data}
transform = PythonOperator(
task_id="transform",
op_kwargs=render_transform_op_kwargs,
python_callable=transform,
)
或者,也可以指示 Jinja 渲染原生 Python 物件。這可以透過向 DAG 傳遞 render_template_as_native_obj=True 來實現。這將使 Airflow 使用 NativeEnvironment 而非預設的 SandboxedEnvironment
with DAG(
dag_id="example_template_as_python_object",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
render_template_as_native_obj=True,
):
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
保留的 params 關鍵字¶
在 Apache Airflow 2.2.0 中,params 變數在 DAG 序列化期間使用。請不要在第三方運算子中使用該名稱。如果你升級環境並遇到以下錯誤
AttributeError: 'str' object has no attribute '__module__'
請更改運算子中 params 的名稱。
模板與 f-string 的衝突¶
使用 Python f-string 構造模板化欄位(例如 BashOperator 中的 bash_command)的字串時,請注意 f-string 插值與 Jinja 模板語法的互動。兩者都使用大括號 ({})。
Python f-string 將雙大括號 ({{ 和 }}`) 解釋為文字單大括號 ({ 和 }`) 的轉義序列。然而,Jinja 使用雙大括號 ({{ variable }}`) 來表示模板化變數。
如果你需要在用 f-string 定義的字串中字面包含一個 Jinja 模板表示式(例如,{{ ds }}`),以便 Airflow 的 Jinja 引擎稍後可以處理它,你必須透過將 f-string 的大括號*再次*加倍來轉義它們。這意味著使用四個大括號
t1 = BashOperator(
task_id="fstring_templating_correct",
bash_command=f"echo Data interval start: {{{{ ds }}}}",
dag=dag,
)
python_var = "echo Data interval start:"
t2 = BashOperator(
task_id="fstring_templating_simple",
bash_command=f"{python_var} {{{{ ds }}}}",
dag=dag,
)
這確保 f-string 處理的結果字串包含 Jinja 所需的字面雙大括號,Airflow 隨後可以在執行前正確地對其進行模板化。未能做到這一點是初學者常犯的錯誤,可能導致 DAG 解析期間的錯誤,或在模板化未按預期發生時導致執行時出現意外行為。