KubernetesPodOperator

KubernetesPodOperator 允許您在 Kubernetes 叢集上建立和執行 Pod。

注意

如果您使用託管 Kubernetes,請考慮使用專門的 KPO Operator,因為它簡化了 Kubernetes 授權過程

注意

使用此 Operator **不**需要Kubernetes executor

此 Operator 如何工作?

KubernetesPodOperator 使用 Kubernetes API 在 Kubernetes 叢集中啟動一個 pod。透過提供映象 URL 和帶有可選引數的命令,Operator 使用 Kube Python Client 生成 Kubernetes API 請求,動態啟動這些單獨的 pod。使用者可以使用 config_file 引數指定 kubeconfig 檔案,否則 Operator 將預設使用 ~/.kube/config

KubernetesPodOperator 支援任務級別的資源配置,對於透過公共 PyPI 倉庫無法獲得的自定義 Python 依賴非常有用。它還允許使用者使用 pod_template_file 引數提供模板 YAML 檔案。最終,它使 Airflow 能夠充當作業編排器 - 無論這些作業是用何種語言編寫的。

除錯 KubernetesPodOperator

透過在 Operator 例項上呼叫 dry_run(),您可以打印出執行時將建立的 pod 的 Kubernetes manifest。

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

k = KubernetesPodOperator(
    name="hello-dry-run",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    task_id="dry_run_demo",
    do_xcom_push=True,
)

k.dry_run()

引數優先順序

當 KPO 定義 pod 物件時,KubernetesPodOperator 引數之間可能存在重疊。一般來說,優先順序順序是 KPO 欄位特定的引數(例如 secretscmdsaffinity),然後是更通用的模板 full_pod_specpod_template_filepod_template_dict,最後預設是 V1Pod

對於 namespace,如果未透過任何這些方法提供 namespace,那麼我們將首先嚐試獲取當前 namespace(如果任務已在 Kubernetes 中執行),如果失敗,我們將使用 default namespace。

對於 pod 名稱,如果未明確提供,我們將使用 task_id。預設情況下會新增一個隨機字尾,因此 pod 名稱通常不太重要。

如何在 Pod 中使用叢集 ConfigMaps、Secrets 和 Volumes?

要新增 ConfigMaps、Volumes 和其他 Kubernetes 原生物件,我們建議您像這樣匯入 Kubernetes 模型 API

from kubernetes.client import models as k8s

透過此 API 物件,您可以以 Python 類形式訪問所有 Kubernetes API 物件。使用此方法將確保正確性和型別安全。雖然我們幾乎刪除了所有 Kubernetes 便捷類,但我們保留了 Secret 類,以簡化生成 secret volumes/env variables 的過程。

tests/system/cncf/kubernetes/example_kubernetes.py

secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)

configmaps = [
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-1")),
    k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="test-configmap-2")),
]

volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)

port = k8s.V1ContainerPort(name="http", container_port=80)

init_container_volume_mounts = [
    k8s.V1VolumeMount(mount_path="/etc/foo", name="test-volume", sub_path=None, read_only=True)
]

init_environments = [k8s.V1EnvVar(name="key1", value="value1"), k8s.V1EnvVar(name="key2", value="value2")]

init_container = k8s.V1Container(
    name="init-container",
    image="ubuntu:16.04",
    env=init_environments,
    volume_mounts=init_container_volume_mounts,
    command=["bash", "-cx"],
    args=["echo 10"],
)

