airflow.providers.cncf.kubernetes.pod_generator

Pod 生成器。

此模組提供了一個介面,連線了之前的 Pod API 並輸出 `kubernetes.client.models.V1Pod`。其優點在於支援完整的 Kubernetes API,並且無需編寫序列化程式碼。

屬性

log

MAX_LABEL_LEN

PodGenerator

包含 Kubernetes Airflow Worker 的配置邏輯。

函式

make_safe_label_value(string)

規範化提供的標籤,使其具有有效長度和字元。

datetime_to_label_safe_datestring(datetime_obj)

將日期時間字串轉換為可用作標籤的格式。

label_safe_datestring_to_datetime(string)

將標籤轉換回日期時間物件。

merge_objects(base_obj, client_obj)

合併物件。

extend_object_field(base_obj, client_obj, field_name)

將欄位值新增到現有物件。

模組內容

airflow.providers.cncf.kubernetes.pod_generator.log[source]
airflow.providers.cncf.kubernetes.pod_generator.MAX_LABEL_LEN = 63[source]
airflow.providers.cncf.kubernetes.pod_generator.make_safe_label_value(string)[source]

規範化提供的標籤,使其具有有效長度和字元。

有效的標籤值必須小於或等於 63 個字元,並且必須為空或以字母數字字元 ([a-z0-9A-Z]) 開頭和結尾,中間可以使用短劃線 (-)、下劃線 (_)、點 (.) 和字母數字。

如果標籤值在規範化後超過 63 個字元,或與傳送到此函式的原始值有任何不同,則我們需要將其截斷至 53 個字元,並在末尾附加一個唯一的雜湊值。

airflow.providers.cncf.kubernetes.pod_generator.datetime_to_label_safe_datestring(datetime_obj)[source]

將日期時間字串轉換為可用作標籤的格式。

Kubernetes 不喜歡標籤中使用“:”,由於 ISO 日期時間格式使用“:”而非“_”,我們將“:”替換為“_”。

引數:

datetime_obj (datetime.datetime) – datetime.datetime 物件

返回:

表示日期時間的類似 ISO 格式的字串

返回型別:

str

airflow.providers.cncf.kubernetes.pod_generator.label_safe_datestring_to_datetime(string)[source]

將標籤轉換回日期時間物件。

Kubernetes 不允許標籤中使用“:”。ISO 日期時間格式使用“:”而非“_”,我們將“:”替換為“_”。

引數:

string (str) – 字串

返回:

datetime.datetime 物件

返回型別:

datetime.datetime

class airflow.providers.cncf.kubernetes.pod_generator.PodGenerator(pod=None, pod_template_file=None, extract_xcom=True)[source]

包含 Kubernetes Airflow Worker 的配置邏輯。

表示一個 Kubernetes pod,並管理單個 pod 的執行。任何特定於容器的配置都會應用於容器列表中的第一個容器。

引數:
  • pod (kubernetes.client.models.V1Pod | None) – 完整的 pod 定義。與 pod_template_file 互斥。

  • pod_template_file (str | None) – YAML 檔案路徑。與 pod 互斥。

  • extract_xcom (bool) – 是否為 xcom 啟動一個容器。

extract_xcom =True[source]
static from_obj(obj)[source]

從物件轉換為 pod。

static reconcile_pods(base_pod, client_pod)[source]

合併 Kubernetes Pod 物件。

引數:
  • base_pod (kubernetes.client.models.V1Pod) – 包含基礎屬性,如果客戶端 pod 中存在,則會被覆蓋;如果客戶端 pod 中不存在,則保留。

  • client_pod (kubernetes.client.models.V1Pod | None) – 客戶端想要建立的 pod。

返回:

合併後的 pods

返回型別:

kubernetes.client.models.V1Pod

這無法遞迴完成,因為某些欄位會被覆蓋,而另一些則會被連線。

static reconcile_metadata(base_meta, client_meta)[source]

合併 Kubernetes Metadata 物件。

