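Google Dataform Operators
Dataform is a service that lets data analysts develop, test, version-control, and schedule complex SQL workflows for data transformation in BigQuery.
Dataform lets you manage data transformation in the Extraction, Loading, and Transformation (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.
For more information about the service, visit the Dataform documentation.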
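Configuration
Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, visit the Dataform documentation.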
Create repository
To create a repository for tracking your code in the Dataform service, use the DataformCreateRepositoryOperator. Example usage:
tests/system/google/cloud/dataform/example_dataform.py
make_repository = DataformCreateRepositoryOperator(
    task_id="make-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
)
Create workspace
To create a workspace for storing your code in the Dataform service, use the DataformCreateWorkspaceOperator. Example usage:
tests/system/google/cloud/dataform/example_dataform.py
make_workspace = DataformCreateWorkspaceOperator(
    task_id="make-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)
Create compilation result
To create a compilation result, use the DataformCreateCompilationResultOperator. A simple configuration could look like this:
tests/system/google/cloud/dataform/example_dataform.py
create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": "main",
        "workspace": (
            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
            f"workspaces/{WORKSPACE_ID}"
        ),
    },
)
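The workspace value in the example above is the workspace's full resource name. If you build this string in more than one place, a small helper can prevent typos. The sketch below defines a hypothetical helper, workspace_resource_name, which is not part of the Airflow provider. The project, region, repository, and workspace values are placeholders.

```python
def workspace_resource_name(
    project_id: str, region: str, repository_id: str, workspace_id: str
) -> str:
    """Build the full resource name of a Dataform workspace (hypothetical helper)."""
    return (
        f"projects/{project_id}/locations/{region}/"
        f"repositories/{repository_id}/workspaces/{workspace_id}"
    )


# Placeholder values, for illustration only.
name = workspace_resource_name("my-project", "us-central1", "my-repo", "my-workspace")
print(name)
```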
Get compilation result
To get a compilation result, use the DataformGetCompilationResultOperator.
tests/system/google/cloud/dataform/example_dataform.py
get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
    ),
)
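The compilation_result_id template pulls the name that create-compilation-result pushed to XCom. That name is a full resource name, and the template keeps only its last path segment, which is the ID. The sketch below applies the same string logic in plain Python. The resource name is made up for illustration, and resource_id is a hypothetical helper.

```python
def resource_id(resource_name: str) -> str:
    """Return the last path segment of a resource name, i.e. the resource ID."""
    return resource_name.split("/")[-1]


# Hypothetical compilation result name, for illustration only.
name = (
    "projects/my-project/locations/us-central1/"
    "repositories/my-repo/compilationResults/abc123"
)
print(resource_id(name))
```

The workflow invocation templates later in this section use the same `.split('/')[-1]` pattern.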
Create workflow invocation
To create a workflow invocation, use the DataformCreateWorkflowInvocationOperator.
tests/system/google/cloud/dataform/example_dataform.py
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)
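The operator can run in synchronous or asynchronous mode. With asynchronous=True, it returns as soon as the invocation is submitted. For that case, the DataformWorkflowInvocationStateSensor can wait until the invocation reaches one of its expected_statuses.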
tests/system/google/cloud/dataform/example_dataform.py
create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation-async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is-workflow-invocation-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
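In asynchronous mode, the operator only submits the invocation, and the sensor keeps checking its state until it reaches an expected status. The sketch below illustrates that generic poke loop. It uses a stand-in State enum and a simulated status source. It is not the sensor's actual implementation, and the names wait_for_state, State, and max_pokes are hypothetical.

```python
import time
from enum import Enum
from typing import Callable, Set


class State(Enum):
    """Hypothetical stand-in for WorkflowInvocation.State, for illustration only."""

    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"


def wait_for_state(
    get_state: Callable[[], State],
    expected: Set[State],
    failure: Set[State],
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> State:
    """Call get_state() until it returns an expected state.

    Raises RuntimeError on a failure state and TimeoutError after max_pokes calls.
    """
    for _ in range(max_pokes):
        state = get_state()
        if state in failure:
            raise RuntimeError(f"workflow invocation ended in state {state.name}")
        if state in expected:
            return state
        # Not finished yet: wait before checking again, similar to a sensor's poke interval.
        time.sleep(poke_interval)
    raise TimeoutError("workflow invocation did not reach an expected state")


# Simulated sequence of states returned by successive status checks.
simulated = iter([State.RUNNING, State.RUNNING, State.SUCCEEDED])
result = wait_for_state(
    lambda: next(simulated), {State.SUCCEEDED}, {State.FAILED, State.CANCELLED}
)
print(result.name)
```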
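There is also a sensor, DataformWorkflowInvocationActionStateSensor, that checks the state of a single action, identified by target_name, within an asynchronously triggered workflow invocation.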
tests/system/google/cloud/dataform/example_dataform.py
is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
    task_id="is-workflow-invocation-action-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    target_name="first_view",
    expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
    failure_statuses={
        WorkflowInvocationAction.State.SKIPPED,
        WorkflowInvocationAction.State.DISABLED,
        WorkflowInvocationAction.State.CANCELLED,
        WorkflowInvocationAction.State.FAILED,
    },
)
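Conceptually, each check the action sensor makes does three things. It finds the action whose target name matches target_name. It fails if that action is in one of the failure_statuses. Otherwise it reports whether the action has reached an expected status. The sketch below models a single check using simplified dict records in place of the client library's WorkflowInvocationAction objects. The function poke_action and the record layout are hypothetical. Raising an error for an unknown target is a design choice of this sketch, not documented sensor behavior.

```python
from typing import Iterable, Mapping, Set

# Hypothetical, simplified action records, for illustration only.
actions = [
    {"target": {"name": "first_view"}, "state": "SUCCEEDED"},
    {"target": {"name": "second_view"}, "state": "RUNNING"},
]

FAILURE = {"SKIPPED", "DISABLED", "CANCELLED", "FAILED"}


def poke_action(
    actions: Iterable[Mapping],
    target_name: str,
    expected: Set[str],
    failure: Set[str],
) -> bool:
    """Return True when the named action is in an expected state, False to keep waiting."""
    for action in actions:
        if action["target"]["name"] != target_name:
            continue
        state = action["state"]
        if state in failure:
            raise RuntimeError(f"action {target_name} ended in state {state}")
        return state in expected
    # Design choice of this sketch: a target with no matching action is an error.
    raise ValueError(f"no action found for target {target_name}")


print(poke_action(actions, "first_view", {"SUCCEEDED"}, FAILURE))
print(poke_action(actions, "second_view", {"SUCCEEDED"}, FAILURE))
```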
Get workflow invocation
To get a workflow invocation, use the DataformGetWorkflowInvocationOperator.
tests/system/google/cloud/dataform/example_dataform.py
get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
Query workflow invocation actions
To query the actions of a workflow invocation, use the DataformQueryWorkflowInvocationActionsOperator.
tests/system/google/cloud/dataform/example_dataform.py
query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
    task_id="query-workflow-invocation-actions",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
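After querying the actions, a downstream task might want a quick summary: how many actions ended in each state, and which targets failed. The sketch below assumes the queried actions are available as plain dicts with a target name and a state string. That layout is a hypothetical simplification, not a documented return format. The helper summarize_actions is also hypothetical.

```python
from collections import Counter
from typing import Iterable, List, Mapping, Tuple


def summarize_actions(actions: Iterable[Mapping]) -> Tuple[Counter, List[str]]:
    """Count actions per state and list the target names of failed actions."""
    actions = list(actions)
    counts = Counter(action["state"] for action in actions)
    failed = [action["target"]["name"] for action in actions if action["state"] == "FAILED"]
    return counts, failed


# Hypothetical query result, simplified to plain dicts.
queried = [
    {"target": {"name": "first_view"}, "state": "SUCCEEDED"},
    {"target": {"name": "second_view"}, "state": "SUCCEEDED"},
    {"target": {"name": "third_table"}, "state": "FAILED"},
]
counts, failed = summarize_actions(queried)
print(counts)  # Counter({'SUCCEEDED': 2, 'FAILED': 1})
print(failed)  # ['third_table']
```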
Cancel workflow invocation
To cancel a workflow invocation, use the DataformCancelWorkflowInvocationOperator.
tests/system/google/cloud/dataform/example_dataform.py
cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
    ),
)
Delete repository
To delete a repository, use the DataformDeleteRepositoryOperator. Example usage:
tests/system/google/cloud/dataform/example_dataform.py
delete_repository = DataformDeleteRepositoryOperator(
    task_id="delete-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
Delete workspace
To delete a workspace, use the DataformDeleteWorkspaceOperator. Example usage:
tests/system/google/cloud/dataform/example_dataform.py
delete_workspace = DataformDeleteWorkspaceOperator(
    task_id="delete-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)
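Both delete examples set trigger_rule=TriggerRule.ALL_DONE, so the cleanup tasks run even when an upstream task failed. With Airflow's default rule, ALL_SUCCESS, a single upstream failure would prevent the cleanup from running. The sketch below models the difference using plain state strings. The FINISHED set is a simplification assumed for this sketch, not Airflow's full state model or scheduler code.

```python
from typing import Iterable

# Simplified set of finished task-instance states (an assumption of this sketch).
FINISHED = {"success", "failed", "skipped", "upstream_failed"}


def all_done(upstream_states: Iterable[str]) -> bool:
    """ALL_DONE: run once every upstream task has finished, whatever the outcome."""
    return all(state in FINISHED for state in upstream_states)


def all_success(upstream_states: Iterable[str]) -> bool:
    """ALL_SUCCESS (the default): run only if every upstream task succeeded."""
    return all(state == "success" for state in upstream_states)


states = ["success", "failed"]
print(all_done(states), all_success(states))  # True False
```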
Remove file
To remove a file from a workspace, use the DataformRemoveFileOperator. Example usage:
tests/system/google/cloud/dataform/example_dataform.py
remove_test_file = DataformRemoveFileOperator(
    task_id="remove-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
)
Remove directory
To remove a directory from a workspace, use the DataformRemoveDirectoryOperator. Example usage:
tests/system/google/cloud/dataform/example_dataform.py
remove_test_directory = DataformRemoveDirectoryOperator(
    task_id="remove-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)
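Initialize workspace
To create the default project structure in a given workspace, use the make_initialization_workspace_flow helper. It returns the first and last steps of the initialization flow. The repository and the workspace must be created first. Example usage: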
tests/system/google/cloud/dataform/example_dataform.py
first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    package_name=f"dataform_package_{ENV_ID}",
    without_installation=True,
    dataform_schema_name=DATAFORM_SCHEMA_NAME,
)
Write file to workspace
To write a file with the given contents to a given workspace, use the DataformWriteFileOperator.
tests/system/google/cloud/dataform/example_dataform.py
test_file_content = b"""
test test for test file
"""
write_test_file = DataformWriteFileOperator(
    task_id="make-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
    contents=test_file_content,
)
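In the example, contents is a bytes literal. If you keep file text as an indented triple-quoted string, you can normalize it before passing it as contents. The hypothetical helper below removes the common indentation and the leading newline, then encodes the text as UTF-8. The SQLX text is illustrative only.

```python
from textwrap import dedent


def to_file_contents(text: str) -> bytes:
    """Strip common indentation and the leading newline, then encode as UTF-8 bytes."""
    return dedent(text).lstrip("\n").encode("utf-8")


# Hypothetical SQLX definition, for illustration only.
view_sqlx = to_file_contents(
    """
    config { type: "view" }
    SELECT 1 AS test_column
    """
)
print(view_sqlx)
```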
Make directory in workspace
To create a directory at a given path in a given workspace, use the DataformMakeDirectoryOperator.
tests/system/google/cloud/dataform/example_dataform.py
make_test_directory = DataformMakeDirectoryOperator(
    task_id="make-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)
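The example paths are relative to the workspace root: the directory test and the file test/test.txt. You may want to create every directory a file path needs with DataformMakeDirectoryOperator, one task per directory. In that case, the hypothetical helper parent_directories below lists those directories from the outermost inward and rejects absolute paths or paths containing "..". It is a sketch based only on path handling. It does not describe how Dataform itself validates paths.

```python
from pathlib import PurePosixPath
from typing import List


def parent_directories(filepath: str) -> List[str]:
    """Return the workspace-relative parent directories of filepath, outermost first."""
    path = PurePosixPath(filepath)
    if path.is_absolute() or ".." in path.parts:
        raise ValueError(f"expected a workspace-relative path, got {filepath!r}")
    # path.parents ends with PurePosixPath("."), the workspace root, which is skipped.
    return [str(parent) for parent in reversed(path.parents) if str(parent) != "."]


print(parent_directories("test/test.txt"))         # ['test']
print(parent_directories("definitions/a/b.sqlx"))  # ['definitions', 'definitions/a']
```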
Install NPM packages
To install npm packages for a given workspace, use the DataformInstallNpmPackagesOperator.
tests/system/google/cloud/dataform/example_dataform.py
install_npm_packages = DataformInstallNpmPackagesOperator(
    task_id="install-npm-packages",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)
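This sketch assumes, as in typical Dataform projects, that the packages to install are declared in the workspace's package.json, with @dataform/core as a dependency. The sketch builds a minimal package.json as bytes. You could pass those bytes as contents to DataformWriteFileOperator with filepath="package.json" before the install step. The helper name, package name, and version string are placeholders. Pin whatever @dataform/core version your project uses.

```python
import json


def package_json_contents(package_name: str, core_version: str) -> bytes:
    """Serialize a minimal package.json that declares @dataform/core as a dependency."""
    manifest = {
        "name": package_name,
        "dependencies": {"@dataform/core": core_version},
    }
    return json.dumps(manifest, indent=2).encode("utf-8")


# Placeholder package name and version, for illustration only.
contents = package_json_contents("dataform_package_example", "3.0.0")
print(contents.decode("utf-8"))
```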