affinity = k8s.V1Affinity(
    node_affinity=k8s.V1NodeAffinity(
        preferred_during_scheduling_ignored_during_execution=[
            k8s.V1PreferredSchedulingTerm(
                weight=1,
                preference=k8s.V1NodeSelectorTerm(
                    match_expressions=[
                        k8s.V1NodeSelectorRequirement(key="disktype", operator="In", values=["ssd"])
                    ]
                ),
            )
        ]
    ),
    pod_affinity=k8s.V1PodAffinity(
        required_during_scheduling_ignored_during_execution=[
            k8s.V1WeightedPodAffinityTerm(
                weight=1,
                pod_affinity_term=k8s.V1PodAffinityTerm(
                    label_selector=k8s.V1LabelSelector(
                        match_expressions=[
                            k8s.V1LabelSelectorRequirement(key="security", operator="In", values="S1")
                        ]
                    ),
                    topology_key="failure-domain.beta.kubernetes.io/zone",
                ),
            )
        ]
    ),
)

tolerations = [k8s.V1Toleration(key="key", operator="Equal", value="value")]

KubernetesPodOperator 與 Kubernetes 物件規範的區別

KubernetesPodOperator 可以被視為 Kubernetes 物件規範定義的替代品,它能夠在 DAG 上下文中的 Airflow 排程器中執行。如果使用此 Operator,則無需為您想要執行的 Pod 建立等效的 YAML/JSON 物件規範。仍然可以使用 pod_template_file 提供 YAML 檔案,甚至可以透過 full_pod_spec 引數在 Python 中構建 Pod Spec,該引數需要一個 Kubernetes V1Pod

如何使用私有映象(容器 registry)?

預設情況下,KubernetesPodOperator 會查詢在 Dockerhub 上公開託管的映象。要從私有 registry(例如 ECR、GCR、Quay 或其他)拉取映象,您必須建立一個 Kubernetes Secret,它代表訪問私有 registry 中映象的憑據,最終在 image_pull_secrets 引數中指定該 Secret。

使用 kubectl 建立 Secret

kubectl create secret docker-registry testquay \
    --docker-server=quay.io \
    --docker-username=<Profile name> \
    --docker-password=<password>

然後在您的 pod 中像這樣使用它

tests/system/cncf/kubernetes/example_kubernetes.py

    quay_k8s = KubernetesPodOperator(
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        on_finish_action="delete_pod",
        in_cluster=True,
        task_id="task-two",
        get_logs=True,
    )

此外,您還可以此操作中使用可延遲模式的 Operator

