Creating a custom @task decorator

As of Airflow 2.2, a provider package can add custom decorators to the TaskFlow interface, and those decorators appear natively as part of the @task.____ design.

For example, suppose you want to create an easier mechanism for running Python functions as "foo" tasks. The steps to create and register @task.foo are:

  1. Create a FooDecoratedOperator

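    In this case, we assume you have an existing FooOperator that takes a Python function as an argument. If you create a FooDecoratedOperator that inherits from both FooOperator and airflow.decorators.base.DecoratedOperator, Airflow supplies most of the functionality needed to treat your new class as a TaskFlow-native class.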

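    You should also override the custom_operator_name attribute to give the task a custom name. For example, _DockerDecoratedOperator in the apache-airflow-providers-docker provider sets it to @task.docker to indicate the name of the decorator it implements.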

  2. Create a foo_task function

    Once you have the decorated class, create a function like the following to turn the new FooDecoratedOperator into a TaskFlow function decorator:

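    from __future__ import annotations

    from collections.abc import Callable
    from typing import TYPE_CHECKING

    from airflow.sdk.bases.decorator import task_decorator_factory

    if TYPE_CHECKING:
        from airflow.sdk.bases.decorator import TaskDecorator

    # FooDecoratedOperator is the class created in step 1 and must be importable here.


    def foo_task(
        python_callable: Callable | None = None,
        multiple_outputs: bool | None = None,
        **kwargs,
    ) -> TaskDecorator:
        # python_callable is the decorated function when used bare (@task.foo)
        # and None when the decorator is called with arguments (@task.foo(...)).
        return task_decorator_factory(
            python_callable=python_callable,
            multiple_outputs=multiple_outputs,
            decorated_operator_class=FooDecoratedOperator,
            **kwargs,
        )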
    
  3. Register the new decorator in your provider's get_provider_info

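    Finally, add a task-decorators key to the dictionary returned by the provider entry point, as described in "How to create your own provider". Its value should be a list in which each item contains the keys name and class-name. When Airflow starts, the ProviderManager class automatically imports this value, and task.foo becomes available as a new decorator.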

    def get_provider_info():
        return {
            "package-name": "foo-provider-airflow",
            "name": "Foo",
            "task-decorators": [
                {
                    "name": "foo",
                    # "Import path" and function name of the `foo_task`
                    "class-name": "name.of.python.package.foo_task",
                }
            ],
            # ...
        }
    

    Note that name must be a valid Python identifier.
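The three steps can be exercised end to end without installing Airflow using the toy sketch below. Everything in it is a simplified stand-in written for illustration: DecoratedOperator and FooOperator are not Airflow's real classes, this foo_task builds the operator directly instead of calling task_decorator_factory, and load_task_decorators is a hypothetical helper that only approximates what the text describes (checking that name is a valid identifier and importing class-name). It is not ProviderManager's actual code.

```python
from __future__ import annotations

import importlib
import keyword
from collections.abc import Callable
from typing import Any


# Step 1 (stand-ins): a simplified DecoratedOperator and a pre-existing FooOperator.
class DecoratedOperator:
    """Toy stand-in for Airflow's DecoratedOperator; not the real class."""

    custom_operator_name: str | None = None

    def __init__(self, *, python_callable: Callable[..., Any], task_id: str,
                 multiple_outputs: bool = False, **kwargs: Any) -> None:
        self.multiple_outputs = multiple_outputs
        # Cooperative inheritance: pass the rest on to the next class in the MRO.
        super().__init__(python_callable=python_callable, task_id=task_id, **kwargs)


class FooOperator:
    """Hypothetical pre-existing operator that runs a Python callable."""

    def __init__(self, *, python_callable: Callable[..., Any], task_id: str,
                 op_kwargs: dict[str, Any] | None = None) -> None:
        self.python_callable = python_callable
        self.task_id = task_id
        self.op_kwargs = op_kwargs or {}

    def execute(self, context: dict[str, Any]) -> Any:
        return self.python_callable(**self.op_kwargs)


class FooDecoratedOperator(DecoratedOperator, FooOperator):
    # Name reported for tasks created through the decorator.
    custom_operator_name = "@task.foo"


# Step 2 (toy): usable both as @foo_task and as @foo_task(...).
def foo_task(python_callable: Callable[..., Any] | None = None,
             multiple_outputs: bool | None = None, **kwargs: Any) -> Any:
    def decorator(func: Callable[..., Any]) -> Callable[..., FooDecoratedOperator]:
        def make_task(**op_kwargs: Any) -> FooDecoratedOperator:
            # Calling the decorated function builds an operator instead of running it.
            return FooDecoratedOperator(
                python_callable=func,
                task_id=kwargs.get("task_id", func.__name__),
                multiple_outputs=bool(multiple_outputs),
                op_kwargs=op_kwargs,
            )
        return make_task

    if python_callable is not None:  # bare usage: @foo_task
        return decorator(python_callable)
    return decorator  # called usage: @foo_task(task_id=...)


# Step 3 (hypothetical loader): validate each entry and import its class-name.
def load_task_decorators(provider_info: dict[str, Any]) -> dict[str, Callable[..., Any]]:
    loaded: dict[str, Callable[..., Any]] = {}
    for entry in provider_info.get("task-decorators", []):
        name = entry["name"]
        # Keywords such as "class" pass isidentifier() but cannot be used as names.
        if not name.isidentifier() or keyword.iskeyword(name):
            raise ValueError(f"{name!r} is not a valid Python identifier")
        module_path, _, attr = entry["class-name"].rpartition(".")
        loaded[name] = getattr(importlib.import_module(module_path), attr)
    return loaded
```

Listing DecoratedOperator before FooOperator makes the stand-in's __init__ run first and forward the remaining arguments. In real TaskFlow, calling a decorated function returns an XCom reference rather than the operator itself; this toy skips that part.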

(Optional) Adding IDE auto-completion support

Note

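This section mostly applies to the providers managed by Apache Airflow. We have not yet decided whether third-party providers will be allowed to register auto-completion this way.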

For better or worse, Python IDEs cannot auto-complete dynamically generated methods (see JetBrains' write-up on the subject).
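The limitation is easy to reproduce. In the toy below, a hypothetical stand-in rather than Airflow's actual task object, decorator names are looked up in a dictionary by __getattr__ at runtime, so task.foo works even though no attribute named foo exists for dir() or a static analyzer to find.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


class ToyTaskDecoratorCollection:
    """Hypothetical stand-in for the object behind @task.<name>."""

    def __init__(self, registry: dict[str, Callable[..., Any]]) -> None:
        # In this toy the registry is just a dict supplied by the caller.
        self._registry = registry

    def __getattr__(self, name: str) -> Callable[..., Any]:
        # Only invoked when normal attribute lookup fails.
        if name.startswith("_"):
            # Avoid recursing on private/dunder lookups (e.g. before __init__ ran).
            raise AttributeError(name)
        try:
            return self._registry[name]
        except KeyError:
            raise AttributeError(f"no task decorator named {name!r}") from None


decorated: list[str] = []


def foo(func: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical decorator registered under the name "foo"."""
    decorated.append(func.__name__)
    return func


task = ToyTaskDecoratorCollection({"foo": foo})


@task.foo  # resolved by __getattr__ at runtime, invisible to static tools
def hello() -> str:
    return "hi"
```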

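To work around this problem, a type stub file, airflow/sdk/definitions/decorators/__init__.pyi, is provided to statically declare the type signature of each task decorator. A newly added task decorator should declare its signature stub like this: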

task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi

    def docker(
        self,
        *,
        multiple_outputs: bool | None = None,
        python_command: str = "python3",
        serializer: Literal["pickle", "cloudpickle", "dill"] | None = None,
        use_dill: bool = False,  # Added by _DockerDecoratedOperator.
        # 'command', 'retrieve_output', and 'retrieve_output_path' are filled by
        # _DockerDecoratedOperator.
        image: str,
        api_version: str | None = None,
        container_name: str | None = None,
        cpus: float = 1.0,
        docker_url: str | None = None,
        environment: dict[str, str] | None = None,
        private_environment: dict[str, str] | None = None,
        env_file: str | None = None,
        force_pull: bool = False,
        mem_limit: float | str | None = None,
        host_tmp_dir: str | None = None,
        network_mode: str | None = None,
        tls_ca_cert: str | None = None,
        tls_client_cert: str | None = None,
        tls_client_key: str | None = None,
        tls_verify: bool = True,
        tls_hostname: str | bool | None = None,
        tls_ssl_version: str | None = None,
        mount_tmp_dir: bool = True,
        tmp_dir: str = "/tmp/airflow",
        user: str | int | None = None,
        mounts: list[Mount] | None = None,
        entrypoint: str | list[str] | None = None,
        working_dir: str | None = None,
        xcom_all: bool = False,
        docker_conn_id: str | None = None,
        dns: list[str] | None = None,
        dns_search: list[str] | None = None,
        auto_remove: Literal["never", "success", "force"] = "never",
        shm_size: int | None = None,
        tty: bool = False,
        hostname: str | None = None,
        privileged: bool = False,
        cap_add: str | None = None,
        extra_hosts: dict[str, str] | None = None,
        timeout: int = 60,
        device_requests: list[dict] | None = None,
        log_opts_max_size: str | None = None,
        log_opts_max_file: str | None = None,
        ipc_mode: str | None = None,
        skip_on_exit_code: int | Container[int] | None = None,
        port_bindings: dict | None = None,
        ulimits: list[dict] | None = None,
        labels: dict[str, str] | list[str] | None = None,
        **kwargs,
    ) -> TaskDecorator:
        """Create a decorator to convert the decorated callable to a Docker task.

        :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
            Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
        :param python_command: Python command for executing functions, Default: python3
        :param serializer: Which serializer to use to serialize the args and result. It can be one of the following:

            - ``"pickle"``: (default) Use pickle for serialization. Included in the Python Standard Library.
            - ``"cloudpickle"``: Use cloudpickle to serialize more complex types;
              this requires including cloudpickle in your requirements.
            - ``"dill"``: Use dill to serialize more complex types;
              this requires including dill in your requirements.
        :param use_dill: Deprecated, use ``serializer`` instead. Whether to use dill to serialize
            the args and result (pickle is default). This allows more complex types
            but requires you to include dill in your requirements.
        :param image: Docker image from which to create the container.
            If image tag is omitted, "latest" will be used.
        :param api_version: Remote API version. Set to ``auto`` to automatically
            detect the server's version.
        :param container_name: Name of the container. Optional (templated)
        :param cpus: Number of CPUs to assign to the container.
            This value gets multiplied with 1024. See
            https://dockerdocs.tw/engine/reference/run/#cpu-share-constraint
        :param docker_url: URL of the host running the docker daemon.
            Default is the value of the ``DOCKER_HOST`` environment variable or unix://var/run/docker.sock
            if it is unset.
        :param environment: Environment variables to set in the container. (templated)
        :param private_environment: Private environment variables to set in the container.
            These are not templated, and hidden from the website.
        :param env_file: Relative path to the ``.env`` file with environment variables to set in the container.
            Overridden by variables in the environment parameter.
        :param force_pull: Pull the docker image on every run. Default is False.
        :param mem_limit: Maximum amount of memory the container can use.
            Either a float value, which represents the limit in bytes,
            or a string like ``128m`` or ``1g``.
        :param host_tmp_dir: Specify the location of the temporary directory on the host which will
            be mapped to tmp_dir. If not provided defaults to using the standard system temp directory.
        :param network_mode: Network mode for the container. It can be one of the following:

            - ``"bridge"``: Create new network stack for the container with default docker bridge network
            - ``"none"``: No networking for this container
            - ``"container:<name|id>"``: Use the network stack of another container specified via <name|id>
            - ``"host"``: Use the host network stack. Incompatible with `port_bindings`
            - ``"<network-name>|<network-id>"``: Connects the container to user created network
              (using ``docker network create`` command)
        :param tls_ca_cert: Path to a PEM-encoded certificate authority
            to secure the docker connection.
        :param tls_client_cert: Path to the PEM-encoded certificate
            used to authenticate docker client.
        :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
        :param tls_verify: Set ``True`` to verify the validity of the provided certificate.
        :param tls_hostname: Hostname to match against
            the docker server certificate or False to disable the check.
        :param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
        :param mount_tmp_dir: Specify whether the temporary directory should be bind-mounted
            from the host to the container. Defaults to True
        :param tmp_dir: Mount point inside the container to
            a temporary directory created on the host by the operator.
            The path is also made available via the environment variable
            ``AIRFLOW_TMP_DIR`` inside the container.
        :param user: Default user inside the docker container.
        :param mounts: List of mounts to mount into the container, e.g.
            ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
        :param entrypoint: Overwrite the default ENTRYPOINT of the image
        :param working_dir: Working directory to
            set on the container (equivalent to the -w switch of the docker client)
        :param xcom_all: Push all the stdout or just the last line.
            The default is False (last line).
        :param docker_conn_id: The :ref:`Docker connection id <howto/connection:docker>`
        :param dns: Docker custom DNS servers
        :param dns_search: Docker custom DNS search domain
        :param auto_remove: Enable removal of the container when the container's process exits. Possible values:

            - ``never``: (default) do not remove container
            - ``success``: remove on success
            - ``force``: always remove container
        :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
            greater than 0. If omitted uses system default.
        :param tty: Allocate pseudo-TTY to the container.
            This needs to be set to see logs of the Docker container.
        :param hostname: Optional hostname for the container.
        :param privileged: Give extended privileges to this container.
        :param cap_add: Include container capabilities
        :param extra_hosts: Additional hostnames to resolve inside the container,
            as a mapping of hostname to IP address.
        :param device_requests: Expose host resources such as GPUs to the container.
        :param log_opts_max_size: The maximum size of the log before it is rolled.
            A positive integer plus a modifier representing the unit of measure (k, m, or g).
            E.g. 10m or 1g. Defaults to -1 (unlimited).
        :param log_opts_max_file: The maximum number of log files that can be present.
            If rolling the logs creates excess files, the oldest file is removed.
            Only effective when max-size is also set. A positive integer. Defaults to 1.
        :param ipc_mode: Set the IPC mode for the container.
        :param skip_on_exit_code: If task exits with this exit code, leave the task
            in ``skipped`` state (default: None). If set to ``None``, any non-zero
            exit code will be treated as a failure.
        :param port_bindings: Publish a container's port(s) to the host. It is a
            dictionary of value where the key indicates the port to open inside the container
            and value indicates the host port that binds to the container port.
            Incompatible with ``"host"`` in ``network_mode``.
        :param ulimits: List of ulimit options to set for the container. Each item should
            be a :py:class:`docker.types.Ulimit` instance.
        :param labels: A dictionary of name-value labels (e.g. ``{"label1": "value1", "label2": "value2"}``)
            or a list of names of labels to set with empty values (e.g. ``["label1", "label2"]``)
        """

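The signature should accept only keyword arguments, including one named multiple_outputs, which is provided automatically by default. All other arguments should be copied directly from the real FooOperator, and we recommend adding a comment that explains which arguments are filled in automatically by FooDecoratedOperator and are therefore not included.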
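Following these rules, a stub for the hypothetical @task.foo might look like the sketch below. The arguments foo_conn_id and timeout are made up to stand in for whatever FooOperator actually accepts, and the enclosing class name TaskDecoratorCollection is an assumption (the self parameter in the stub above only shows that the stubs are methods of some class). Because the body is just a docstring and TaskDecorator is imported under TYPE_CHECKING, the snippet also runs without Airflow installed.

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from airflow.sdk.bases.decorator import TaskDecorator


class TaskDecoratorCollection:  # assumed class name, for illustration only
    def foo(
        self,
        *,
        multiple_outputs: bool | None = None,
        # 'python_callable' and 'op_kwargs' are filled by FooDecoratedOperator.
        foo_conn_id: str = "foo_default",  # hypothetical FooOperator argument
        timeout: int = 60,  # hypothetical FooOperator argument
        **kwargs,
    ) -> TaskDecorator:
        """Create a decorator to convert the decorated callable to a foo task.

        :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
        :param foo_conn_id: Hypothetical connection id used by FooOperator.
        :param timeout: Hypothetical timeout, in seconds, passed to FooOperator.
        """
```

The bare * after self is what makes every argument keyword-only, so a positional call is rejected with a TypeError.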

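If the new decorator can be used without arguments (e.g. @task.python instead of @task.python()), you should also add an overload that takes a single callable immediately after the "real" definition, so that mypy can recognize the function as a bare decorator.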

task-sdk/src/airflow/sdk/definitions/decorators/__init__.pyi

    @overload
    def python(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
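The same bare-or-called pattern can be tried outside Airflow with the self-contained toy below. It is not Airflow's code: the real stub uses Airflow's Task, FParams and FReturn types, whereas this sketch uses a plain TypeVar, and foo here is a hypothetical decorator. The first overload covers @foo(...) with keyword arguments, the second covers bare @foo, and the final implementation handles both at runtime.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any, TypeVar, overload

F = TypeVar("F", bound=Callable[..., Any])

# Records (function name, multiple_outputs) for each decorated function.
registered: list[tuple[str, bool]] = []


@overload
def foo(*, multiple_outputs: bool | None = None) -> Callable[[F], F]: ...


@overload
def foo(python_callable: F) -> F: ...


def foo(python_callable: F | None = None, *, multiple_outputs: bool | None = None) -> Any:
    """Toy decorator usable as @foo or @foo(...); hypothetical, not Airflow's API."""

    def decorate(func: F) -> F:
        registered.append((func.__name__, bool(multiple_outputs)))
        return func

    if python_callable is not None:
        return decorate(python_callable)  # bare: @foo
    return decorate  # called: @foo(multiple_outputs=...)
```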

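Once the change is merged and the next Airflow release (minor or patch) is out, users will see your decorator in IDE auto-completion. What is auto-completed depends on the provider version the user has installed.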

Note that this step is not required for a working decorator, but it gives the provider's users a better experience.
