Google Cloud Tasks¶
Cloud Tasks 是一項全託管服務,可用於管理大量分散式任務的執行、分派和交付。使用 Cloud Tasks,您可以在使用者或服務到服務請求之外非同步執行工作。
有關該服務的更多資訊,請訪問Cloud Tasks 產品文件
前置任務¶
要使用這些運算子,您需要執行一些操作
使用Cloud 控制檯選擇或建立 Cloud Platform 專案。
按照Google Cloud 文件中的說明,為您的專案啟用結算功能。
按照Cloud 控制檯文件中的說明,啟用 API。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關安裝的詳細資訊可用。
佇列操作¶
建立佇列¶
要建立新佇列,請使用CloudTasksQueueCreateOperator
tests/system/google/cloud/tasks/example_queue.py
create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_queue",
)
刪除佇列¶
要刪除佇列,請使用CloudTasksQueueDeleteOperator
tests/system/google/cloud/tasks/example_queue.py
delete_queue = CloudTasksQueueDeleteOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="delete_queue",
)
恢復佇列¶
要恢復佇列,請使用CloudTasksQueueResumeOperator
tests/system/google/cloud/tasks/example_queue.py
resume_queue = CloudTasksQueueResumeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="resume_queue",
)
暫停佇列¶
要暫停佇列,請使用CloudTasksQueuePauseOperator
tests/system/google/cloud/tasks/example_queue.py
pause_queue = CloudTasksQueuePauseOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="pause_queue",
)
清除佇列¶
要清除佇列,請使用CloudTasksQueuePurgeOperator
tests/system/google/cloud/tasks/example_queue.py
purge_queue = CloudTasksQueuePurgeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="purge_queue",
)
獲取佇列¶
要獲取佇列,請使用CloudTasksQueueGetOperator
tests/system/google/cloud/tasks/example_queue.py
get_queue = CloudTasksQueueGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="get_queue",
)
get_queue_result = BashOperator(
task_id="get_queue_result",
bash_command=f"echo {get_queue.output}",
)
更新佇列¶
要更新佇列,請使用CloudTasksQueueUpdateOperator
tests/system/google/cloud/tasks/example_queue.py
update_queue = CloudTasksQueueUpdateOperator(
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
task_id="update_queue",
)
列出佇列¶
要列出所有佇列,請使用CloudTasksQueuesListOperator
tests/system/google/cloud/tasks/example_queue.py
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
任務操作¶
建立任務¶
要在特定佇列中建立新任務,請使用CloudTasksTaskCreateOperator
tests/system/google/cloud/tasks/example_tasks.py
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)
獲取任務¶
要獲取特定佇列中的任務,請使用CloudTasksTaskGetOperator
tests/system/google/cloud/tasks/example_tasks.py
tasks_get = CloudTasksTaskGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="tasks_get",
)
執行任務¶
要在特定佇列中執行任務,請使用CloudTasksTaskRunOperator
tests/system/google/cloud/tasks/example_tasks.py
run_task = CloudTasksTaskRunOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
task_id="run_task",
)
列出任務¶
要列出特定佇列中的所有任務,請使用CloudTasksTasksListOperator
tests/system/google/cloud/tasks/example_tasks.py
list_tasks = CloudTasksTasksListOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="list_tasks",
)
刪除任務¶
要從特定佇列中刪除任務,請使用CloudTasksTaskDeleteOperator
tests/system/google/cloud/tasks/example_tasks.py
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)
參考¶
欲瞭解更多資訊,請參閱