Google Cloud Dataproc Operator

Dataproc is a managed Apache Spark and Apache Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them.

For more information about the service, visit the Dataproc production documentation.

Prerequisite Tasks

To use these operators, you must do a few things first.
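In particular, you need a Google Cloud project with the Dataproc API enabled and the Google provider package installed in your Airflow environment. The snippet below is a sketch of the imports assumed by the examples on this page; adjust it to the operators you actually use.

# Sketch of the imports used by the examples on this page (not part of any example file).
from google.api_core.retry import Retry

from airflow.providers.google.cloud.operators.dataproc import (
    ClusterGenerator,
    DataprocCreateBatchOperator,
    DataprocCreateClusterOperator,
    DataprocDeleteClusterOperator,
    DataprocSubmitJobOperator,
    DataprocUpdateClusterOperator,
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocBatchSensor
from airflow.utils.trigger_rule import TriggerRule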

Create a Cluster

When you create a Dataproc cluster, you have the option to choose Compute Engine as the deployment platform. In this configuration, Dataproc automatically provisions the Compute Engine VM instances required to run the cluster. The VM instances are used for the master node, the primary worker nodes, and the secondary worker nodes (if specified). These VM instances are created and managed by Compute Engine, while Dataproc takes care of configuring the software and orchestration required for the big data processing tasks. By providing the node configuration, you describe the configuration of the primary and secondary nodes, as well as the status of the cluster of Compute Engine instances. When configuring secondary worker nodes, you can specify the number of workers and their type. By enabling the preemptibility option to use preemptible VMs (equivalent to Spot instances) for those nodes, you can take advantage of the cost savings these instances provide for your Dataproc workloads. The master node, which typically hosts the cluster master and various control services, does not have a preemptibility option, because it is crucial for the master node to remain stable and available. Once a cluster is created, the configuration settings, including the preemptibility of secondary worker nodes, cannot be modified directly.

For more information about the available fields to pass when creating a cluster, visit the Dataproc create cluster API.

A cluster configuration can look as follows:

tests/system/google/cloud/dataproc/example_dataproc_hive.py


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 32,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
}

With this configuration we can create the cluster: DataprocCreateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_hive.py

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

Dataproc on GKE deploys Dataproc virtual clusters on a GKE cluster. Unlike Dataproc on Compute Engine clusters, Dataproc on GKE virtual clusters do not include separate master and worker VMs. Instead, when you create a Dataproc on GKE virtual cluster, Dataproc on GKE creates node pools within the GKE cluster, and Dataproc on GKE jobs run as pods on those node pools. The node pools, and the scheduling of pods on the node pools, are managed by GKE.

When creating a GKE Dataproc cluster, you can specify that the underlying compute resources use preemptible VMs. GKE supports preemptible VMs as a cost-saving measure: when you enable them, GKE provisions the cluster nodes using preemptible VMs. Alternatively, you can create the nodes as Spot VM instances, the newer replacement for legacy preemptible VMs. This helps optimize costs while running Dataproc workloads on GKE.

To create a Dataproc cluster in Google Kubernetes Engine, you can pass a cluster configuration:

tests/system/google/cloud/dataproc/example_dataproc_gke.py


VIRTUAL_CLUSTER_CONFIG = {
    "kubernetes_cluster_config": {
        "gke_cluster_config": {
            "gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
            "node_pool_target": [
                {
                    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
                    "roles": ["DEFAULT"],
                    "node_pool_config": {
                        "config": {
                            "preemptible": False,
                            "machine_type": "e2-standard-4",
                        }
                    },
                }
            ],
        },
        "kubernetes_software_config": {"component_version": {"SPARK": "3"}},
    },
    "staging_bucket": "test-staging-bucket",
}

With this configuration we can create the cluster: DataprocCreateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_gke.py

create_cluster_in_gke = DataprocCreateClusterOperator(
    task_id="create_cluster_in_gke",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)
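In the configuration above, preemptible is set to False for the node pool. If you want the GKE node pool to run on preemptible or Spot VMs instead, the node_pool_config can be adjusted accordingly. The snippet below is an illustrative sketch (it assumes the preemptible and spot fields of the Dataproc GkeNodeConfig API) rather than part of the example DAG:

# Hypothetical variation of the node pool entry above, using Spot VMs for cost savings.
NODE_POOL_TARGET_WITH_SPOT = {
    "node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
    "roles": ["DEFAULT"],
    "node_pool_config": {
        "config": {
            "machine_type": "e2-standard-4",
            "spot": True,  # use Spot VMs; set "preemptible": True instead for legacy preemptible VMs
        }
    },
}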

You can also create a Dataproc cluster with the optional component Presto. To do so, use the following configuration. Note that the default image might not support the chosen optional component. If that is the case, specify the correct image_version, which you can find in the documentation.

tests/system/google/cloud/dataproc/example_dataproc_presto.py

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "PRESTO",
        ],
        "image_version": "2.0",
    },
}

You can also create a Dataproc cluster with the optional component Trino. To do so, use the following configuration. Note that the default image might not support the chosen optional component. If that is the case, specify the correct image_version, which you can find in the documentation.

tests/system/google/cloud/dataproc/example_dataproc_trino.py


CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "software_config": {
        "optional_components": [
            "TRINO",
        ],
        "image_version": "2.1",
    },
}

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

Generating a cluster configuration

You can also generate CLUSTER_CONFIG using the functional API. This can easily be done with make() of ClusterGenerator. You can generate and use a config as follows:

tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py

CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    master_disk_size=32,
    worker_machine_type="n1-standard-4",
    worker_disk_size=32,
    num_workers=2,
    storage_bucket=BUCKET_NAME,
    init_actions_uris=[GCS_INIT_FILE],
    metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
    num_preemptible_workers=1,
    preemptibility="PREEMPTIBLE",
    internal_ip_only=False,
).make()
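The generated dictionary can then be passed to DataprocCreateClusterOperator via cluster_config, for example (a minimal sketch reusing the variables from the configuration above, not part of the example file):

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)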

Diagnose a cluster

Dataproc supports collecting cluster diagnostic information, such as system, Spark, Hadoop, and Dataproc logs, as well as cluster configuration files, which can be used to troubleshoot a Dataproc cluster or job. It is important to note that this information can only be collected before the cluster is deleted. For more information about the available fields to pass when diagnosing a cluster, visit the Dataproc diagnose cluster API.

To diagnose a Dataproc cluster, use: DataprocDiagnoseClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py

diagnose_cluster = DataprocDiagnoseClusterOperator(
    task_id="diagnose_cluster",
    region=REGION,
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
)

You can also use deferrable mode in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py

diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
    task_id="diagnose_cluster_deferrable",
    region=REGION,
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    deferrable=True,
)

Update a cluster

You can scale a cluster up or down by providing a cluster config and an updateMask. In the updateMask argument you specify the path, relative to Cluster, of the field to update. For more information on updateMask and other parameters, take a look at the Dataproc update cluster API.

An example of a new cluster config and the updateMask:

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py

CLUSTER_UPDATE = {
    "config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
    "paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}

To update a cluster, you can use: DataprocUpdateClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py

scale_cluster = DataprocUpdateClusterOperator(
    task_id="scale_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py

update_cluster = DataprocUpdateClusterOperator(
    task_id="update_cluster",
    cluster_name=CLUSTER_NAME,
    cluster=CLUSTER_UPDATE,
    update_mask=UPDATE_MASK,
    graceful_decommission_timeout=TIMEOUT,
    project_id=PROJECT_ID,
    region=REGION,
    deferrable=True,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)

Start a cluster

To start a cluster, you can use the DataprocStartClusterOperator:

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py

start_cluster = DataprocStartClusterOperator(
    task_id="start_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Stop a cluster

To stop a cluster, you can use the DataprocStopClusterOperator:

tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py

stop_cluster = DataprocStopClusterOperator(
    task_id="stop_cluster",
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)

Delete a cluster

To delete a cluster, you can use: DataprocDeleteClusterOperator

tests/system/google/cloud/dataproc/example_dataproc_hive.py

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
)

You can use deferrable mode for this action in order to run the operator asynchronously:

tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py

delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    trigger_rule=TriggerRule.ALL_DONE,
    deferrable=True,
)

Submit a job to a cluster

Dataproc supports submitting jobs for different big data components. The current list includes Spark, PySpark, Hadoop, Trino, Pig, Flink, and Hive. For more information on versions and images, take a look at the Cloud Dataproc image version list.

To submit a job to the cluster, you need to provide a job source file. The job source file can be on GCS, on the cluster, or on your local file system. You can specify a file:/// path to refer to a local file on a cluster's master node.

The job configuration can be submitted using: DataprocSubmitJobOperator

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py

pyspark_task = DataprocSubmitJobOperator(
    task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)

Examples of job configurations to submit

We provide an example for each framework below. There are more arguments available in a job than the examples show; for the complete list of arguments, take a look at the DataProc job arguments.

Example of the configuration for a PySpark job:

tests/system/google/cloud/dataproc/example_dataproc_pyspark.py

PYSPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}

Example of the configuration for a SparkSQL job:

tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py

SPARK_SQL_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Spark job:

tests/system/google/cloud/dataproc/example_dataproc_spark.py

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}

Example of the configuration for a Spark job running in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py

SPARK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_job": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}
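The job configuration itself is identical to the non-deferrable case; the asynchronous behaviour comes from passing deferrable=True to the submit operator. A minimal sketch, assuming the same variables as above:

spark_task_async = DataprocSubmitJobOperator(
    task_id="spark_task_async",
    job=SPARK_JOB,
    region=REGION,
    project_id=PROJECT_ID,
    deferrable=True,
)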

Example of the configuration for a Hive job:

tests/system/google/cloud/dataproc/example_dataproc_hive.py

HIVE_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}

Example of the configuration for a Hadoop job:

tests/system/google/cloud/dataproc/example_dataproc_hadoop.py

HADOOP_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "hadoop_job": {
        "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
        "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
    },
}

Example of the configuration for a Pig job:

tests/system/google/cloud/dataproc/example_dataproc_pig.py

PIG_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}

Example of the configuration for a SparkR job:

tests/system/google/cloud/dataproc/example_dataproc_sparkr.py

SPARKR_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}

Example of the configuration for a Presto job:

tests/system/google/cloud/dataproc/example_dataproc_presto.py

PRESTO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Example of the configuration for a Trino job:

tests/system/google/cloud/dataproc/example_dataproc_trino.py

TRINO_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}

Example of the configuration for a Flink job:

tests/system/google/cloud/dataproc/example_dataproc_flink.py

FLINK_JOB = {
    "reference": {"project_id": PROJECT_ID},
    "placement": {"cluster_name": CLUSTER_NAME},
    "flink_job": {
        "main_class": "org.apache.flink.examples.java.wordcount.WordCount",
        "jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
    },
}

Working with workflow templates

Dataproc supports creating workflow templates that can be triggered later on.
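The examples below reference a WORKFLOW_TEMPLATE dictionary that is not reproduced on this page. A minimal sketch of such a template (the structure follows the Dataproc WorkflowTemplate API; the cluster name, step id, and job are illustrative assumptions) could look like:

WORKFLOW_TEMPLATE = {
    "id": WORKFLOW_NAME,
    "placement": {
        "managed_cluster": {
            "cluster_name": "example-managed-cluster",  # illustrative name
            "config": CLUSTER_CONFIG,  # e.g. a cluster config like the ones shown earlier
        }
    },
    "jobs": [
        {
            "step_id": "pig_job_1",  # illustrative step id
            "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
        }
    ],
}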

A workflow template can be created using: DataprocCreateWorkflowTemplateOperator

tests/system/google/cloud/dataproc/example_dataproc_workflow.py

create_workflow_template = DataprocCreateWorkflowTemplateOperator(
    task_id="create_workflow_template",
    template=WORKFLOW_TEMPLATE,
    project_id=PROJECT_ID,
    region=REGION,
)

Once a workflow template is created, a user can trigger it with DataprocInstantiateWorkflowTemplateOperator:

tests/system/google/cloud/dataproc/example_dataproc_workflow.py

trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)

For all of these actions, you can also use the operator in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py

trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
    task_id="trigger_workflow_async",
    region=REGION,
    project_id=PROJECT_ID,
    template_id=WORKFLOW_NAME,
    deferrable=True,
)

An alternative is the inline operator, which creates a workflow template, runs it, and deletes it afterwards: DataprocInstantiateInlineWorkflowTemplateOperator

tests/system/google/cloud/dataproc/example_dataproc_workflow.py

instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)

For all of these actions, you can also use the operator in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py

instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
    task_id="instantiate_inline_workflow_template_async",
    template=WORKFLOW_TEMPLATE,
    region=REGION,
    deferrable=True,
)

Create a batch

Dataproc supports creating batch workloads.
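The examples below reference a BATCH_CONFIG dictionary that is not reproduced on this page. A minimal sketch for a Spark batch workload (the jar and main class are illustrative) could look like:

BATCH_CONFIG = {
    "spark_batch": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
}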

A batch can be created using: DataprocCreateBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_2 = DataprocCreateBatchOperator(
    task_id="create_batch_2",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_2,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

create_batch_3 = DataprocCreateBatchOperator(
    task_id="create_batch_3",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID_3,
    asynchronous=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

To create a batch with a Persistent History Server, you should first create a Dataproc cluster with specific parameters. Documentation on how to create such a cluster can be found here:

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py

create_cluster = DataprocCreateClusterOperator(
    task_id="create_cluster_for_phs",
    project_id=PROJECT_ID,
    cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
    region=REGION,
    cluster_name=CLUSTER_NAME,
    retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)
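The CLUSTER_GENERATOR_CONFIG_FOR_PHS referenced above is not reproduced on this page. A minimal sketch built with ClusterGenerator (the machine type, bucket, and Spark history property value are assumptions) could look like:

# Sketch of a single-node cluster acting as a Spark Persistent History Server.
CLUSTER_GENERATOR_CONFIG_FOR_PHS = ClusterGenerator(
    project_id=PROJECT_ID,
    zone=ZONE,
    master_machine_type="n1-standard-4",
    num_workers=0,
    properties={
        # Directory where batch workloads write Spark event logs.
        "spark:spark.history.fs.logDirectory": f"gs://{BUCKET_NAME}/logging/spark-job-history",
    },
    enable_component_gateway=True,
).make()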

Once the cluster has been created, you should add it to your batch configuration.

tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch_with_phs",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG_WITH_PHS,
    batch_id=BATCH_ID,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)
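The BATCH_CONFIG_WITH_PHS referenced above is also not reproduced here. The key difference from a plain batch config is that the environment config points the batch at the history-server cluster. A sketch (the resource path and workload values are assumptions) could look like:

BATCH_CONFIG_WITH_PHS = {
    "spark_batch": {
        "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
        "main_class": "org.apache.spark.examples.SparkPi",
    },
    "environment_config": {
        "peripherals_config": {
            "spark_history_server_config": {
                "dataproc_cluster": f"projects/{PROJECT_ID}/regions/{REGION}/clusters/{CLUSTER_NAME}",
            },
        },
    },
}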

To check whether the operation succeeded, you can use DataprocBatchSensor.

tests/system/google/cloud/dataproc/example_dataproc_batch.py

batch_async_sensor = DataprocBatchSensor(
    task_id="batch_async_sensor",
    region=REGION,
    project_id=PROJECT_ID,
    batch_id=BATCH_ID_3,
    poke_interval=10,
)

For all of these actions, you can also use the operator in deferrable mode:

tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py

create_batch = DataprocCreateBatchOperator(
    task_id="create_batch",
    project_id=PROJECT_ID,
    region=REGION,
    batch=BATCH_CONFIG,
    batch_id=BATCH_ID,
    deferrable=True,
    result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
    num_retries_if_resource_is_not_ready=3,
)

Get a batch

To get a batch, you can use: DataprocGetBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py

get_batch = DataprocGetBatchOperator(
    task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)

List batches

To get a list of existing batches, you can use: DataprocListBatchesOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py

list_batches = DataprocListBatchesOperator(
    task_id="list_batches",
    project_id=PROJECT_ID,
    region=REGION,
)

Delete a batch

To delete a batch, you can use: DataprocDeleteBatchOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
    task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
    task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
    task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)

Cancel a batch operation

To cancel an operation, you can use: DataprocCancelOperationOperator

tests/system/google/cloud/dataproc/example_dataproc_batch.py

cancel_operation = DataprocCancelOperationOperator(
    task_id="cancel_operation",
    project_id=PROJECT_ID,
    region=REGION,
    operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)

References

For further information, refer to the Google Cloud Dataproc documentation.
