Google Cloud Workflows 操作器

您可以使用 Workflows 來建立無伺服器工作流,這些工作流按照您定義的順序將一系列無伺服器任務連結在一起。結合 Google Cloud 的強大 API、Cloud Functions 和 Cloud Run 等無伺服器產品,以及對外部 API 的呼叫,來建立靈活的無伺服器應用程式。

有關此服務的更多資訊,請訪問 Workflows 官方文件 <產品文件

前提任務

要使用這些操作器,您必須完成以下幾項:

建立工作流

要建立工作流,請使用 WorkflowsCreateWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py

create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)

工作流應以類似於此示例的方式定義

tests/system/google/cloud/workflows/example_workflows.py

WORKFLOW_CONTENT = """
- getLanguage:
    assign:
        - inputLanguage: "English"
- readWikipedia:
    call: http.get
    args:
        url: https://www.wikipedia.org/
        query:
            action: opensearch
            search: ${inputLanguage}
    result: wikiResult
- returnResult:
    return: ${wikiResult}
"""

WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}

有關編寫工作流的更多資訊,請檢視官方產品文件 <產品文件

更新工作流

要更新工作流,請使用 WorkflowsUpdateWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py

update_workflow = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask=FieldMask(paths=["name", "description"]),
)

獲取工作流

要獲取工作流,請使用 WorkflowsGetWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py

get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

列出工作流

要列出工作流,請使用 WorkflowsListWorkflowsOperator

tests/system/google/cloud/workflows/example_workflows.py

list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)

刪除工作流

要刪除工作流,請使用 WorkflowsDeleteWorkflowOperator

tests/system/google/cloud/workflows/example_workflows.py

delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

建立執行

要建立執行,請使用 WorkflowsCreateExecutionOperator。由於 API 限制,此操作器不是冪等的。

tests/system/google/cloud/workflows/example_workflows.py

create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)

建立操作器不會等待執行完成。要等待執行結果,請使用 WorkflowExecutionSensor

tests/system/google/cloud/workflows/example_workflows.py

wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

獲取執行

要獲取執行,請使用 WorkflowsGetExecutionOperator

tests/system/google/cloud/workflows/example_workflows.py

get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

列出執行

要列出執行,請使用 WorkflowsListExecutionsOperator。預設情況下,此操作器僅返回過去 60 分鐘內的執行。

tests/system/google/cloud/workflows/example_workflows.py

list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

取消執行

要取消執行,請使用 WorkflowsCancelExecutionOperator

tests/system/google/cloud/workflows/example_workflows.py

cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id=cancel_execution_id,
)

本條目有幫助嗎?