airflow.providers.cncf.kubernetes.operators.spark_kubernetes

SparkKubernetesOperator

在 Kubernetes 叢集中建立 sparkApplication 物件。

模組內容

class airflow.providers.cncf.kubernetes.operators.spark_kubernetes.SparkKubernetesOperator(*, image=None, code_path=None, namespace='default', name=None, application_file=None, template_spec=None, get_logs=True, do_xcom_push=False, success_run_history_limit=1, startup_timeout_seconds=600, log_events_on_failure=False, reattach_on_restart=True, delete_on_termination=True, kubernetes_conn_id='kubernetes_default', random_name_suffix=True, **kwargs)[source]

基類: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator

在 Kubernetes 叢集中建立 sparkApplication 物件。

另請參閱

有關 Spark Application 物件的更多詳細資訊,請參閱參考資料: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/v1beta2-1.3.3-3.1.1/docs/api-docs.md#sparkapplication

引數:
  • image (str | None) – 希望啟動的 Docker 映象。預設為 hub.docker.com,

  • code_path (str | None) – 映象中 spark 程式碼的路徑,

  • namespace (str) – 放置 sparkApplication 的 Kubernetes 名稱空間

  • name (str | None) – 任務執行所在的 Pod 名稱,將用於(如果 random_name_suffix 為 True,則加上一個隨機字尾)生成 Pod ID (DNS-1123 子域名,僅包含 [a-z0-9.-])。

  • application_file (str | None) – sparkApplication 的 Kubernetes custom_resource_definition 檔案路徑

  • template_spec – Kubernetes sparkApplication 規範

  • get_logs (bool) – 將容器的標準輸出作為任務日誌獲取。

  • do_xcom_push (bool) – 如果為 True,容器中 /airflow/xcom/return.json 檔案的內容也會在容器完成後推送到 XCom。

  • success_run_history_limit (int) – 要保留的應用程式過去成功執行的次數。

  • startup_timeout_seconds – 啟動 Pod 的超時時間(秒)。

  • log_events_on_failure (bool) – 如果發生故障,則記錄 Pod 的事件

  • reattach_on_restart (bool) – 如果排程器在 Pod 執行時終止,則重新連線並監控

  • delete_on_termination (bool) – 當 Pod 達到最終狀態或執行中斷時如何處理。如果為 True(預設),刪除 Pod;如果為 False,保留 Pod。

  • kubernetes_conn_id (str) – 到 Kubernetes 叢集的連線

  • random_name_suffix (bool) – 如果為 True,則在 Pod 名稱後新增隨機字尾

template_fields = ['application_file', 'namespace', 'template_spec', 'kubernetes_conn_id'][source]
template_fields_renderers[source]
template_ext = ('yaml', 'yml', 'json')[source]
ui_color = '#f4a460'[source]
BASE_CONTAINER_NAME = 'spark-kubernetes-driver'[source]
image = None[source]
code_path = None[source]
application_file = None[source]
template_spec = None[source]
kubernetes_conn_id = 'kubernetes_default'[source]
startup_timeout_seconds = 600[source]
reattach_on_restart = True[source]
delete_on_termination = True[source]
do_xcom_push = False[source]
namespace = 'default'[source]
get_logs = True[source]
log_events_on_failure = False[source]
success_run_history_limit = 1[source]
random_name_suffix = True[source]
base_container_name: str[source]
container_logs: list[str][source]
manage_template_specs()[source]
create_job_name()[source]
property pod_manager: airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager[source]
property template_body[source]

用於 CustomObjectLauncher 的模板化主體。

find_spark_job(context, exclude_checked=True)[source]
get_or_create_spark_crd(launcher, context)[source]
process_pod_deletion(pod, *, reraise=True)[source]
property hook: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook[source]
property client: kubernetes.client.CoreV1Api[source]
property custom_obj_api: kubernetes.client.CustomObjectsApi[source]
execute(context)[source]

根據 deferrable 引數非同步或同步執行 Pod。

on_kill()[source]

重寫此方法,以便在任務例項被終止時清理子程序。

Operator 中任何使用 threading、subprocess 或 multiprocessing 模組的地方都需要清理,否則會留下殭屍程序。

patch_already_checked(pod, *, reraise=True)[source]

新增一個“已檢查”註解,確保重試時不會重新連線。

dry_run()[source]

打印出該 Operator 將建立的 Spark Job。

此條目有幫助嗎?