airflow.providers.standard.operators.python

屬性

log

PythonOperator

執行一個 Python 可呼叫物件。

BranchPythonOperator

工作流可以在此任務執行後“分支”或遵循一條路徑。

ShortCircuitOperator

根據 python_callable 的結果允許管道繼續執行。

PythonVirtualenvOperator

在一個自動建立和銷燬的虛擬環境中執行一個函式。

BranchPythonVirtualenvOperator

工作流可以在此任務在虛擬環境中執行後“分支”或遵循一條路徑。

ExternalPythonOperator

在不重新建立的虛擬環境中執行一個函式。

BranchExternalPythonOperator

工作流可以在此任務執行後“分支”或遵循一條路徑。

函式

get_current_context()

檢索執行上下文字典,而無需更改使用者方法的簽名。

模組內容

airflow.providers.standard.operators.python.log[source]
class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]

基類: airflow.models.baseoperator.BaseOperator

執行一個 Python 可呼叫物件。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南: PythonOperator

執行您的可呼叫物件時,Airflow 將傳遞一組可在函式中使用的關鍵字引數。這組 kwargs 與您在 jinja 模板中使用的內容完全對應。為此,您需要在函式頭中定義 **kwargs,或者您可以直接新增您想要獲取的關鍵字引數 - 例如,使用下面的程式碼,您的可呼叫物件將獲取 ti 上下文變數的值。

使用顯式引數

def my_python_callable(ti):
    pass

使用 kwargs

def my_python_callable(**kwargs):
    ti = kwargs["ti"]
引數:
  • python_callable (Callable) – 對可呼叫物件的引用

  • op_args (collections.abc.Collection[Any] | None) – 呼叫您的可呼叫物件時將被解包的位置引數列表

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 一個將在您的函式中被解包的關鍵字引數字典

  • templates_dict (dict[str, Any] | None) – 一個字典,其值是模板,這些模板將在 __init__execute 執行之間被 Airflow 引擎模板化,並在應用模板後在您的可呼叫物件的上下文中可用。(已模板化)

  • templates_exts (collections.abc.Sequence[str] | None) – 處理模板化欄位時要解析的副檔名列表,例如 ['.sql', '.hql']

  • show_return_value_in_logs (bool) – 一個布林值,指示是否顯示返回值日誌。預設為 True,允許輸出返回值日誌。當您返回大量資料(例如透過 XCom 向 TaskAPI 傳輸大量資料)時,可以將其設定為 False 以防止輸出返回值日誌。

template_fields: collections.abc.Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[source]
template_fields_renderers[source]
BLUE = '#ffefeb'[source]
ui_color = '#ffefeb'[source]
shallow_copy_attrs: collections.abc.Sequence[str] = ('python_callable', 'op_kwargs')[source]
python_callable[source]
op_args = ()[source]
op_kwargs[source]
templates_dict = None[source]
show_return_value_in_logs = True[source]
execute(context)[source]

建立 Operator 時派生。

上下文與渲染 jinja 模板時使用的字典相同。

有關更多上下文資訊,請參閱 get_template_context。

determine_kwargs(context)[source]
execute_callable()[source]

使用給定的引數呼叫 Python 可呼叫物件。

返回:

呼叫的返回值。

返回型別:

Any

class airflow.providers.standard.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]

基類: PythonOperator, airflow.providers.standard.operators.branch.BranchMixIn

工作流可以在此任務執行後“分支”或遵循一條路徑。

它派生自 PythonOperator,並期望一個 Python 函式返回一個 task_id、一個 task_group_id,或一個要遵循的 task_id(s) 和/或 task_group_id(s) 列表。返回的 task_id(s) 和/或 task_group_id(s) 應該指向 {self} 的直接下游任務或任務組。所有其他“分支”或直接下游任務都被標記為 skipped 狀態,以便這些路徑無法繼續前進。skipped 狀態會向下遊傳播,以允許 DAG 狀態填充並推斷 DAG 執行狀態。

inherits_from_skipmixin = True[source]

用於確定 Operator 是否繼承自 SkipMixin 或其子類(例如 BranchMixin)。

execute(context)[source]

建立 Operator 時派生。