tests/system/cncf/kubernetes/example_kubernetes_async.py

    quay_k8s_async = KubernetesPodOperator(
        task_id="kubernetes_private_img_task_async",
        namespace="default",
        image="quay.io/apache/bash",
        image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
        cmds=["bash", "-cx"],
        arguments=["echo", "10", "echo pwd"],
        labels={"foo": "bar"},
        name="airflow-private-image-pod",
        on_finish_action="delete_pod",
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

定期獲取並顯示容器日誌的示例

tests/system/cncf/kubernetes/example_kubernetes_async.py

    kubernetes_task_async_log = KubernetesPodOperator(
        task_id="kubernetes_task_async_log",
        namespace="kubernetes_task_async_log",
        in_cluster=False,
        name="astro_k8s_test_pod",
        image="ubuntu",
        cmds=[
            "bash",
            "-cx",
            (
                "i=0; "
                "while [ $i -ne 100 ]; "
                "do i=$(($i+1)); "
                "echo $i; "
                "sleep 1; "
                "done; "
                "mkdir -p /airflow/xcom/; "
                'echo \'{"message": "good afternoon!"}\' > /airflow/xcom/return.json'
            ),
        ],
        do_xcom_push=True,
        deferrable=True,
        get_logs=True,
        logging_interval=5,
    )

XCom 如何工作?

KubernetesPodOperator 處理 XCom 值的方式與其他 Operator 不同。為了從您的 Pod 中傳遞 XCom 值,您必須將 do_xcom_push 指定為 True。這將建立一個與 Pod 一起執行的 sidecar 容器。Pod 必須將 XCom 值寫入 /airflow/xcom/return.json 路徑下的此位置。

注意

無效的 json 內容將導致失敗,例如 echo 'hello' > /airflow/xcom/return.json 會失敗,而 echo '\"hello\"' > /airflow/xcom/return.json 會成功

請參閱以下示例,瞭解這是如何發生的

tests/system/cncf/kubernetes/example_kubernetes.py

    write_xcom = KubernetesPodOperator(
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        task_id="write-xcom",
        get_logs=True,
    )

    pod_task_xcom_result = BashOperator(
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
        task_id="pod_task_xcom_result",
    )

    write_xcom >> pod_task_xcom_result

注意

XCOMs 只會為標記為 State.SUCCESS 的任務推送。

此外,您還可以此操作中使用可延遲模式的 Operator

tests/system/cncf/kubernetes/example_kubernetes_async.py

    write_xcom_async = KubernetesPodOperator(
        task_id="kubernetes_write_xcom_task_async",
        namespace="default",
        image="alpine",
        cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
        name="write-xcom",
        do_xcom_push=True,
        on_finish_action="delete_pod",
        in_cluster=True,
        get_logs=True,
        deferrable=True,
    )

    pod_task_xcom_result_async = BashOperator(
        task_id="pod_task_xcom_result_async",
        bash_command="echo \"{{ task_instance.xcom_pull('write-xcom')[0] }}\"",
    )

    write_xcom_async >> pod_task_xcom_result_async

在電子郵件警報中包含錯誤訊息

寫入 /dev/termination-log 的任何內容都將被 Kubernetes 檢索,並在任務失敗時包含在異常訊息中。

k = KubernetesPodOperator(
    task_id="test_error_message",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-error-message",
    email="airflow@example.com",
    email_on_failure=True,
)

在此處閱讀有關 termination-log 的更多資訊:此處

KubernetesPodOperator 回撥

KubernetesPodOperator 支援不同的回撥,可在 pod 的生命週期中觸發操作。要使用它們,您需要建立一個 KubernetesPodOperatorCallback 的子類,並重寫您想要使用的回撥方法。然後,您可以使用 callbacks 引數將您的回撥類傳遞給 Operator。

支援以下回調

  • on_sync_client_creation: 在建立同步客戶端後呼叫

  • on_pod_creation: 在建立 pod 後呼叫

  • on_pod_starting: 在 pod 啟動後呼叫

  • on_pod_completion: 在 pod 完成時呼叫

  • on_pod_cleanup: 在清理/刪除 pod 後呼叫

  • on_operator_resuming: 從延遲狀態恢復任務時呼叫

  • progress_callback: 在容器日誌的每一行上呼叫

目前,回撥方法在非同步模式下不被呼叫,未來將新增此支援。

示例:

import kubernetes.client as k8s
import kubernetes_asyncio.client as async_k8s

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback


class MyCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_creation(*, pod: k8s.V1Pod, client: k8s.CoreV1Api, mode: str, **kwargs) -> None:
        client.create_namespaced_service(
            namespace=pod.metadata.namespace,
            body=k8s.V1Service(
                metadata=k8s.V1ObjectMeta(
                    name=pod.metadata.name,
                    labels=pod.metadata.labels,
                    owner_references=[
                        k8s.V1OwnerReference(
                            api_version=pod.api_version,
                            kind=pod.kind,
                            name=pod.metadata.name,
                            uid=pod.metadata.uid,
                            controller=True,
                            block_owner_deletion=True,
                        )
                    ],
                ),
                spec=k8s.V1ServiceSpec(
                    selector=pod.metadata.labels,
                    ports=[
                        k8s.V1ServicePort(
                            name="http",
                            port=80,
                            target_port=80,
                        )
                    ],
                ),
            ),
        )


k = KubernetesPodOperator(
    task_id="test_callback",
    image="alpine",
    cmds=["/bin/sh"],
    arguments=["-c", "echo hello world; echo Custom error > /dev/termination-log; exit 1;"],
    name="test-callback",
    callbacks=MyCallback,
)

傳遞 Secrets

切勿使用環境變數將 Secrets(例如連線認證資訊)傳遞給 Kubernetes Pod Operator。此類環境變數對任何有權檢視和描述 Kubernetes 中的 POD 的人都是可見的。相反,請透過原生的 Kubernetes Secrets 傳遞您的 secrets,或使用 Airflow 的 Connections 和 Variables。對於後者,您需要在映象中安裝與執行 Kubernetes Pod Operator 的 airflow 版本相同的 apache-airflow 包)。

