Google Kubernetes Engine 運算元

Google Kubernetes Engine (GKE) 提供一個託管環境,用於利用 Google 基礎設施部署、管理和擴充套件容器化應用程式。GKE 環境由多臺機器(具體來說是 Compute Engine 例項)組成,它們組合在一起形成一個叢集。

先決條件任務

要使用這些運算元,您必須完成以下幾項工作

管理 GKE 叢集

叢集是 GKE 的基礎 - 所有工作負載都在叢集之上執行。它由叢集主節點和工作節點組成。在建立或刪除叢集時,主節點的生命週期由 GKE 管理。工作節點表示為 Compute Engine 虛擬機器例項,GKE 在您建立叢集時代表您建立這些例項。

建立 GKE 叢集

以下是叢集定義的示例

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}

使用 GKECreateClusterOperator 建立叢集時,需要一個像這樣的 dict 物件,或一個 Cluster 定義。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
)

您可以為此操作使用 deferrable 模式,以便非同步執行運算元。當運算元知道需要等待時,它可以釋放工作程序,並將恢復運算元的工作交給 Trigger。因此,當它暫停(延遲)時,不會佔用工作程序槽,並且您的叢集將不會在空閒的 Operators 或 Sensors 上浪費大量資源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
    deferrable=True,
)

在叢集內部安裝特定版本的 Kueue

Kueue 是一個雲原生 Job 排程器,它與預設的 Kubernetes 排程器、Job 控制器和叢集自動擴縮器協同工作,以提供端到端的批處理系統。Kueue 實現 Job 排隊,根據配額和公平共享資源的分層結構決定 Job 何時等待以及何時開始。Kueue 支援 Autopilot 叢集、帶有節點自動預配功能的標準 GKE 和常規的自動擴縮節點池。藉助 GKEStartKueueInsideClusterOperator 在您的叢集上安裝和使用 Kueue,如本示例所示

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py

    add_kueue_cluster = GKEStartKueueInsideClusterOperator(
        task_id="add_kueue_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        cluster_name=CLUSTER_NAME,
        kueue_version="v0.6.2",
    )

刪除 GKE 叢集

要刪除叢集,請使用 GKEDeleteClusterOperator。這也會刪除分配給叢集的所有節點。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
)

您可以為此操作使用 deferrable 模式,以便非同步執行運算元。當運算元知道需要等待時,它可以釋放工作程序,並將恢復運算元的工作交給 Trigger。因此,當它暫停(延遲)時,不會佔用工作程序槽,並且您的叢集將不會在空閒的 Operators 或 Sensors 上浪費大量資源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    deferrable=True,
)

管理 GKE 叢集上的工作負載

GKE 與容器化應用程式協同工作,例如在 Docker 上建立的應用程式,並將它們部署到叢集上執行。這些稱為工作負載,部署在叢集上時,它們利用叢集的 CPU 和記憶體資源來有效執行。

在 GKE 叢集上執行 Pod

為了在 GKE 叢集上執行 Pod,有兩個運算元可用

GKEStartPodOperator 擴充套件了 KubernetesPodOperator,以使用 Google Cloud 憑據提供授權。無需管理 kube_config 檔案,因為它會自動生成。所有 Kubernetes 引數(config_file 除外)也對 GKEStartPodOperator 有效。有關 KubernetesPodOperator 的更多資訊,請參閱:KubernetesPodOperator 指南。

與私有叢集一起使用

所有叢集都有一個規範端點。該端點是 Kubernetes API 伺服器的 IP 地址,Airflow 使用它與您的叢集主節點通訊。該端點顯示在 Cloud Console 中叢集“詳細資訊”選項卡的“端點”欄位下,以及 gcloud container clusters describe 命令輸出的 endpoint 欄位中。

私有叢集有兩個唯一的端點值:privateEndpoint(內部 IP 地址)和 publicEndpoint(外部 IP 地址)。預設情況下,對私有叢集執行 GKEStartPodOperator 會將外部 IP 地址設定為端點。如果您傾向於使用內部 IP 作為端點,則需要將 use_internal_ip 引數設定為 True

與 DNS 端點叢集一起使用

要在使用 DNS 端點時執行 GKEStartPodOperator,您需要將 use_dns_endpoint 引數設定為 True

與 Autopilot(無伺服器)叢集一起使用

在像 GKE Autopilot 這樣的無伺服器叢集上執行時,由於冷啟動,Pod 啟動有時會花費更長時間。在 Pod 啟動期間,會定期短間隔檢查狀態,如果 Pod 尚未啟動,則會發出警告訊息。您可以透過 startup_check_interval_seconds 引數增加此間隔長度,建議設定為 60 秒。

