Google Kubernetes Engine 運算元¶
Google Kubernetes Engine (GKE) 提供一個託管環境,用於利用 Google 基礎設施部署、管理和擴充套件容器化應用程式。GKE 環境由多臺機器(具體來說是 Compute Engine 例項)組成,它們組合在一起形成一個叢集。
先決條件任務¶
要使用這些運算元,您必須完成以下幾項工作
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算功能,具體方法請參閱 Google Cloud 文件。
啟用 API,具體方法請參閱 Cloud Console 文件。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關詳細資訊,請參閱安裝。
管理 GKE 叢集¶
叢集是 GKE 的基礎 - 所有工作負載都在叢集之上執行。它由叢集主節點和工作節點組成。在建立或刪除叢集時,主節點的生命週期由 GKE 管理。工作節點表示為 Compute Engine 虛擬機器例項,GKE 在您建立叢集時代表您建立這些例項。
建立 GKE 叢集¶
以下是叢集定義的示例
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1, "autopilot": {"enabled": True}}
使用 GKECreateClusterOperator 建立叢集時,需要一個像這樣的 dict 物件,或一個 Cluster 定義。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
)
您可以為此操作使用 deferrable 模式,以便非同步執行運算元。當運算元知道需要等待時,它可以釋放工作程序,並將恢復運算元的工作交給 Trigger。因此,當它暫停(延遲)時,不會佔用工作程序槽,並且您的叢集將不會在空閒的 Operators 或 Sensors 上浪費大量資源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
deferrable=True,
)
在叢集內部安裝特定版本的 Kueue¶
Kueue 是一個雲原生 Job 排程器,它與預設的 Kubernetes 排程器、Job 控制器和叢集自動擴縮器協同工作,以提供端到端的批處理系統。Kueue 實現 Job 排隊,根據配額和公平共享資源的分層結構決定 Job 何時等待以及何時開始。Kueue 支援 Autopilot 叢集、帶有節點自動預配功能的標準 GKE 和常規的自動擴縮節點池。藉助 GKEStartKueueInsideClusterOperator 在您的叢集上安裝和使用 Kueue,如本示例所示
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py
add_kueue_cluster = GKEStartKueueInsideClusterOperator(
task_id="add_kueue_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
kueue_version="v0.6.2",
)
刪除 GKE 叢集¶
要刪除叢集,請使用 GKEDeleteClusterOperator。這也會刪除分配給叢集的所有節點。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
)
您可以為此操作使用 deferrable 模式,以便非同步執行運算元。當運算元知道需要等待時,它可以釋放工作程序,並將恢復運算元的工作交給 Trigger。因此,當它暫停(延遲)時,不會佔用工作程序槽,並且您的叢集將不會在空閒的 Operators 或 Sensors 上浪費大量資源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
deferrable=True,
)
管理 GKE 叢集上的工作負載¶
GKE 與容器化應用程式協同工作,例如在 Docker 上建立的應用程式,並將它們部署到叢集上執行。這些稱為工作負載,部署在叢集上時,它們利用叢集的 CPU 和記憶體資源來有效執行。
在 GKE 叢集上執行 Pod¶
為了在 GKE 叢集上執行 Pod,有兩個運算元可用
GKEStartPodOperator 擴充套件了 KubernetesPodOperator,以使用 Google Cloud 憑據提供授權。無需管理 kube_config 檔案,因為它會自動生成。所有 Kubernetes 引數(config_file 除外)也對 GKEStartPodOperator 有效。有關 KubernetesPodOperator 的更多資訊,請參閱:KubernetesPodOperator 指南。
與私有叢集一起使用¶
所有叢集都有一個規範端點。該端點是 Kubernetes API 伺服器的 IP 地址,Airflow 使用它與您的叢集主節點通訊。該端點顯示在 Cloud Console 中叢集“詳細資訊”選項卡的“端點”欄位下,以及 gcloud container clusters describe 命令輸出的 endpoint 欄位中。
私有叢集有兩個唯一的端點值:privateEndpoint(內部 IP 地址)和 publicEndpoint(外部 IP 地址)。預設情況下,對私有叢集執行 GKEStartPodOperator 會將外部 IP 地址設定為端點。如果您傾向於使用內部 IP 作為端點,則需要將 use_internal_ip 引數設定為 True。
與 DNS 端點叢集一起使用¶
要在使用 DNS 端點時執行 GKEStartPodOperator,您需要將 use_dns_endpoint 引數設定為 True。
與 Autopilot(無伺服器)叢集一起使用¶
在像 GKE Autopilot 這樣的無伺服器叢集上執行時,由於冷啟動,Pod 啟動有時會花費更長時間。在 Pod 啟動期間,會定期短間隔檢查狀態,如果 Pod 尚未啟動,則會發出警告訊息。您可以透過 startup_check_interval_seconds 引數增加此間隔長度,建議設定為 60 秒。
XCom 的使用¶
我們可以在運算元上啟用 XCom 的使用。其工作原理是使用指定的 Pod 啟動一個 Sidecar 容器。指定使用 XCom 時,Sidecar 會自動掛載,其掛載點路徑為 /airflow/xcom。要為 XCom 提供值,請確保您的 Pod 將其寫入 Sidecar 中一個名為 return.json 的檔案。該檔案的內容可在您的 DAG 的下游使用。以下是使用示例
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
pod_task_xcom = GKEStartPodOperator(
task_id="pod_task_xcom",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
do_xcom_push=True,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom",
in_cluster=False,
on_finish_action="delete_pod",
)
然後在其他運算元中使用它
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom') }}\"",
task_id="pod_task_xcom_result",
)
您可以為此操作使用 deferrable 模式,以便非同步執行運算元。當運算元知道需要等待時,它可以釋放工作程序,並將恢復運算元的工作交給 Trigger。因此,當它暫停(延遲)時,不會佔用工作程序槽,並且您的叢集將不會在空閒的 Operators 或 Sensors 上浪費大量資源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py
pod_task_xcom_async = GKEStartPodOperator(
task_id="pod_task_xcom_async",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom-async",
in_cluster=False,
on_finish_action="delete_pod",
do_xcom_push=True,
deferrable=True,
get_logs=True,
)
在 GKE 叢集上執行 Job¶
為了在 GKE 叢集上執行 Job,有兩個運算元可用
GKEStartJobOperator 擴充套件了 KubernetesJobOperator,以使用 Google Cloud 憑據提供授權。無需管理 kube_config 檔案,因為它會自動生成。所有 Kubernetes 引數(config_file 除外)也對 GKEStartJobOperator 有效。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
job_task = GKEStartJobOperator(
task_id="job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME,
)
GKEStartJobOperator 也支援 deferrable 模式。請注意,僅當 wait_until_job_complete 引數設定為 True 時,此模式才有意義。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
job_task_def = GKEStartJobOperator(
task_id="job_task_def",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME_DEF,
wait_until_job_complete=True,
deferrable=True,
)
要在啟用了 Kueue 的 GKE 叢集上執行 Job,請使用 GKEStartKueueJobOperator。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py
kueue_job_task = GKEStartKueueJobOperator(
task_id="kueue_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
queue_name=QUEUE_NAME,
namespace="default",
parallelism=3,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name="test-pi",
suspend=True,
container_resources=k8s.V1ResourceRequirements(
requests={
"cpu": 1,
"memory": "200Mi",
},
),
)
刪除 GKE 叢集上的 Job¶
為了刪除 GKE 叢集上的 Job,有兩個運算元可用
GKEDeleteJobOperator 擴充套件了 KubernetesDeleteJobOperator,以使用 Google Cloud 憑據提供授權。無需管理 kube_config 檔案,因為它會自動生成。所有 Kubernetes 引數(config_file 除外)也對 GKEDeleteJobOperator 有效。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
delete_job = GKEDeleteJobOperator(
task_id="delete_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=JOB_NAME,
namespace=JOB_NAMESPACE,
)
按給定名稱檢索 Job 的資訊¶
您可以使用 GKEDescribeJobOperator 透過提供 Job 的名稱和名稱空間來檢索現有 Job 的詳細描述。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
describe_job_task = GKEDescribeJobOperator(
task_id="describe_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
job_name=job_task.output["job_name"],
namespace="default",
cluster_name=CLUSTER_NAME,
)
檢索 Job 列表¶
您可以使用 GKEListJobsOperator 檢索現有 Job 列表。如果提供了 namespace 引數,輸出將包含給定名稱空間中的 Job。如果未指定 namespace 引數,將輸出所有名稱空間中的資訊。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
list_job_task = GKEListJobsOperator(
task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)
在 GKE 叢集中建立資源¶
您可以使用 GKECreateCustomResourceOperator 在指定的 Google Kubernetes Engine 叢集中建立資源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py
create_resource_task = GKECreateCustomResourceOperator(
task_id="create_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
刪除 GKE 叢集中的資源¶
您可以使用 GKEDeleteCustomResourceOperator 在指定的 Google Kubernetes Engine 叢集中刪除資源。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py
delete_resource_task = GKEDeleteCustomResourceOperator(
task_id="delete_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
暫停 GKE 叢集上的 Job¶
您可以使用 GKESuspendJobOperator 在指定的 Google Kubernetes Engine 叢集中暫停 Job。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
suspend_job = GKESuspendJobOperator(
task_id="suspend_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)
恢復 GKE 叢集上的 Job¶
您可以使用 GKEResumeJobOperator 在指定的 Google Kubernetes Engine 叢集中恢復 Job。
tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py
resume_job = GKEResumeJobOperator(
task_id="resume_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)
參考資料¶
欲瞭解更多資訊,請參閱