上下文與渲染 jinja 模板時使用的字典相同。

有關更多上下文資訊,請參閱 get_template_context。

class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]

基類: PythonOperator, airflow.providers.standard.utils.skipmixin.SkipMixin

根據 python_callable 的結果允許管道繼續執行。

ShortCircuitOperator 派生自 PythonOperator,並評估 python_callable 的結果。如果返回結果為 False 或假值,則管道將被短路。下游任務將根據配置的短路模式標記為“skipped”狀態。如果返回結果為 True 或真值,下游任務將正常進行,並且返回結果的 XCom 將被推送。

短路可以配置為遵循或忽略為下游任務設定的 trigger_rule。如果將 ignore_downstream_trigger_rules 設定為 True(預設設定),則所有下游任務都將被跳過,而不考慮為任務定義的 trigger_rule。但是,如果將此引數設定為 False,則直接下游任務將被跳過,但將遵循為所有其他後續下游任務指定的 trigger_rule。在此模式下,Operator 假定直接下游任務是有意跳過的,但其他後續任務可能不是。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南: ShortCircuitOperator

引數:

ignore_downstream_trigger_rules (bool) – 如果設定為 True,則從此 Operator 任務開始的所有下游任務都將被跳過。這是預設行為。如果設定為 False,則直接下游任務將被跳過,但將遵循為所有其他下游任務定義的 trigger_rule

inherits_from_skipmixin = True[source]

用於確定 Operator 是否繼承自 SkipMixin 或其子類(例如 BranchMixin)。

ignore_downstream_trigger_rules = True[source]
execute(context)[source]

建立 Operator 時派生。

上下文與渲染 jinja 模板時使用的字典相同。

有關更多上下文資訊,請參閱 get_template_context。

class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, **kwargs)[source]

基類: _BasePythonVirtualenvOperator

在一個自動建立和銷燬的虛擬環境中執行一個函式。

該函式(有一些注意事項)必須使用 def 定義,且不能是類的一部分。所有匯入都必須發生在函式內部,並且不能引用作用域外部的任何變數。一個名為 virtualenv_string_args 的全域性作用域變數將可用(由 string_args 填充)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用返回值。請注意,如果您的 virtualenv 執行的 Python 主要版本與 Airflow 不同,則不能使用返回值、op_args、op_kwargs,或使用透過外掛提供給 Airflow 的任何宏。但您可以使用 string_args。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南: PythonVirtualenvOperator