參考

有關更多資訊,請參閱

SparkKubernetesOperator

SparkKubernetesOperator 允許您在 Kubernetes 叢集上建立和執行 Spark 作業。它基於 spark-on-k8s-operator 專案。

此 Operator 簡化了介面並接受不同的引數來配置和在 Kubernetes 上執行 Spark 應用程式。與 KubernetesOperator 類似,我們添加了提交作業後等待、管理錯誤處理、從 driver pod 檢索日誌以及刪除 Spark 作業的能力的邏輯。它還支援開箱即用的 Kubernetes 功能,例如處理 volumes、config maps、secrets 等。

此 Operator 如何工作?

該 Operator 透過在 Kubernetes 中生成 SparkApplication Custom Resource Definition (CRD) 來啟動 Spark 任務。此 SparkApplication 任務隨後使用使用者指定的引數生成 driver pod 和所需的 executor pod。Operator 持續監控任務進度,直到成功或失敗。它從 driver pod 檢索日誌並在 Airflow UI 中顯示。

使用示例

為了建立 SparkKubernetesOperator 任務,您必須提供一個包含 Spark 配置和 Kubernetes 相關資源配置的基本模板。此模板可以是 YAML 或 JSON 格式,作為 Operator 的起點。下面是一個可供您使用的示例模板

spark_job_template.yaml

spark:
  apiVersion: sparkoperator.k8s.io/v1beta2
  version: v1beta2
  kind: SparkApplication
  apiGroup: sparkoperator.k8s.io
  metadata:
    namespace: ds
  spec:
    type: Python
    pythonVersion: "3"
    mode: cluster
    sparkVersion: 3.0.0
    successfulRunHistoryLimit: 1
    restartPolicy:
      type: Never
    imagePullPolicy: Always
    hadoopConf: {}
    imagePullSecrets: []
    dynamicAllocation:
      enabled: false
      initialExecutors: 1
      minExecutors: 1
      maxExecutors: 1
    labels: {}
    driver:
      serviceAccount: default
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
    executor:
      instances: 1
      container_resources:
        gpu:
          name: null
          quantity: 0
        cpu:
          request: null
          limit: null
        memory:
          request: null
          limit: null
kubernetes:
  # example:
  # env_vars:
  # - name: TEST_NAME
  #   value: TEST_VALUE
  env_vars: []

  # example:
  # env_from:
  # - name: test
  #   valueFrom:
  #     secretKeyRef:
  #       name: mongo-secret
  #       key: mongo-password
  env_from: []

  # example:
  # node_selector:
  #   karpenter.sh/provisioner-name: spark
  node_selector: {}

  # example: https://kubernetes.club.tw/docs/concepts/scheduling-eviction/assign-pod-node/
  # affinity:
  #   nodeAffinity:
  #     requiredDuringSchedulingIgnoredDuringExecution:
  #       nodeSelectorTerms:
  #       - matchExpressions:
  #         - key: beta.kubernetes.io/instance-type
  #           operator: In
  #           values:
  #           - r5.xlarge
  affinity:
    nodeAffinity: {}
    podAffinity: {}
    podAntiAffinity: {}

  # example: https://kubernetes.club.tw/docs/concepts/scheduling-eviction/taint-and-toleration/
  # type: list
  # tolerations:
  # - key: "key1"
  #   operator: "Equal"
  #   value: "value1"
  #   effect: "NoSchedule"
  tolerations: []

  # example:
  # config_map_mounts:
  #   snowflake-default: /mnt/tmp
  config_map_mounts: {}

  # example:
  # volume_mounts:
  # - name: config
  #   mountPath: /airflow
  volume_mounts: []

  # https://kubernetes.club.tw/docs/concepts/storage/volumes/
  # example:
  # volumes:
  # - name: config
  #   persistentVolumeClaim:
  #     claimName: airflow
  volumes: []

  # read config map into an env variable
  # example:
  # from_env_config_map:
  # - configmap_1
  # - configmap_2
  from_env_config_map: []

  # load secret into an env variable
  # example:
  # from_env_secret:
  # - secret_1
  # - secret_2
  from_env_secret: []

  in_cluster: true
  conn_id: kubernetes_default
  kube_config_file: null
  cluster_context: null