引數:
  • base_meta – 包含基礎屬性,如果客戶端 meta 中存在,則會被覆蓋;如果客戶端 meta 中不存在,則保留。

  • client_meta – 客戶端想要建立的元資料。

返回:

合併後的元資料

static reconcile_specs(base_spec, client_spec)[source]

合併 Kubernetes PodSpec 物件。

引數:
  • base_spec (kubernetes.client.models.V1PodSpec | None) – 包含基礎屬性,如果客戶端 spec 中存在,則會被覆蓋;如果客戶端 spec 中不存在,則保留。

  • client_spec (kubernetes.client.models.V1PodSpec | None) – 客戶端想要建立的 spec。

返回:

合併後的元資料

返回型別:

kubernetes.client.models.V1PodSpec | None

static reconcile_containers(base_containers, client_containers)[source]

合併 Kubernetes Container 物件。

引數:
  • base_containers (list[kubernetes.client.models.V1Container]) – 包含基礎屬性,如果客戶端 containers 中存在,則會被覆蓋;如果客戶端 containers 中不存在,則保留。

  • client_containers (list[kubernetes.client.models.V1Container]) – 客戶端想要建立的容器。

返回:

合併後的容器

返回型別:

list[kubernetes.client.models.V1Container]

這會遞迴地作用於容器列表。

classmethod construct_pod(dag_id, task_id, pod_id, try_number, kube_image, date, args, pod_override_object, base_worker_pod, namespace, scheduler_job_id, run_id=None, map_index=-1, content_json_for_volume='', *, with_mutation_hook=False)[source]

建立一個 Pod。

透過從以下 3 個位置收集和合並配置來構造一個 pod
  • airflow.cfg

  • executor_config

  • 動態引數

classmethod build_selector_for_k8s_executor_pod(*, dag_id, task_id, try_number, map_index=None, logical_date=None, run_id=None, airflow_worker=None, include_version=False)[source]

為 Kubernetes Executor Pod 生成選擇器。

classmethod build_labels_for_k8s_executor_pod(*, dag_id, task_id, try_number, airflow_worker=None, map_index=None, logical_date=None, run_id=None, include_version=True)[source]

為 Kubernetes Executor Pod 生成標籤。

static serialize_pod(pod)[source]

將 k8s.V1Pod 轉換為 JSON 可序列化的字典。

引數:

pod (kubernetes.client.models.V1Pod) – k8s.V1Pod 物件

返回:

Pod 的序列化版本,作為字典返回

返回型別:

dict

static deserialize_model_file(path)[source]

從檔案生成一個 Pod。

引數:

path (str) – 檔案路徑

返回:

一個 kubernetes.client.models.V1Pod

返回型別:

kubernetes.client.models.V1Pod

static deserialize_model_dict(pod_dict)[source]

將 Python 字典反序列化為 k8s.V1Pod。

遺憾的是,我們需要訪問 Kubernetes 客戶端的私有方法 _ApiClient__deserialize_model。此問題在此處跟蹤:https://github.com/kubernetes-client/python/issues/977

引數:

pod_dict (dict | None) – k8s.V1Pod 物件的序列化字典

返回:

反序列化的 k8s.V1Pod

返回型別:

kubernetes.client.models.V1Pod

airflow.providers.cncf.kubernetes.pod_generator.merge_objects(base_obj, client_obj)[source]

合併物件。

引數:
  • base_obj – 包含基礎屬性,如果客戶端 obj 中存在,則會被覆蓋;如果客戶端 obj 中不存在,則保留。

  • client_obj – 客戶端想要建立的物件。

返回:

合併後的物件

airflow.providers.cncf.kubernetes.pod_generator.extend_object_field(base_obj, client_obj, field_name)[source]

將欄位值新增到現有物件。

引數:
  • base_obj – 一個具有屬性 field_name 且該屬性是一個列表的物件

  • client_obj – 一個具有屬性 field_name 且該屬性是一個列表的物件。返回此物件的一個副本,其 field_name 已修改。

  • field_name – 列表欄位的名稱

返回:

client_obj,其屬性 field_name 是兩個屬性的拼接結果

此條目是否有幫助?