引數:
  • python_callable (`Callable`) – 一個沒有外部變數引用的 Python 函式,使用 def 定義,將在虛擬環境中執行。

  • requirements (`None` `|` `collections.abc.Iterable`[`str`] `|` `str`) – 一個需求字串列表,或 pip 指定的(模板化)“需求檔案”。

  • python_version (`str` `|` `None`) – 執行虛擬環境時使用的 Python 版本。請注意,2 和 2.7 都是可接受的形式。

  • serializer (`_SerializerTypeDef` `|` `None`) –

    用於序列化引數和結果的序列化器。可以是以下之一:

    • "pickle":(預設)使用 pickle 進行序列化。包含在 Python 標準庫中。

    • "cloudpickle": 使用 cloudpickle 序列化更復雜的型別,這需要在您的需求中包含 cloudpickle。

    • "dill": 使用 dill 序列化更復雜的型別,這需要在您的需求中包含 dill。

  • system_site_packages (`bool`) – 是否在您的虛擬環境中包含 system_site_packages。更多資訊請參見 virtualenv 文件。

  • pip_install_options (`list`[`str`] `|` `None`) – 安裝需求時使用的 pip 安裝選項列表。有關可用選項,請參見‘pip install -h’。

  • op_args (`collections.abc.Collection`[`Any`] `|` `None`) – 要傳遞給 python_callable 的位置引數列表。

  • op_kwargs (`collections.abc.Mapping`[`str`, `Any`] `|` `None`) – 要傳遞給 python_callable 的關鍵字引數字典。

  • string_args (`collections.abc.Iterable`[`str`] `|` `None`) – 存在於全域性變數 virtualenv_string_args 中的字串,在執行時作為 list[str] 可供 python_callable 使用。注意引數按換行符分割。

  • templates_dict (`dict` `|` `None`) – 一個字典,其值是模板,這些模板將在 __init__execute 執行之間被 Airflow 引擎模板化,並在應用模板後在您的可呼叫物件的上下文中可用

  • templates_exts (`list`[`str`] `|` `None`) – 處理模板化欄位時要解析的副檔名列表,例如 ['.sql', '.hql']

  • expect_airflow (`bool`) – 期望 Airflow 安裝在目標環境中。如果為 true,則如果 Airflow 未安裝,Operator 將發出警告,並在啟動時嘗試載入 Airflow 宏。

  • skip_on_exit_code (`int` `|` `collections.abc.Container`[`int`] `|` `None`) – 如果 python_callable 以此退出程式碼退出,則將任務保留在 skipped 狀態(預設值: None)。如果設定為 None,則任何非零退出程式碼都將被視為失敗。

  • index_urls (`None` `|` `collections.abc.Collection`[`str`] `|` `str`) – 用於載入 Python 包的可選索引 url 列表。如果未提供,將使用系統 pip conf 來獲取包。

  • venv_cache_path (`None` `|` `os.PathLike`[`str`]) – 虛擬環境父資料夾的可選路徑,虛擬環境將在此資料夾中快取,會建立一個子資料夾 venv-{hash},其中 hash 將替換為需求的校驗和。如果未提供,虛擬環境將在每次執行時在臨時資料夾中建立和刪除。

  • env_vars (`dict`[`str`, `str`] `|` `None`) – 一個包含為虛擬環境執行時設定的附加環境變數的字典。

  • inherit_env (`bool`) – 執行虛擬環境時是否繼承當前環境變數。如果設定為 True,虛擬環境將繼承父程序 (os.environ) 的環境變數。如果設定為 False,虛擬環境將使用乾淨的環境執行。

template_fields: collections.abc.Sequence[str][source]
template_ext: collections.abc.Sequence[str] = ('.txt',)[source]
python_version = None[source]
system_site_packages = True[source]
pip_install_options = None[source]
venv_cache_path = None[source]
execute_callable()[source]

使用給定的引數呼叫 Python 可呼叫物件。

返回:

呼叫的返回值。

class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, **kwargs)[source]

基類:PythonVirtualenvOperator, airflow.providers.standard.operators.branch.BranchMixIn

工作流可以在此任務在虛擬環境中執行後“分支”或遵循一條路徑。

它繼承自 PythonVirtualenvOperator 並需要一個 Python 函式,該函式返回單個 task_id、單個 task_group_id,或要遵循的 task_ids 和/或 task_group_ids 列表。返回的 task_id(s) 和/或 task_group_id(s) 應指向緊接在 {self} 下游的任務或任務組。所有其他“分支”或直接下游任務都會被標記為 skipped 狀態,這樣這些路徑就無法繼續。skipped 狀態會向下遊傳播,以便填充 DAG 狀態並推斷 DAG 執行的狀態。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南:BranchPythonVirtualenvOperator

inherits_from_skipmixin = True[source]

用於確定 Operator 是否繼承自 SkipMixin 或其子類(例如 BranchMixin)。

execute(context)[source]

建立 Operator 時派生。

上下文與渲染 jinja 模板時使用的字典相同。

有關更多上下文資訊,請參閱 get_template_context。

class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, **kwargs)[source]

基類: _BasePythonVirtualenvOperator

在不重新建立的虛擬環境中執行一個函式。

在不建立虛擬環境的開銷下(帶有一些注意事項)直接重用。

函式必須使用 def 定義,且不能是類的一部分。所有匯入必須發生在函式內部,不能引用作用域之外的變數。一個名為 virtualenv_string_args 的全域性作用域變數將可用(由 string_args 填充)。此外,可以透過 op_args 和 op_kwargs 傳遞內容,並且可以使用返回值。請注意,如果您的虛擬環境執行的 Python 主版本與 Airflow 不同,則不能使用返回值、op_args、op_kwargs,或使用透過外掛提供給 Airflow 的任何宏。但您可以使用 string_args。