重要提示

  • 模板檔案包含兩個主要類別:sparkkubernetes

    • spark: 此部分包含任務的 Spark 配置,映象了 Spark API 模板的結構。

    • kubernetes: 此部分包含任務的 Kubernetes 資源配置,直接對應 Kubernetes API 文件。每種資源型別在模板中都包含一個示例。

  • 指定使用的基礎映象為 gcr.io/spark-operator/spark-py:v3.1.1

  • 確保 Spark 程式碼嵌入在映象中、使用 persistentVolume 掛載,或從外部位置(如 S3 儲存桶)訪問。

接下來,使用以下方式建立任務

SparkKubernetesOperator(
    task_id="spark_task",
    image="gcr.io/spark-operator/spark-py:v3.1.1",  # OR custom image using that
    code_path="local://path/to/spark/code.py",
    application_file="spark_job_template.yaml",  # OR spark_job_template.json
    dag=dag,
)

注意:application_file 也可以是 JSON 檔案。請參閱以下示例

spark_job_template.json

{
  "spark": {
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "version": "v1beta2",
    "kind": "SparkApplication",
    "apiGroup": "sparkoperator.k8s.io",
    "metadata": {
      "namespace": "ds"
    },
    "spec": {
      "type": "Python",
      "pythonVersion": "3",
      "mode": "cluster",
      "sparkVersion": "3.0.0",
      "successfulRunHistoryLimit": 1,
      "restartPolicy": {
        "type": "Never"
      },
      "imagePullPolicy": "Always",
      "hadoopConf": {},
      "imagePullSecrets": [],
      "dynamicAllocation": {
        "enabled": false,
        "initialExecutors": 1,
        "minExecutors": 1,
        "maxExecutors": 1
      },
      "labels": {},
      "driver": {
        "serviceAccount": "default",
        "container_resources": {
          "gpu": {
            "name": null,
            "quantity": 0
          },
          "cpu": {
            "request": null,
            "limit": null
          },
          "memory": {
            "request": null,
            "limit": null
          }
        }
      },
      "executor": {
        "instances": 1,
        "container_resources": {
          "gpu": {
            "name": null,
            "quantity": 0
          },
          "cpu": {
            "request": null,
            "limit": null
          },
          "memory": {
            "request": null,
            "limit": null
          }
        }
      }
    }
  },
  "kubernetes": {
    "env_vars": [],
    "env_from": [],
    "node_selector": {},
    "affinity": {
      "nodeAffinity": {},
      "podAffinity": {},
      "podAntiAffinity": {}
    },
    "tolerations": [],
    "config_map_mounts": {},
    "volume_mounts": [
      {
        "name": "config",
        "mountPath": "/airflow"
      }
    ],
    "volumes": [
      {
        "name": "config",
        "persistentVolumeClaim": {
          "claimName": "hsaljoog-airflow"
        }
      }
    ],
    "from_env_config_map": [],
    "from_env_secret": [],
    "in_cluster": true,
    "conn_id": "kubernetes_default",
    "kube_config_file": null,
    "cluster_context": null
  }
}

