Google Cloud Tasks

Cloud Tasks 是一項全託管服務,可用於管理大量分散式任務的執行、分派和交付。使用 Cloud Tasks,您可以在使用者或服務到服務請求之外非同步執行工作。

有關該服務的更多資訊,請訪問Cloud Tasks 產品文件

前置任務

要使用這些運算子,您需要執行一些操作

佇列操作

建立佇列

要建立新佇列,請使用CloudTasksQueueCreateOperator

tests/system/google/cloud/tasks/example_queue.py

create_queue = CloudTasksQueueCreateOperator(
    location=LOCATION,
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_queue",
)

刪除佇列

要刪除佇列,請使用CloudTasksQueueDeleteOperator

tests/system/google/cloud/tasks/example_queue.py

delete_queue = CloudTasksQueueDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_queue",
)

恢復佇列

要恢復佇列,請使用CloudTasksQueueResumeOperator

tests/system/google/cloud/tasks/example_queue.py

resume_queue = CloudTasksQueueResumeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="resume_queue",
)

暫停佇列

要暫停佇列,請使用CloudTasksQueuePauseOperator

tests/system/google/cloud/tasks/example_queue.py

pause_queue = CloudTasksQueuePauseOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="pause_queue",
)

清除佇列

要清除佇列,請使用CloudTasksQueuePurgeOperator

tests/system/google/cloud/tasks/example_queue.py

purge_queue = CloudTasksQueuePurgeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="purge_queue",
)

獲取佇列

要獲取佇列,請使用CloudTasksQueueGetOperator

tests/system/google/cloud/tasks/example_queue.py

get_queue = CloudTasksQueueGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="get_queue",
)

get_queue_result = BashOperator(
    task_id="get_queue_result",
    bash_command=f"echo {get_queue.output}",
)

更新佇列

要更新佇列,請使用CloudTasksQueueUpdateOperator

tests/system/google/cloud/tasks/example_queue.py

update_queue = CloudTasksQueueUpdateOperator(
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
    task_id="update_queue",
)

列出佇列

要列出所有佇列,請使用CloudTasksQueuesListOperator

tests/system/google/cloud/tasks/example_queue.py

list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")

任務操作

建立任務

要在特定佇列中建立新任務,請使用CloudTasksTaskCreateOperator

tests/system/google/cloud/tasks/example_tasks.py

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

獲取任務

要獲取特定佇列中的任務,請使用CloudTasksTaskGetOperator

tests/system/google/cloud/tasks/example_tasks.py

tasks_get = CloudTasksTaskGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="tasks_get",
)

執行任務

要在特定佇列中執行任務,請使用CloudTasksTaskRunOperator

tests/system/google/cloud/tasks/example_tasks.py

run_task = CloudTasksTaskRunOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    task_id="run_task",
)

列出任務

要列出特定佇列中的所有任務,請使用CloudTasksTasksListOperator

tests/system/google/cloud/tasks/example_tasks.py

list_tasks = CloudTasksTasksListOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="list_tasks",
)

刪除任務

要從特定佇列中刪除任務,請使用CloudTasksTaskDeleteOperator

tests/system/google/cloud/tasks/example_tasks.py

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

參考

欲瞭解更多資訊,請參閱

此條目是否有幫助?