PythonOperator¶
使用 PythonOperator 來執行 Python 可呼叫物件。
提示
建議使用 @task 裝飾器而不是經典的 PythonOperator 來執行 Python 可呼叫物件。
airflow/example_dags/example_python_decorator.py
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return "Whatever you return gets printed in the logs"
run_this = print_context()
airflow/example_dags/example_python_operator.py
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print("::group::All kwargs")
pprint(kwargs)
print("::endgroup::")
print("::group::Context variable ds")
print(ds)
print("::endgroup::")
return "Whatever you return gets printed in the logs"
run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
傳入引數¶
您可以像處理普通 Python 函式一樣,將額外引數傳遞給 @task 裝飾的函式。
airflow/example_dags/example_python_decorator.py
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)
run_this >> log_the_sql >> sleeping_task
airflow/example_dags/example_python_operator.py
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = PythonOperator(
task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
)
run_this >> log_the_sql >> sleeping_task
模板化¶
Airflow 會傳入一組額外的關鍵字引數:每個 Jinja 模板變數 對應一個,還有一個 templates_dict 引數。
templates_dict、op_args、op_kwargs 引數是模板化的,因此字典中的每個值都會被評估為一個 Jinja 模板。
airflow/example_dags/example_python_decorator.py
@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = log_sql()
airflow/example_dags/example_python_operator.py
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = PythonOperator(
task_id="log_sql_query",
python_callable=log_sql,
templates_dict={"query": "sql/sample.sql"},
templates_exts=[".sql"],
)
上下文¶
Context 是一個字典物件,包含有關 DagRun 環境的資訊。例如,選擇 task_instance 將獲取當前執行的 TaskInstance 物件。
它可以隱式使用,例如使用 **kwargs,也可以使用 get_current_context() 顯式使用。在這種情況下,型別提示可用於靜態分析。
PythonVirtualenvOperator¶
使用 PythonVirtualenvOperator 裝飾器在新的 Python 虛擬環境中執行 Python 可呼叫物件。virtualenv 包需要在執行 Airflow 的環境中安裝(作為可選依賴項 pip install apache-airflow[virtualenv] --constraint ...)。
此外,cloudpickle 包需要作為可選依賴項安裝,使用命令 pip install [cloudpickle] --constraint ...。此包取代了當前使用的 dill 包。Cloudpickle 的強大優勢在於它專注於標準的 pickling 協議,確保更廣泛的相容性和更流暢的資料交換,同時仍能有效地處理函式中的常見 Python 物件和全域性變數。
提示
建議使用 @task.virtualenv 裝飾器而不是經典的 PythonVirtualenvOperator 來在新的 Python 虛擬環境中執行 Python 可呼叫物件。
airflow/example_dags/example_python_decorator.py
@task.virtualenv(
task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = callable_virtualenv()
airflow/example_dags/example_python_operator.py
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the function level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
)
傳入引數¶
您可以像處理普通 Python 函式一樣,將額外引數傳遞給 @task.virtualenv 裝飾的函式。遺憾的是,由於與底層庫不相容,Airflow 不支援序列化 var、ti 和 task_instance。對於 Airflow 上下文變數,請確保您可以透過設定 system_site_packages 為 True 來訪問 Airflow,或者將 apache-airflow 新增到 requirements 引數中。否則,您將無法在 op_kwargs 中訪問大多數 Airflow 上下文變數。如果您想要訪問與 datetime 物件相關的上下文,例如 data_interval_start,您可以新增 pendulum 和 lazy_object_proxy。
重要
用於執行的 Python 函式體將從 DAG 中剪切出來,放入一個臨時檔案,不包含周圍的程式碼。正如示例所示,您需要再次新增所有匯入,並且不能依賴於全域性 Python 上下文中的變數。
如果您想將變數傳遞給經典的 PythonVirtualenvOperator,請使用 op_args 和 op_kwargs。
如果需要額外的引數來安裝包,請透過 pip_install_options 引數傳遞,或者像下面的示例一樣使用 requirements.txt
SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives
requirements 檔案格式 中列出了所有支援的選項。
模板化¶
可以像 PythonOperator 中描述的那樣使用 Jinja 模板化。
虛擬環境設定選項¶
虛擬環境是根據工作節點上全域性 Python pip 配置建立的。您可以在環境中使用額外的 ENV 變數或根據 pip 配置 中描述的方式調整通用的 pip 配置。
如果您想使用額外的任務特定私有 Python 倉庫來設定虛擬環境,您可以傳遞 index_urls 引數,這將調整 pip 安裝配置。傳遞的 index urls 將替換標準的系統配置的 index url 設定。為了避免在您的 DAG 程式碼中新增私有倉庫的秘密資訊,您可以使用 Airflow 連線和 Hooks。為此,可以使用連線型別 Package Index (Python)。
在您想要阻止設定虛擬環境的遠端呼叫這種特殊情況下,將 index_urls 作為空列表傳遞,例如 index_urls=[],這將強制 pip 安裝程式使用 --no-index 選項。
快取和重用¶
虛擬環境在任務執行時在臨時目錄中設定。執行後虛擬環境會被再次刪除。請確保您的工作節點上的 $tmp 資料夾有足夠的磁碟空間。通常(如果未另行配置),將使用本地 pip 快取,從而避免每次執行都重新下載包。
但每次執行仍需要一些時間來設定虛擬環境。對於重複執行,您可以將選項 venv_cache_path 設定為工作節點上的檔案系統資料夾。在這種情況下,虛擬環境將設定一次並被重用。如果使用虛擬環境快取,則會根據每個唯一的 requirements 集在快取路徑中建立不同的虛擬環境子資料夾。因此,取決於您的系統中 DAG 的變動情況,需要足夠的磁碟空間。
請注意,在快取模式下,不會進行自動清理。所有工作節點槽位共享同一個虛擬環境,但如果任務反覆排程到不同的工作節點上,則可能發生虛擬環境在多個工作節點上單獨建立的情況。此外,如果工作節點在 Kubernetes POD 中啟動,工作節點的重啟將丟失快取(假設 venv_cache_path 不在持久捲上)。
如果在執行時遇到損壞的快取虛擬環境問題,您可以透過將 Airflow 變數 PythonVirtualenvOperator.cache_key 設定為任意文字來影響快取目錄雜湊值。此變數的內容用於計算快取目錄鍵的向量。
請注意,對快取的虛擬環境進行任何修改(例如二進位制路徑中的臨時檔案、後安裝更多 requirements)可能會汙染快取的虛擬環境,並且 operator 不會維護或清理快取路徑。
ExternalPythonOperator¶
ExternalPythonOperator 可以幫助您使用與 Adags 和主 Airflow 環境不同的 Python 庫集來執行部分任務。這可能是一個虛擬環境或任何預安裝在執行 Airflow 任務的環境中的 Python 安裝。operator 將 Python 二進位制檔案作為 python 引數。請注意,即使是虛擬環境,python 路徑也應指向虛擬環境內的 python 二進位制檔案(通常在虛擬環境的 bin 子目錄中)。與虛擬環境的常規用法相反,不需要 activation 環境。僅僅使用 python 二進位制檔案即可自動啟用它。在下面的兩個示例中,PATH_TO_PYTHON_BINARY 就是這樣的路徑,指向可執行的 Python 二進位制檔案。
使用 ExternalPythonOperator 在預定義的環境中執行 Python 可呼叫物件。virtualenv 包應預安裝在執行 Python 的環境中。如果使用 dill,則必須將其預安裝在環境中(版本與主 Airflow 環境中安裝的版本相同)。
提示
建議使用 @task.external_python 裝飾器而不是經典的 ExternalPythonOperator 來在預定義的 Python 環境中執行 Python 程式碼。
airflow/example_dags/example_python_decorator.py
@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = callable_external_python()
airflow/example_dags/example_python_operator.py
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = ExternalPythonOperator(
task_id="external_python",
python_callable=callable_external_python,
python=PATH_TO_PYTHON_BINARY,
)
傳入引數¶
您可以像處理普通 Python 函式一樣,將額外引數傳遞給 @task.external_python 裝飾的函式。遺憾的是,由於與底層庫不相容,Airflow 不支援序列化 var 和 ti / task_instance。對於 Airflow 上下文變數,請確保 Airflow 也作為虛擬環境的一部分安裝,且版本與任務執行時的 Airflow 版本相同。否則,您將無法在 op_kwargs 中訪問大多數 Airflow 上下文變數。如果您想要訪問與 datetime 物件相關的上下文,例如 data_interval_start,您可以在虛擬環境中新增 pendulum 和 lazy_object_proxy。
重要
用於執行的 Python 函式體將從 DAG 中剪切出來,放入一個臨時檔案,不包含周圍的程式碼。正如示例所示,您需要再次新增所有匯入,並且不能依賴於全域性 Python 上下文中的變數。
如果您想將變數傳遞給經典的 ExternalPythonOperator,請使用 op_args 和 op_kwargs。
模板化¶
可以像 PythonOperator 中描述的那樣使用 Jinja 模板化。
PythonBranchOperator¶
使用 PythonBranchOperator 來執行 Python 分支 任務。
提示
建議使用 @task.branch 裝飾器而不是經典的 PythonBranchOperator 來執行 Python 程式碼。
airflow/example_dags/example_branch_operator_decorator.py
@task.branch()
def branching(choices: list[str]) -> str:
return f"branch_{random.choice(choices)}"
airflow/example_dags/example_branch_operator.py
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
)
傳入引數和模板化¶
引數傳遞和模板化選項與 PythonOperator 相同。
BranchPythonVirtualenvOperator¶
使用 BranchPythonVirtualenvOperator 裝飾器來執行 Python 分支 任務,它是 PythonBranchOperator 與在虛擬環境中執行的混合體。
提示
建議使用 @task.branch_virtualenv 裝飾器而不是經典的 BranchPythonVirtualenvOperator 來執行 Python 程式碼。
airflow/example_dags/example_branch_operator_decorator.py
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it reuses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()
@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
airflow/example_dags/example_branch_operator.py
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it reuses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())
def branch_with_venv(choices):
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=branch_with_venv,
op_args=[options],
)
傳入引數和模板化¶
引數傳遞和模板化選項與 PythonOperator 相同。
BranchExternalPythonOperator¶
使用 BranchExternalPythonOperator 來執行 Python 分支 任務,它是 PythonBranchOperator 與在外部 Python 環境中執行的混合體。
提示
建議使用 @task.branch_external_python 裝飾器而不是經典的 BranchExternalPythonOperator 來執行 Python 程式碼。
airflow/example_dags/example_branch_operator_decorator.py
@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
import random
return f"ext_py_{random.choice(choices)}"
airflow/example_dags/example_branch_operator.py
def branch_with_external_python(choices):
import random
return f"ext_py_{random.choice(choices)}"
branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
python=PATH_TO_PYTHON_BINARY,
python_callable=branch_with_external_python,
op_args=[options],
)
傳入引數和模板化¶
引數傳遞和模板化選項與 PythonOperator 相同。
ShortCircuitOperator¶
使用 ShortCircuitOperator 來控制當條件滿足或獲得真值時流水線是否繼續。
此條件和真值的評估是透過可呼叫物件的輸出完成的。如果可呼叫物件返回 True 或一個真值,則允許流水線繼續,並將輸出的 XCom 推送出去。如果輸出是 False 或一個假值,則流水線將根據配置的短路邏輯進行短路(稍後會詳細介紹)。在下面的示例中,“condition_is_true”任務之後的任務將執行,而“condition_is_false”任務下游的任務將被跳過。
提示
建議使用 @task.short_circuit 裝飾器而不是經典的 ShortCircuitOperator 來透過 Python 可呼叫物件對流水線進行短路。
airflow/example_dags/example_short_circuit_decorator.py
@task.short_circuit()
def check_condition(condition):
return condition
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)
chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)
airflow/example_dags/example_short_circuit_operator.py
cond_true = ShortCircuitOperator(
task_id="condition_is_True",
python_callable=lambda: True,
)
cond_false = ShortCircuitOperator(
task_id="condition_is_False",
python_callable=lambda: False,
)
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
“短路”可以配置為尊重或忽略下游任務定義的 觸發規則。如果 ignore_downstream_trigger_rules 設定為 True(預設配置),則所有下游任務都會被跳過,不考慮為任務定義的 trigger_rule。如果此引數設定為 False,則直接下游任務會被跳過,但後續其他下游任務指定的 trigger_rule 會被尊重。在此短路配置中,operator 假定直接下游任務被特意跳過,但後續其他任務可能不會被跳過。此配置特別有用,如果只需要短路流水線的 *部分*,而不是跟隨短路任務的所有任務。
在下面的示例中,請注意“short_circuit”任務被配置為尊重下游觸發規則。這意味著雖然跟隨“short_circuit”任務的任務會被跳過(因為裝飾函式返回 False),但“task_7”仍會執行,因為它被設定為在其上游任務無論狀態如何完成執行時執行(即 TriggerRule.ALL_DONE 觸發規則)。
airflow/example_dags/example_short_circuit_decorator.py
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
condition=False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
airflow/example_dags/example_short_circuit_operator.py
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = ShortCircuitOperator(
task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
傳入引數和模板化¶
引數傳遞和模板化選項與 PythonOperator 相同。