除了使用 YAML 或 JSON 檔案外,另一種方法是直接傳遞 template_spec 欄位,而不是 application_file,如果您不想使用檔案進行配置。

參考

有關更多資訊,請參閱

KubernetesJobOperator

KubernetesJobOperator 允許您在 Kubernetes 叢集上建立和執行 Job。

注意

如果您使用託管 Kubernetes,請考慮使用專門的 KJO Operator,因為它簡化了 Kubernetes 授權過程

  • Google Kubernetes Engine 的 GKEStartJobOperator Operator。

注意

使用此 Operator **不**需要Kubernetes executor

此 Operator 如何工作?

KubernetesJobOperator 使用 Kubernetes API 在 Kubernetes 叢集中啟動一個 Job。Operator 使用 Kube Python Client 生成 Kubernetes API 請求,動態啟動此 Job。使用者可以使用 config_file 引數指定 kubeconfig 檔案,否則 Operator 將預設使用 ~/.kube/config。它還允許使用者使用 job_template_file 引數提供模板 YAML 檔案。

tests/system/cncf/kubernetes/example_kubernetes_job.py

k8s_job = KubernetesJobOperator(
    task_id="job-task",
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

KubernetesJobOperator 也支援可延遲模式

tests/system/cncf/kubernetes/example_kubernetes_job.py

k8s_job_def = KubernetesJobOperator(
    task_id="job-task-def",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME + "-def",
    wait_until_job_complete=True,
    deferrable=True,
)

KubernetesPodOperatorKubernetesJobOperator 的區別

KubernetesJobOperator 是用於建立 Job 的 Operator。Job 會建立一個或多個 Pod,並會持續重試執行 Pod,直到指定數量的 Pod 成功終止。隨著 Pod 成功完成,Job 會跟蹤成功完成的數量。當達到指定數量的成功完成時,Job 就完成了。使用者可以使用 activeDeadlineSecondsbackoffLimit 等配置引數限制 Job 重試執行的次數。此 Operator 使用 KubernetesPodOperator 來建立 Pod,而不是使用 template 引數。這意味著使用者可以在 KubernetesJobOperator 中使用 KubernetesPodOperator 的所有引數。

此處有關 Jobs 的更多資訊:Kubernetes Job Documentation

KubernetesDeleteJobOperator

KubernetesDeleteJobOperator 允許您在 Kubernetes 叢集上刪除 Job。

tests/system/cncf/kubernetes/example_kubernetes_job.py

delete_job_task = KubernetesDeleteJobOperator(
    task_id="delete_job_task",
    name=k8s_job.output["job_name"],
    namespace=JOB_NAMESPACE,
    wait_for_completion=True,
    delete_on_status="Complete",
    poll_interval=1.0,
)

KubernetesPatchJobOperator

KubernetesPatchJobOperator 允許您在 Kubernetes 叢集上更新 Job。

tests/system/cncf/kubernetes/example_kubernetes_job.py

update_job = KubernetesPatchJobOperator(
    task_id="update-job-task",
    namespace="default",
    name=k8s_job.output["job_name"],
    body={"spec": {"suspend": False}},
)

KubernetesInstallKueueOperator

KubernetesInstallKueueOperator 允許您在 Kubernetes 叢集中安裝 Kueue 元件

tests/system/cncf/kubernetes/example_kubernetes_kueue.py

install_kueue = KubernetesInstallKueueOperator(
    task_id="install_kueue",
    kueue_version="v0.9.1",
)

參考

有關更多資訊,請參閱

KubernetesStartKueueJobOperator

KubernetesStartKueueJobOperator 允許您在 Kubernetes 叢集中啟動 Kueue 作業

tests/system/cncf/kubernetes/example_kubernetes_kueue.py

start_kueue_job = KubernetesStartKueueJobOperator(
    task_id="kueue_job",
    queue_name=QUEUE_NAME,
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
    wait_until_job_complete=True,
)

有關更多資訊,請參閱

此條目有幫助嗎?