Google Cloud Dataproc Operator¶
Dataproc 是一項託管式 Apache Spark 和 Apache Hadoop 服務,可讓您利用開源資料工具進行批處理、查詢、流處理和機器學習。Dataproc 自動化功能可幫助您快速建立叢集、輕鬆管理叢集,並在不再需要叢集時將其關閉以節省成本。
有關此服務的更多資訊,請訪問 Dataproc 生產文件
前置任務¶
要使用這些 Operator,您必須做幾件事
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用結算功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'安裝提供了詳細資訊。
建立叢集¶
建立 Dataproc 叢集時,可以選擇 Compute Engine 作為部署平臺。在此配置中,Dataproc 會自動預置執行叢集所需的 Compute Engine VM 例項。VM 例項用於主節點、主工作節點和次要工作節點(如果指定)。這些 VM 例項由 Compute Engine 建立和管理,而 Dataproc 負責配置大資料處理任務所需的軟體和編排。透過提供節點的配置,您可以描述主節點和次要節點的配置,以及 Compute Engine 例項叢集的狀態。配置次要工作節點時,您可以指定工作節點的數量及其型別。透過啟用“搶佔式”選項以對這些節點使用搶佔式 VM(等同於 Spot 例項),您可以利用這些例項為您的 Dataproc 工作負載帶來的成本節約。主節點通常託管叢集主控和各種控制服務,不具備搶佔式選項,因為主節點保持穩定性和可用性至關重要。叢集建立後,配置設定(包括次要工作節點的搶佔性)無法直接修改。
有關建立叢集時可傳遞的可用欄位的更多資訊,請訪問 Dataproc create cluster API。
叢集配置示例如下
tests/system/google/cloud/dataproc/example_dataproc_hive.py
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"secondary_worker_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 32,
},
"is_preemptible": True,
"preemptibility": "PREEMPTIBLE",
},
}
使用此配置,我們可以建立叢集: DataprocCreateClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_hive.py
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
GKE 上的 Dataproc 將 Dataproc 虛擬叢集部署到 GKE 叢集上。與 Compute Engine 叢集上的 Dataproc 不同,GKE 上的 Dataproc 虛擬叢集不包含單獨的主 VM 和工作器 VM。相反,當您建立 GKE 上的 Dataproc 虛擬叢集時,GKE 上的 Dataproc 會在 GKE 叢集內建立節點池。GKE 上的 Dataproc 作業以 Pod 的形式在這些節點池上執行。節點池和 Pod 在節點池上的排程由 GKE 管理。
建立 GKE Dataproc 叢集時,您可以指定底層計算資源使用搶佔式 VM。GKE 支援使用搶佔式 VM 作為節省成本的措施。透過啟用搶佔式 VM,GKE 將使用搶佔式 VM 預置叢集節點。或者,您可以將節點建立為 Spot VM 例項,這是舊版搶佔式 VM 的最新更新。這有助於在 GKE 上執行 Dataproc 工作負載同時最佳化成本。
要在 Google Kubernetes Engine 中建立 Dataproc 叢集,您可以傳遞叢集配置
tests/system/google/cloud/dataproc/example_dataproc_gke.py
VIRTUAL_CLUSTER_CONFIG = {
"kubernetes_cluster_config": {
"gke_cluster_config": {
"gke_cluster_target": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}",
"node_pool_target": [
{
"node_pool": f"projects/{PROJECT_ID}/locations/{REGION}/clusters/{GKE_CLUSTER_NAME}/nodePools/dp",
"roles": ["DEFAULT"],
"node_pool_config": {
"config": {
"preemptible": False,
"machine_type": "e2-standard-4",
}
},
}
],
},
"kubernetes_software_config": {"component_version": {"SPARK": "3"}},
},
"staging_bucket": "test-staging-bucket",
}
使用此配置,我們可以建立叢集: DataprocCreateClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_gke.py
create_cluster_in_gke = DataprocCreateClusterOperator(
task_id="create_cluster_in_gke",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
您還可以建立帶有可選元件 Presto 的 Dataproc 叢集。為此,請使用以下配置。請注意,預設映象可能不支援所選的可選元件。如果出現這種情況,請指定您可以在文件中找到的正確 image_version。
tests/system/google/cloud/dataproc/example_dataproc_presto.py
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"PRESTO",
],
"image_version": "2.0",
},
}
您還可以建立帶有可選元件 Trino 的 Dataproc 叢集。為此,請使用以下配置。請注意,預設映象可能不支援所選的可選元件。如果出現這種情況,請指定您可以在文件中找到的正確 image_version。
tests/system/google/cloud/dataproc/example_dataproc_trino.py
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"software_config": {
"optional_components": [
"TRINO",
],
"image_version": "2.1",
},
}
您可以對此操作使用可延遲模式,以便非同步執行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
生成叢集配置¶
您還可以使用函式式 API 生成 CLUSTER_CONFIG,使用 ClusterGenerator 的 make() 可以輕鬆完成此操作。您可以按以下方式生成並使用配置
tests/system/google/cloud/dataproc/example_dataproc_cluster_generator.py
CLUSTER_GENERATOR_CONFIG = ClusterGenerator(
project_id=PROJECT_ID,
zone=ZONE,
master_machine_type="n1-standard-4",
master_disk_size=32,
worker_machine_type="n1-standard-4",
worker_disk_size=32,
num_workers=2,
storage_bucket=BUCKET_NAME,
init_actions_uris=[GCS_INIT_FILE],
metadata={"PIP_PACKAGES": "pyyaml requests pandas openpyxl"},
num_preemptible_workers=1,
preemptibility="PREEMPTIBLE",
internal_ip_only=False,
).make()
診斷叢集¶
Dataproc 支援收集叢集診斷資訊,如系統、Spark、Hadoop 和 Dataproc 日誌、叢集配置檔案,這些資訊可用於排查 Dataproc 叢集或作業的故障。請務必注意,這些資訊只能在刪除叢集之前收集。有關診斷叢集時可傳遞的可用欄位的更多資訊,請訪問 Dataproc diagnose cluster API。
要診斷 Dataproc 叢集,請使用: DataprocDiagnoseClusterOperator。
tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
diagnose_cluster = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
)
您也可以使用可延遲模式以非同步執行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_diagnose.py
diagnose_cluster_deferrable = DataprocDiagnoseClusterOperator(
task_id="diagnose_cluster_deferrable",
region=REGION,
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
deferrable=True,
)
更新叢集¶
您可以透過提供叢集配置和 updateMask 來向上或向下擴充套件叢集。在 updateMask 引數中,您指定了相對於 Cluster 的欄位路徑以進行更新。有關 updateMask 和其他引數的更多資訊,請參閱 Dataproc update cluster API。
新叢集配置和 updateMask 示例如下
tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
CLUSTER_UPDATE = {
"config": {"worker_config": {"num_instances": 3}, "secondary_worker_config": {"num_instances": 3}}
}
UPDATE_MASK = {
"paths": ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]
}
要更新叢集,您可以使用: DataprocUpdateClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_cluster_update.py
scale_cluster = DataprocUpdateClusterOperator(
task_id="scale_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
您可以對此操作使用可延遲模式,以便非同步執行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
update_cluster = DataprocUpdateClusterOperator(
task_id="update_cluster",
cluster_name=CLUSTER_NAME,
cluster=CLUSTER_UPDATE,
update_mask=UPDATE_MASK,
graceful_decommission_timeout=TIMEOUT,
project_id=PROJECT_ID,
region=REGION,
deferrable=True,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
)
啟動叢集¶
要啟動叢集,您可以使用 DataprocStartClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
start_cluster = DataprocStartClusterOperator(
task_id="start_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
停止叢集¶
要停止叢集,您可以使用 DataprocStopClusterOperator
tests/system/google/cloud/dataproc/example_dataproc_cluster_start_stop.py
stop_cluster = DataprocStopClusterOperator(
task_id="stop_cluster",
project_id=PROJECT_ID,
region=REGION,
cluster_name=CLUSTER_NAME,
)
刪除叢集¶
要刪除叢集,您可以使用: DataprocDeleteClusterOperator。
tests/system/google/cloud/dataproc/example_dataproc_hive.py
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
)
您可以對此操作使用可延遲模式,以便非同步執行 Operator
tests/system/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
delete_cluster = DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=PROJECT_ID,
cluster_name=CLUSTER_NAME,
region=REGION,
trigger_rule=TriggerRule.ALL_DONE,
deferrable=True,
)
向叢集提交作業¶
Dataproc 支援提交不同大資料元件的作業。當前列表包括 Spark、PySpark、Hadoop、Trino、Pig、Flink 和 Hive。有關版本和映象的更多資訊,請參閱 Cloud Dataproc 映象版本列表
要向叢集提交作業,您需要提供作業原始檔。作業原始檔可以位於 GCS、叢集或本地檔案系統上。您可以指定 file:/// 路徑來引用叢集主節點上的本地檔案。
可以使用以下 Operator 提交作業配置: DataprocSubmitJobOperator。
tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, region=REGION, project_id=PROJECT_ID
)
要提交的作業配置示例¶
我們在下面為每個框架提供了示例。作業中可提供的引數比示例所示更多。有關引數的完整列表,請參閱 DataProc 作業引數
PySpark 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_pyspark.py
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": GCS_JOB_FILE},
}
SparkSQl 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_spark_sql.py
SPARK_SQL_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Spark 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_spark.py
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
在可延遲模式下執行的 Spark 作業配置示例
tests/system/google/cloud/dataproc/example_dataproc_spark_deferrable.py
SPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_job": {
"jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
"main_class": "org.apache.spark.examples.SparkPi",
},
}
Hive 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_hive.py
HIVE_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
}
Hadoop 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_hadoop.py
HADOOP_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"hadoop_job": {
"main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar",
"args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
},
}
Pig 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_pig.py
PIG_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
}
SparkR 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_sparkr.py
SPARKR_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"spark_r_job": {"main_r_file_uri": GCS_JOB_FILE},
}
Presto 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_presto.py
PRESTO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Trino 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_trino.py
TRINO_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"trino_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
}
Flink 作業的配置示例
tests/system/google/cloud/dataproc/example_dataproc_flink.py
FLINK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"flink_job": {
"main_class": "org.apache.flink.examples.java.wordcount.WordCount",
"jar_file_uris": ["file:///usr/lib/flink/examples/batch/WordCount.jar"],
},
}
使用工作流模板¶
Dataproc 支援建立可以在稍後觸發的工作流模板。
可以使用以下 Operator 建立工作流模板: DataprocCreateWorkflowTemplateOperator。
tests/system/google/cloud/dataproc/example_dataproc_workflow.py
create_workflow_template = DataprocCreateWorkflowTemplateOperator(
task_id="create_workflow_template",
template=WORKFLOW_TEMPLATE,
project_id=PROJECT_ID,
region=REGION,
)
工作流建立後,使用者可以使用 DataprocInstantiateWorkflowTemplateOperator 觸發它
tests/system/google/cloud/dataproc/example_dataproc_workflow.py
trigger_workflow = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow", region=REGION, project_id=PROJECT_ID, template_id=WORKFLOW_NAME
)
對於所有這些操作,您也可以在可延遲模式下使用 Operator
tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
trigger_workflow_async = DataprocInstantiateWorkflowTemplateOperator(
task_id="trigger_workflow_async",
region=REGION,
project_id=PROJECT_ID,
template_id=WORKFLOW_NAME,
deferrable=True,
)
內聯 Operator 是另一種選擇。它會建立工作流,執行它,並在之後刪除它: DataprocInstantiateInlineWorkflowTemplateOperator
tests/system/google/cloud/dataproc/example_dataproc_workflow.py
instantiate_inline_workflow_template = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template", template=WORKFLOW_TEMPLATE, region=REGION
)
對於所有這些操作,您也可以在可延遲模式下使用 Operator
tests/system/google/cloud/dataproc/example_dataproc_workflow_deferrable.py
instantiate_inline_workflow_template_async = DataprocInstantiateInlineWorkflowTemplateOperator(
task_id="instantiate_inline_workflow_template_async",
template=WORKFLOW_TEMPLATE,
region=REGION,
deferrable=True,
)
建立批處理¶
Dataproc 支援建立批處理工作負載。
可以使用以下 Operator 建立批處理: DataprocCreateBatchOperator。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_2 = DataprocCreateBatchOperator(
task_id="create_batch_2",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_2,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
create_batch_3 = DataprocCreateBatchOperator(
task_id="create_batch_3",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID_3,
asynchronous=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
要建立具有持久化歷史伺服器的批處理,首先您應該使用特定引數建立 Dataproc 叢集。有關如何建立叢集的文件,您可以在此處找到
tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster_for_phs",
project_id=PROJECT_ID,
cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS,
region=REGION,
cluster_name=CLUSTER_NAME,
retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
叢集建立後,您應該將其新增到批處理配置中。
tests/system/google/cloud/dataproc/example_dataproc_batch_persistent.py
create_batch = DataprocCreateBatchOperator(
task_id="create_batch_with_phs",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG_WITH_PHS,
batch_id=BATCH_ID,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
要檢查操作是否成功,您可以使用 DataprocBatchSensor。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
batch_async_sensor = DataprocBatchSensor(
task_id="batch_async_sensor",
region=REGION,
project_id=PROJECT_ID,
batch_id=BATCH_ID_3,
poke_interval=10,
)
對於所有這些操作,您也可以在可延遲模式下使用 Operator
tests/system/google/cloud/dataproc/example_dataproc_batch_deferrable.py
create_batch = DataprocCreateBatchOperator(
task_id="create_batch",
project_id=PROJECT_ID,
region=REGION,
batch=BATCH_CONFIG,
batch_id=BATCH_ID,
deferrable=True,
result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0),
num_retries_if_resource_is_not_ready=3,
)
獲取批處理¶
要獲取批處理,您可以使用: DataprocGetBatchOperator。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
get_batch = DataprocGetBatchOperator(
task_id="get_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
列出批處理¶
要獲取已存在的批處理列表,您可以使用: DataprocListBatchesOperator。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
list_batches = DataprocListBatchesOperator(
task_id="list_batches",
project_id=PROJECT_ID,
region=REGION,
)
刪除批處理¶
要刪除批處理,您可以使用: DataprocDeleteBatchOperator。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
delete_batch = DataprocDeleteBatchOperator(
task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID
)
delete_batch_2 = DataprocDeleteBatchOperator(
task_id="delete_batch_2", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_2
)
delete_batch_3 = DataprocDeleteBatchOperator(
task_id="delete_batch_3", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_3
)
delete_batch_4 = DataprocDeleteBatchOperator(
task_id="delete_batch_4", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID_4
)
取消批處理操作¶
要取消操作,您可以使用: DataprocCancelOperationOperator。
tests/system/google/cloud/dataproc/example_dataproc_batch.py
cancel_operation = DataprocCancelOperationOperator(
task_id="cancel_operation",
project_id=PROJECT_ID,
region=REGION,
operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}",
)
參考¶
如需更多資訊,請參閱