airflow.providers.google.cloud.hooks.dataproc

此模組包含一個 Google Cloud Dataproc Hook。

異常

DataprocResourceIsNotReadyError

在資源未準備好建立 Dataproc 叢集時丟擲此異常。

DataProcJobBuilder

用於構建 Dataproc 作業的輔助類。

DataprocHook

Google Cloud Dataproc API。

DataprocAsyncHook

與 Google Cloud Dataproc API 的非同步互動。

模組內容

exception airflow.providers.google.cloud.hooks.dataproc.DataprocResourceIsNotReadyError[原始碼]

基類: airflow.exceptions.AirflowException

在資源未準備好建立 Dataproc 叢集時丟擲此異常。

class airflow.providers.google.cloud.hooks.dataproc.DataProcJobBuilder(project_id, task_id, cluster_name, job_type, properties=None)[原始碼]

用於構建 Dataproc 作業的輔助類。

job_type[原始碼]
job: dict[str, Any][原始碼]
add_labels(labels=None)[原始碼]

設定 Dataproc 作業的標籤。

引數:

labels (dict | None) – 作業查詢的標籤。

add_variables(variables=None)[原始碼]

設定 Dataproc 作業的變數。

引數:

variables (dict | None) – 作業查詢的變數。

add_args(args=None)[原始碼]

設定 Dataproc 作業的引數 (args)。

引數:

args (list[str] | None) – 作業查詢的引數 (args)。

add_query(query)[原始碼]

新增 Dataproc 作業的查詢。

引數:

query (str | list[str]) – 作業的查詢。

add_query_uri(query_uri)[原始碼]

設定 Dataproc 作業的查詢 URI。

引數:

query_uri (str) – 作業查詢的 URI。

add_jar_file_uris(jars=None)[原始碼]

設定 Dataproc 作業的 JAR URI。

引數:

jars (list[str] | None) – JAR URI 列表

add_archive_uris(archives=None)[原始碼]

設定 Dataproc 作業的 archives URI。

引數:

archives (list[str] | None) – archives URI 列表

add_file_uris(files=None)[原始碼]

設定 Dataproc 作業的檔案 URI。

引數:

files (list[str] | None) – 檔案 URI 列表

add_python_file_uris(pyfiles=None)[原始碼]

設定 Dataproc 作業的 Python 檔案 URI。

引數:

pyfiles (list[str] | None) – Python 檔案 URI 列表

set_main(main_jar=None, main_class=None)[原始碼]

設定 Dataproc 主類。

引數:
  • main_jar (str | None) – 主檔案的 URI。

  • main_class (str | None) – 主類的名稱。

丟擲:

ValueError

set_python_main(main)[原始碼]

設定 Dataproc 主 Python 檔案 URI。

引數:

main (str) – Python 主檔案的 URI。

set_job_name(name)[原始碼]

設定 Dataproc 作業名稱。

作業名稱會被清理,將點替換為下劃線。

引數:

name (str) – 作業名稱。

build()[原始碼]

返回 Dataproc 作業。

返回:

Dataproc 作業

返回型別:

dict

class airflow.providers.google.cloud.hooks.dataproc.DataprocHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[原始碼]

基類: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

Google Cloud Dataproc API。

在 Hook 中使用 project_id 的所有方法都必須使用關鍵字引數而不是位置引數呼叫。

get_cluster_client(region=None)[原始碼]

建立一個 ClusterControllerClient。

get_template_client(region=None)[原始碼]

建立一個 WorkflowTemplateServiceClient。

get_job_client(region=None)[原始碼]

建立一個 JobControllerClient。

get_batch_client(region=None)[原始碼]

建立一個 BatchControllerClient。

get_operations_client(region)[原始碼]

建立一個 OperationsClient。

dataproc_options_to_args(options)[原始碼]

從引數字典返回格式化的叢集引數。

引數:

options (dict) – 包含選項的字典

返回:

引數列表

返回型別:

list[str]

wait_for_operation(operation, timeout=None, result_retry=DEFAULT)[原始碼]

等待長時間執行的操作完成。

