Google Cloud Batch 運算子¶
Cloud Batch 是一項完全託管的批處理服務,用於在 Google 的基礎設施上排程、排隊和執行批處理作業。
有關此服務的更多資訊,請訪問 Google Cloud Batch 文件。
前置任務¶
要使用這些運算子,您必須完成以下幾項任務
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用結算功能,如 Google Cloud 文件中所述。
啟用 API,如 Cloud Console 文件中所述。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關詳細資訊,請參閱安裝。
提交作業¶
在 Cloud Batch 中提交作業之前,您需要定義它。有關 Job 物件欄位的更多資訊,請訪問 Google Cloud Batch 作業說明。
一個簡單的作業配置如下所示
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
def _create_job():
runnable = batch_v1.Runnable()
runnable.container = batch_v1.Runnable.Container()
runnable.container.image_uri = "gcr.io/google-containers/busybox"
runnable.container.entrypoint = "/bin/sh"
runnable.container.commands = [
"-c",
"echo Hello world! This is task ${BATCH_TASK_INDEX}.\
This job has a total of ${BATCH_TASK_COUNT} tasks.",
]
task = batch_v1.TaskSpec()
task.runnables = [runnable]
resources = batch_v1.ComputeResource()
resources.cpu_milli = 2000
resources.memory_mib = 16
task.compute_resource = resources
task.max_retry_count = 2
group = batch_v1.TaskGroup()
group.task_count = 2
group.task_spec = task
policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = "e2-standard-4"
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy = batch_v1.AllocationPolicy()
allocation_policy.instances = [instances]
job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.labels = {"env": "testing", "type": "container"}
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
return job
使用此配置,我們可以提交作業:CloudBatchSubmitJobOperator
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
submit1 = CloudBatchSubmitJobOperator(
task_id="submit-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
job=_create_job(),
dag=dag,
deferrable=False,
)
或者您可以在可延遲模式下定義相同的運算子:CloudBatchSubmitJobOperator
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
submit2 = CloudBatchSubmitJobOperator(
task_id="submit-job2",
project_id=PROJECT_ID,
region=REGION,
job_name=job2_name,
job=batch_v1.Job.to_dict(_create_job()),
dag=dag,
deferrable=True,
)
請注意,此運算子會等待作業完成執行,並且作業的字典表示形式會推送到 XCom。
列出作業的任務¶
要列出某個作業的任務,您可以使用
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
list_tasks = CloudBatchListTasksOperator(
task_id=list_tasks_task_name, project_id=PROJECT_ID, region=REGION, job_name=job1_name, dag=dag
)
此運算子接受兩個可選引數:“limit”用於限制返回的任務數量,“filter”用於僅列出與過濾器匹配的任務。
列出作業¶
要列出作業,您可以使用
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
list_jobs = CloudBatchListJobsOperator(
task_id=list_jobs_task_name,
project_id=PROJECT_ID,
region=REGION,
limit=10,
filter=f"name:projects/{PROJECT_ID}/locations/{REGION}/jobs/{job_name_prefix}*",
dag=dag,
)
此運算子接受兩個可選引數:“limit”用於限制返回的任務數量,“filter”用於僅列出與過濾器匹配的任務。
刪除作業¶
要刪除作業,您可以使用
tests/system/google/cloud/cloud_batch/example_cloud_batch.py
delete_job1 = CloudBatchDeleteJobOperator(
task_id="delete-job1",
project_id=PROJECT_ID,
region=REGION,
job_name=job1_name,
dag=dag,
trigger_rule=TriggerRule.ALL_DONE,
)
請注意,此運算子會等待作業被刪除,並且已刪除作業的字典表示形式會推送到 XCom。