如果 Airflow 安裝在外部環境中的版本與 Operator 使用的版本不同,則 Operator 將會失敗。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南:ExternalPythonOperator

引數:
  • python (str) – 完整路徑字串(檔案系統特定),指向虛擬環境中應使用的 Python 二進位制檔案(位於 VENV/bin 資料夾中)。應為絕對路徑(因此通常根據所使用的檔案系統/作業系統以“/”或“X:/”開頭)。

  • python_callable (`Callable`) – 一個沒有外部變數引用的 Python 函式,使用 def 定義,將在虛擬環境中執行。

  • serializer (`_SerializerTypeDef` `|` `None`) –

    用於序列化引數和結果的序列化器。可以是以下之一:

    • "pickle":(預設)使用 pickle 進行序列化。包含在 Python 標準庫中。

    • "cloudpickle": 使用 cloudpickle 序列化更復雜的型別,這需要在您的需求中包含 cloudpickle。

    • "dill": 使用 dill 序列化更復雜的型別,這需要在您的需求中包含 dill。

  • op_args (`collections.abc.Collection`[`Any`] `|` `None`) – 要傳遞給 python_callable 的位置引數列表。

  • op_kwargs (`collections.abc.Mapping`[`str`, `Any`] `|` `None`) – 要傳遞給 python_callable 的關鍵字引數字典。

  • string_args (`collections.abc.Iterable`[`str`] `|` `None`) – 存在於全域性變數 virtualenv_string_args 中的字串,在執行時作為 list[str] 可供 python_callable 使用。注意引數按換行符分割。

  • templates_dict (`dict` `|` `None`) – 一個字典,其值是模板,這些模板將在 __init__execute 執行之間被 Airflow 引擎模板化,並在應用模板後在您的可呼叫物件的上下文中可用

  • templates_exts (`list`[`str`] `|` `None`) – 處理模板化欄位時要解析的副檔名列表,例如 ['.sql', '.hql']

  • expect_airflow (`bool`) – 期望 Airflow 安裝在目標環境中。如果為 true,則如果 Airflow 未安裝,Operator 將發出警告,並在啟動時嘗試載入 Airflow 宏。

  • skip_on_exit_code (`int` `|` `collections.abc.Container`[`int`] `|` `None`) – 如果 python_callable 以此退出程式碼退出,則將任務保留在 skipped 狀態(預設值: None)。如果設定為 None,則任何非零退出程式碼都將被視為失敗。

  • env_vars (`dict`[`str`, `str`] `|` `None`) – 一個包含為虛擬環境執行時設定的附加環境變數的字典。

  • inherit_env (`bool`) – 執行虛擬環境時是否繼承當前環境變數。如果設定為 True,虛擬環境將繼承父程序 (os.environ) 的環境變數。如果設定為 False,虛擬環境將使用乾淨的環境執行。

template_fields: collections.abc.Sequence[str][source]
python[source]
expect_pendulum = False[source]
execute_callable()[source]

使用給定的引數呼叫 Python 可呼叫物件。

返回:

呼叫的返回值。

class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, **kwargs)[source]

基類:ExternalPythonOperator, airflow.providers.standard.operators.branch.BranchMixIn

工作流可以在此任務執行後“分支”或遵循一條路徑。

繼承 ExternalPythonOperator,因此需要獲取 Python:應使用的虛擬環境(位於 VENV/bin 資料夾中)。應為絕對路徑,以便它可以在單獨的虛擬環境中執行,類似於 ExternalPythonOperator。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南:BranchExternalPythonOperator

execute(context)[source]

建立 Operator 時派生。

上下文與渲染 jinja 模板時使用的字典相同。

有關更多上下文資訊,請參閱 get_template_context。

airflow.providers.standard.operators.python.get_current_context()[source]

檢索執行上下文字典,而不改變使用者方法的簽名。

這是檢索執行上下文字典的最簡單方法。

舊樣式

def my_task(**context):
    ti = context["ti"]

新樣式

from airflow.providers.standard.operators.python import get_current_context


def my_task():
    context = get_current_context()
    ti = context["ti"]

僅當在 Operator 開始執行後呼叫此方法時,當前上下文才會有值。

此條目是否有幫助?