create_cluster(region, project_id, cluster_name, cluster_config=None, virtual_cluster_config=None, labels=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

在指定專案中建立一個叢集。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要建立的叢集名稱。

  • labels (dict[str, str] | None) – 將分配給所建立叢集的標籤。

  • cluster_config (dict | google.cloud.dataproc_v1.Cluster | None) – 要建立的叢集配置。如果提供的是字典,其形式必須與 protobuf 訊息 ClusterConfig 相同。

  • virtual_cluster_config (dict | None) – 虛擬叢集配置,用於建立不直接控制底層計算資源的 Dataproc 叢集時,例如使用 VirtualClusterConfig 建立 Dataproc-on-GKE 叢集時。

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器收到兩個 CreateClusterRequest 請求具有相同的 ID,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的某個操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

delete_cluster(region, cluster_name, project_id, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

在專案中刪除一個叢集。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要刪除的叢集名稱。

  • cluster_uuid (str | None) – 如果指定,則如果具有該 UUID 的叢集不存在,RPC 應該失敗。

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器收到兩個 DeleteClusterRequest 請求具有相同的 ID,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的某個操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

diagnose_cluster(region, cluster_name, project_id, tarball_gcs_dir=None, diagnosis_interval=None, jobs=None, yarn_application_ids=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

獲取叢集診斷資訊。

操作完成後,響應包含診斷輸出報告的 Cloud Storage URI,其中包含收集的診斷資訊的摘要。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • tarball_gcs_dir (str | None) – 診斷 tarball 的輸出 Cloud Storage 目錄。如果未指定,將使用叢集暫存儲存分割槽中的任務特定目錄。

  • diagnosis_interval (dict | google.type.interval_pb2.Interval | None) – 應在叢集上執行診斷的時間間隔。

  • jobs (collections.abc.MutableSequence[str] | None) – 指定要執行診斷的作業列表。格式: projects/{project}/regions/{region}/jobs/{job}

  • yarn_application_ids (collections.abc.MutableSequence[str] | None) – 指定要執行診斷的 yarn 應用列表。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

get_cluster(region, cluster_name, project_id, retry=DEFAULT, timeout=None, metadata=())[原始碼]

獲取專案中叢集的資源表示。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

list_clusters(region, filter_, project_id, page_size=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

列出專案中的所有 regions/{region}/clusters。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • filter – 用於約束叢集。區分大小寫。

  • page_size (int | None) – 底層 API 響應中包含的最大資源數。如果按資源執行分頁流,則此引數不影響返回值。如果按頁執行分頁流,則此引數確定一頁中的最大資源數。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

update_cluster(cluster_name, cluster, update_mask, project_id, region, graceful_decommission_timeout=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[原始碼]

在專案中更新一個叢集。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • cluster (dict | google.cloud.dataproc_v1.Cluster) – 叢集的更改。如果提供了字典,則其形式必須與 protobuf 訊息 Cluster 相同。

  • update_mask (dict | google.protobuf.field_mask_pb2.FieldMask) –

    指定相對於 Cluster 要更新欄位的路徑。例如,要將叢集中的 worker 數量更改為 5,應指定為 config.worker_config.num_instances,並且 PATCH 請求正文將指定新值。

    {"config": {"workerConfig": {"numInstances": "5"}}}
    

    同樣,要將叢集中的搶佔式 worker 數量更改為 5,應指定為 config.secondary_worker_config.num_instances,並且 PATCH 請求正文將指定新值。

    {"config": {"secondaryWorkerConfig": {"numInstances": "5"}}}
    

    如果提供了字典,則其形式必須與 protobuf 訊息 FieldMask 相同。

  • graceful_decommission_timeout (dict | google.protobuf.duration_pb2.Duration | None) –

    優雅 YARN 退役的超時時間。優雅退役允許從叢集中移除節點,而不會中斷正在進行的作業。超時指定在強制移除節點(並可能中斷作業)之前等待正在進行的作業完成的時間。預設超時為 0(表示強制退役),允許的最大超時時間為一天。

    僅支援 Dataproc 映像版本 1.2 及更高版本。

    如果提供了字典,則其形式必須與 protobuf 訊息 Duration 相同。

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器接收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

start_cluster(region, project_id, cluster_name, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在一個專案中啟動叢集。

引數:
  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • cluster_name (str) – 叢集名稱。

  • cluster_uuid (str | None) – 叢集 UUID

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器接收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

返回:

google.api_core.operation.Operation 的一個例項

返回型別:

google.api_core.operation.Operation

stop_cluster(region, project_id, cluster_name, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在一個專案中啟動叢集。

引數:
  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • cluster_name (str) – 叢集名稱。

  • cluster_uuid (str | None) – 叢集 UUID

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器接收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

返回:

google.api_core.operation.Operation 的一個例項

返回型別:

google.api_core.operation.Operation

create_workflow_template(template, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

建立一個新的工作流模板。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要建立的 Dataproc 工作流模板。如果提供了字典,則其形式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

instantiate_workflow_template(template_name, project_id, region, version=None, request_id=None, parameters=None, retry=DEFAULT, timeout=None, metadata=())[source]

例項化模板並開始執行。

引數:
  • template_name (str) – 要例項化的模板名稱。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • version (int | None) – 要例項化的工作流模板版本。如果指定,僅當工作流模板的當前版本與提供的版本相同時,才會例項化工作流。此選項不能用於例項化工作流模板的先前版本。

  • request_id (str | None) – 用於防止多個具有相同標籤的併發工作流例項執行的標籤。這可減輕因重試而啟動併發例項的風險。

  • parameters (dict[str, str] | None) – 引數名稱到應使用的值的對映。值不能超過 100 個字元。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

instantiate_inline_workflow_template(template, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

例項化模板並開始執行。

引數:
  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要例項化的工作流模板。如果提供了字典,則其形式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 用於防止多個具有相同標籤的併發工作流例項執行的標籤。這可減輕因重試而啟動併發例項的風險。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

wait_for_job(job_id, project_id, region, wait_time=10, timeout=None)[source]

輪詢作業以檢查其是否已完成。

引數:
  • job_id (str) – Dataproc 作業 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • wait_time (int) – 兩次檢查之間的秒數。

  • timeout (int | None) – 等待作業就緒的秒數。

get_job(job_id, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

獲取專案中作業的資源表示。

引數:
  • job_id (str) – Dataproc 作業 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

submit_job(job, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

向叢集提交作業。

引數:
  • job (dict | google.cloud.dataproc_v1.Job) – 作業資源。如果提供了字典,則其形式必須與 protobuf 訊息 Job 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 用於防止多個具有相同標籤的併發工作流例項執行的標籤。這可減輕因重試而啟動併發例項的風險。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

cancel_job(job_id, project_id, region=None, retry=DEFAULT, timeout=None, metadata=())[source]

啟動作業取消請求。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str | None) – 處理請求的 Cloud Dataproc 區域。

  • job_id (str) – 作業 ID。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

create_batch(region, project_id, batch, batch_id=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

建立批處理工作負載。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • batch (dict | google.cloud.dataproc_v1.Batch) – 要建立的批處理。

  • batch_id (str | None) – 要用於批處理的 ID,它將成為批處理資源名稱的最後一個組成部分。此值的長度必須為 4-63 個字元。有效字元為 [a-z][0-9]-

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器接收到兩個具有相同 ID 的 CreateBatchRequest 請求,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的操作。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

delete_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

刪除批處理工作負載資源。

引數:
  • batch_id (str) – 批處理 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

get_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

獲取批處理工作負載資源的表示。

引數:
  • batch_id (str) – 批處理 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

list_batches(region, project_id, page_size=None, page_token=None, retry=DEFAULT, timeout=None, metadata=(), filter=None, order_by=None)[source]

列出批處理工作負載。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • page_size (int | None) – 每個響應中返回的最大批處理數量。服務返回的數量可能少於此值。預設頁面大小為 20;最大頁面大小為 1000。

  • page_token (str | None) – 從先前的 ListBatches 呼叫接收到的頁面令牌。提供此令牌以檢索後續頁面。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

  • filter (str | None) – 如 ListBatchesRequest 中指定的篩選結果。

  • order_by (str | None) – 如 ListBatchesRequest 中指定的排序結果方式。

wait_for_batch(batch_id, region, project_id, wait_check_interval=10, retry=DEFAULT, timeout=None, metadata=())[source]

等待批處理作業完成。

提交批處理作業後,Operator 會等待作業完成。然而,當 Airflow 重啟或任務 PID 因任何原因被終止時,此 hook 非常有用。在這種情況下,建立會再次發生,捕獲丟擲的 AlreadyExists 異常,並失敗到此函式以等待完成。

引數:
  • batch_id (str) – 批處理 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • wait_check_interval (int) – 兩次檢查作業完成情況之間暫停的時間。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

check_error_for_resource_is_not_ready_msg(error_msg)[source]

檢查錯誤原因是否為資源尚未準備就緒。

class airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基類: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

與 Google Cloud Dataproc API 的非同步互動。

在 Hook 中使用 project_id 的所有方法都必須使用關鍵字引數而不是位置引數呼叫。

get_cluster_client(region=None)[source]

建立 ClusterControllerAsyncClient。

get_template_client(region=None)[source]

建立 WorkflowTemplateServiceAsyncClient。

get_job_client(region=None)[source]

建立 JobControllerAsyncClient。

get_batch_client(region=None)[source]

建立 BatchControllerAsyncClient。

get_operations_client(region)[source]

建立一個 OperationsClient。

async create_cluster(region, project_id, cluster_name, cluster_config=None, virtual_cluster_config=None, labels=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在一個專案中建立叢集。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要建立的叢集名稱。

  • labels (dict[str, str] | None) – 將分配給所建立叢集的標籤。

  • cluster_config (dict | google.cloud.dataproc_v1.Cluster | None) – 要建立的叢集配置。如果提供的是字典,其形式必須與 protobuf 訊息 ClusterConfig 相同。

  • virtual_cluster_config (dict | None) – 虛擬叢集配置,用於建立不直接控制底層計算資源的 Dataproc 叢集時,例如使用 VirtualClusterConfig 建立 Dataproc-on-GKE 叢集時。

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器收到兩個 CreateClusterRequest 請求具有相同的 ID,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的某個操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async delete_cluster(region, cluster_name, project_id, cluster_uuid=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在專案中刪除一個叢集。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 要刪除的叢集名稱。

  • cluster_uuid (str | None) – 如果指定,則如果具有該 UUID 的叢集不存在,RPC 應該失敗。

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器收到兩個 DeleteClusterRequest 請求具有相同的 ID,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的某個操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async diagnose_cluster(region, cluster_name, project_id, tarball_gcs_dir=None, diagnosis_interval=None, jobs=None, yarn_application_ids=None, retry=DEFAULT, timeout=None, metadata=())[source]

獲取叢集診斷資訊。

操作完成後,響應包含診斷輸出報告的 Cloud Storage URI,其中包含收集的診斷資訊的摘要。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • tarball_gcs_dir (str | None) – 診斷 tarball 的輸出 Cloud Storage 目錄。如果未指定,將使用叢集暫存儲存分割槽中的任務特定目錄。

  • diagnosis_interval (dict | google.type.interval_pb2.Interval | None) – 應在叢集上執行診斷的時間間隔。

  • jobs (collections.abc.MutableSequence[str] | None) – 指定要執行診斷的作業列表。格式: projects/{project}/regions/{region}/jobs/{job}

  • yarn_application_ids (collections.abc.MutableSequence[str] | None) – 指定要執行診斷的 yarn 應用列表。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async get_cluster(region, cluster_name, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

獲取專案中叢集的資源表示。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async list_clusters(region, filter_, project_id, page_size=None, retry=DEFAULT, timeout=None, metadata=())[source]

列出專案中的所有 regions/{region}/clusters。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • filter – 用於約束叢集。區分大小寫。

  • page_size (int | None) – 底層 API 響應中包含的最大資源數。如果按資源執行分頁流,則此引數不影響返回值。如果按頁執行分頁流,則此引數確定一頁中的最大資源數。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async update_cluster(cluster_name, cluster, update_mask, project_id, region, graceful_decommission_timeout=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

在專案中更新一個叢集。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • cluster_name (str) – 叢集名稱。

  • cluster (dict | google.cloud.dataproc_v1.Cluster) – 叢集的更改。如果提供了字典,則其形式必須與 protobuf 訊息 Cluster 相同。

  • update_mask (dict | google.protobuf.field_mask_pb2.FieldMask) –

    指定相對於 Cluster 要更新欄位的路徑。例如,要將叢集中的 worker 數量更改為 5,應指定為 config.worker_config.num_instances,並且 PATCH 請求正文將指定新值。

    {"config": {"workerConfig": {"numInstances": "5"}}}
    

    同樣,要將叢集中的搶佔式 worker 數量更改為 5,應指定為 config.secondary_worker_config.num_instances,並且 PATCH 請求正文將指定新值。

    {"config": {"secondaryWorkerConfig": {"numInstances": "5"}}}
    

    如果提供了字典,則其形式必須與 protobuf 訊息 FieldMask 相同。

  • graceful_decommission_timeout (dict | google.protobuf.duration_pb2.Duration | None) –

    優雅 YARN 退役的超時時間。優雅退役允許從叢集中移除節點,而不會中斷正在進行的作業。超時指定在強制移除節點(並可能中斷作業)之前等待正在進行的作業完成的時間。預設超時為 0(表示強制退役),允許的最大超時時間為一天。

    僅支援 Dataproc 映像版本 1.2 及更高版本。

    如果提供了字典,則其形式必須與 protobuf 訊息 Duration 相同。

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器接收到兩個具有相同 ID 的 UpdateClusterRequest 請求,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async create_workflow_template(template, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

建立一個新的工作流模板。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要建立的 Dataproc 工作流模板。如果提供了字典,則其形式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async instantiate_workflow_template(template_name, project_id, region, version=None, request_id=None, parameters=None, retry=DEFAULT, timeout=None, metadata=())[source]

例項化模板並開始執行。

引數:
  • template_name (str) – 要例項化的模板名稱。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • version (int | None) – 要例項化的工作流模板版本。如果指定,僅當工作流模板的當前版本與提供的版本相同時,才會例項化工作流。此選項不能用於例項化工作流模板的先前版本。

  • request_id (str | None) – 用於防止多個具有相同標籤的併發工作流例項執行的標籤。這可減輕因重試而啟動併發例項的風險。

  • parameters (dict[str, str] | None) – 引數名稱到應使用的值的對映。值不能超過 100 個字元。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async instantiate_inline_workflow_template(template, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

例項化模板並開始執行。

引數:
  • template (dict | google.cloud.dataproc_v1.WorkflowTemplate) – 要例項化的工作流模板。如果提供了字典,則其形式必須與 protobuf 訊息 WorkflowTemplate 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 用於防止多個具有相同標籤的併發工作流例項執行的標籤。這可減輕因重試而啟動併發例項的風險。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async get_operation(region, operation_name)[source]
async get_job(job_id, project_id, region, retry=DEFAULT, timeout=None, metadata=())[source]

獲取專案中作業的資源表示。

引數:
  • job_id (str) – Dataproc 作業 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async submit_job(job, project_id, region, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

向叢集提交作業。

引數:
  • job (dict | google.cloud.dataproc_v1.Job) – 作業資源。如果提供了字典,則其形式必須與 protobuf 訊息 Job 相同。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • request_id (str | None) – 用於防止多個具有相同標籤的併發工作流例項執行的標籤。這可減輕因重試而啟動併發例項的風險。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async cancel_job(job_id, project_id, region=None, retry=DEFAULT, timeout=None, metadata=())[source]

啟動作業取消請求。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str | None) – 處理請求的 Cloud Dataproc 區域。

  • job_id (str) – 作業 ID。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async create_batch(region, project_id, batch, batch_id=None, request_id=None, retry=DEFAULT, timeout=None, metadata=())[source]

建立批處理工作負載。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • batch (dict | google.cloud.dataproc_v1.Batch) – 要建立的批處理。

  • batch_id (str | None) – 要用於批處理的 ID,它將成為批處理資源名稱的最後一個組成部分。此值的長度必須為 4-63 個字元。有效字元為 [a-z][0-9]-

  • request_id (str | None) – 用於標識請求的唯一 ID。如果伺服器接收到兩個具有相同 ID 的 CreateBatchRequest 請求,則第二個請求將被忽略,並返回為第一個請求建立並存儲在後端的操作。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async delete_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

刪除批處理工作負載資源。

引數:
  • batch_id (str) – 批處理 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async get_batch(batch_id, region, project_id, retry=DEFAULT, timeout=None, metadata=())[source]

獲取批處理工作負載資源的表示。

引數:
  • batch_id (str) – 批處理 ID。

  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

async list_batches(region, project_id, page_size=None, page_token=None, retry=DEFAULT, timeout=None, metadata=(), filter=None, order_by=None)[source]

列出批處理工作負載。

引數:
  • project_id (str) – 叢集所屬的 Google Cloud 專案 ID。

  • region (str) – 用於處理請求的 Cloud Dataproc 區域。

  • page_size (int | None) – 每個響應中返回的最大批處理數量。服務返回的數量可能少於此值。預設頁面大小為 20;最大頁面大小為 1000。

  • page_token (str | None) – 從先前的 ListBatches 呼叫接收到的頁面令牌。提供此令牌以檢索後續頁面。

  • retry (google.api_core.retry_async.AsyncRetry | google.api_core.gapic_v1.method._MethodDefault) – 用於重試請求的重試物件。如果為 None,則不會重試請求。

  • timeout (float | None) – 等待請求完成的時間量(以秒為單位)。如果指定了 retry,則超時適用於每次單獨嘗試。

  • metadata (collections.abc.Sequence[tuple[str, str]]) – 提供給方法的附加元資料。

  • filter (str | None) – 如 ListBatchesRequest 中指定的篩選結果。

  • order_by (str | None) – 如 ListBatchesRequest 中指定的排序結果方式。

此條目有幫助嗎?