XCom 的使用

我們可以在運算元上啟用 XCom 的使用。其工作原理是使用指定的 Pod 啟動一個 Sidecar 容器。指定使用 XCom 時,Sidecar 會自動掛載,其掛載點路徑為 /airflow/xcom。要為 XCom 提供值,請確保您的 Pod 將其寫入 Sidecar 中一個名為 return.json 的檔案。該檔案的內容可在您的 DAG 的下游使用。以下是使用示例

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    do_xcom_push=True,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    in_cluster=False,
    on_finish_action="delete_pod",
)

然後在其他運算元中使用它

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom') }}\"",
    task_id="pod_task_xcom_result",
)

您可以為此操作使用 deferrable 模式,以便非同步執行運算元。當運算元知道需要等待時,它可以釋放工作程序,並將恢復運算元的工作交給 Trigger。因此,當它暫停(延遲)時,不會佔用工作程序槽,並且您的叢集將不會在空閒的 Operators 或 Sensors 上浪費大量資源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py

pod_task_xcom_async = GKEStartPodOperator(
    task_id="pod_task_xcom_async",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom-async",
    in_cluster=False,
    on_finish_action="delete_pod",
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
)

在 GKE 叢集上執行 Job

為了在 GKE 叢集上執行 Job,有兩個運算元可用

GKEStartJobOperator 擴充套件了 KubernetesJobOperator,以使用 Google Cloud 憑據提供授權。無需管理 kube_config 檔案,因為它會自動生成。所有 Kubernetes 引數(config_file 除外)也對 GKEStartJobOperator 有效。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

job_task = GKEStartJobOperator(
    task_id="job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

GKEStartJobOperator 也支援 deferrable 模式。請注意,僅當 wait_until_job_complete 引數設定為 True 時,此模式才有意義。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

job_task_def = GKEStartJobOperator(
    task_id="job_task_def",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME_DEF,
    wait_until_job_complete=True,
    deferrable=True,
)

要在啟用了 Kueue 的 GKE 叢集上執行 Job,請使用 GKEStartKueueJobOperator

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py

kueue_job_task = GKEStartKueueJobOperator(
    task_id="kueue_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    queue_name=QUEUE_NAME,
    namespace="default",
    parallelism=3,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
)

刪除 GKE 叢集上的 Job

為了刪除 GKE 叢集上的 Job,有兩個運算元可用

GKEDeleteJobOperator 擴充套件了 KubernetesDeleteJobOperator,以使用 Google Cloud 憑據提供授權。無需管理 kube_config 檔案,因為它會自動生成。所有 Kubernetes 引數(config_file 除外)也對 GKEDeleteJobOperator 有效。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

delete_job = GKEDeleteJobOperator(
    task_id="delete_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=JOB_NAME,
    namespace=JOB_NAMESPACE,
)

按給定名稱檢索 Job 的資訊

您可以使用 GKEDescribeJobOperator 透過提供 Job 的名稱和名稱空間來檢索現有 Job 的詳細描述。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

describe_job_task = GKEDescribeJobOperator(
    task_id="describe_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    job_name=job_task.output["job_name"],
    namespace="default",
    cluster_name=CLUSTER_NAME,
)

檢索 Job 列表

您可以使用 GKEListJobsOperator 檢索現有 Job 列表。如果提供了 namespace 引數,輸出將包含給定名稱空間中的 Job。如果未指定 namespace 引數,將輸出所有名稱空間中的資訊。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

list_job_task = GKEListJobsOperator(
    task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)

在 GKE 叢集中建立資源

您可以使用 GKECreateCustomResourceOperator 在指定的 Google Kubernetes Engine 叢集中建立資源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py

create_resource_task = GKECreateCustomResourceOperator(
    task_id="create_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

刪除 GKE 叢集中的資源

您可以使用 GKEDeleteCustomResourceOperator 在指定的 Google Kubernetes Engine 叢集中刪除資源。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py

delete_resource_task = GKEDeleteCustomResourceOperator(
    task_id="delete_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

暫停 GKE 叢集上的 Job

您可以使用 GKESuspendJobOperator 在指定的 Google Kubernetes Engine 叢集中暫停 Job。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

suspend_job = GKESuspendJobOperator(
    task_id="suspend_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

恢復 GKE 叢集上的 Job

您可以使用 GKEResumeJobOperator 在指定的 Google Kubernetes Engine 叢集中恢復 Job。

tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py

resume_job = GKEResumeJobOperator(
    task_id="resume_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

參考資料

欲瞭解更多資訊,請參閱

此條目有幫助嗎?