Google Cloud Workflows 操作器¶
您可以使用 Workflows 來建立無伺服器工作流,這些工作流按照您定義的順序將一系列無伺服器任務連結在一起。結合 Google Cloud 的強大 API、Cloud Functions 和 Cloud Run 等無伺服器產品,以及對外部 API 的呼叫,來建立靈活的無伺服器應用程式。
有關此服務的更多資訊,請訪問 Workflows 官方文件 <產品文件。
前提任務¶
要使用這些操作器,您必須完成以下幾項:
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算,詳情請參閱 Google Cloud 文件。
啟用 API,詳情請參閱 Cloud Console 文件。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'詳細資訊請參閱 安裝。
建立工作流¶
要建立工作流,請使用 WorkflowsCreateWorkflowOperator。
tests/system/google/cloud/workflows/example_workflows.py
create_workflow = WorkflowsCreateWorkflowOperator(
task_id="create_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow=WORKFLOW,
workflow_id=WORKFLOW_ID,
)
工作流應以類似於此示例的方式定義
tests/system/google/cloud/workflows/example_workflows.py
WORKFLOW_CONTENT = """
- getLanguage:
assign:
- inputLanguage: "English"
- readWikipedia:
call: http.get
args:
url: https://www.wikipedia.org/
query:
action: opensearch
search: ${inputLanguage}
result: wikiResult
- returnResult:
return: ${wikiResult}
"""
WORKFLOW = {
"description": "Test workflow",
"labels": {"airflow-version": "dev"},
"source_contents": WORKFLOW_CONTENT,
}
有關編寫工作流的更多資訊,請檢視官方產品文件 <產品文件。
更新工作流¶
要更新工作流,請使用 WorkflowsUpdateWorkflowOperator。
tests/system/google/cloud/workflows/example_workflows.py
update_workflow = WorkflowsUpdateWorkflowOperator(
task_id="update_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
update_mask=FieldMask(paths=["name", "description"]),
)
獲取工作流¶
要獲取工作流,請使用 WorkflowsGetWorkflowOperator。
tests/system/google/cloud/workflows/example_workflows.py
get_workflow = WorkflowsGetWorkflowOperator(
task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
列出工作流¶
要列出工作流,請使用 WorkflowsListWorkflowsOperator。
tests/system/google/cloud/workflows/example_workflows.py
list_workflows = WorkflowsListWorkflowsOperator(
task_id="list_workflows",
location=LOCATION,
project_id=PROJECT_ID,
)
刪除工作流¶
要刪除工作流,請使用 WorkflowsDeleteWorkflowOperator。
tests/system/google/cloud/workflows/example_workflows.py
delete_workflow = WorkflowsDeleteWorkflowOperator(
task_id="delete_workflow",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
建立執行¶
要建立執行,請使用 WorkflowsCreateExecutionOperator。由於 API 限制,此操作器不是冪等的。
tests/system/google/cloud/workflows/example_workflows.py
create_execution = WorkflowsCreateExecutionOperator(
task_id="create_execution",
location=LOCATION,
project_id=PROJECT_ID,
execution=EXECUTION,
workflow_id=WORKFLOW_ID,
)
建立操作器不會等待執行完成。要等待執行結果,請使用 WorkflowExecutionSensor。
tests/system/google/cloud/workflows/example_workflows.py
wait_for_execution = WorkflowExecutionSensor(
task_id="wait_for_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
獲取執行¶
要獲取執行,請使用 WorkflowsGetExecutionOperator。
tests/system/google/cloud/workflows/example_workflows.py
get_execution = WorkflowsGetExecutionOperator(
task_id="get_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=WORKFLOW_ID,
execution_id=create_execution_id,
)
列出執行¶
要列出執行,請使用 WorkflowsListExecutionsOperator。預設情況下,此操作器僅返回過去 60 分鐘內的執行。
tests/system/google/cloud/workflows/example_workflows.py
list_executions = WorkflowsListExecutionsOperator(
task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)
取消執行¶
要取消執行,請使用 WorkflowsCancelExecutionOperator。
tests/system/google/cloud/workflows/example_workflows.py
cancel_execution = WorkflowsCancelExecutionOperator(
task_id="cancel_execution",
location=LOCATION,
project_id=PROJECT_ID,
workflow_id=SLEEP_WORKFLOW_ID,
execution_id=cancel_execution_id,
)