Google DataFusion 運算子

Cloud Data Fusion 是一項完全託管的雲原生資料整合服務,可幫助使用者高效地構建和管理 ETL/ELT 資料管道。Cloud Data Fusion 提供圖形介面以及包含預配置聯結器和轉換的廣泛開源庫,使組織能夠將重心從程式碼和整合轉移到洞察和行動上。

先決條件任務

要使用這些運算子,您必須完成以下幾件事

重啟 DataFusion 例項

要重啟 Data Fusion 例項,請使用:CloudDataFusionRestartInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)

您可以將 Jinja 模板instance_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

刪除 DataFusion 例項

要刪除 Data Fusion 例項,請使用:CloudDataFusionDeleteInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    task_id="delete_instance",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以將 Jinja 模板instance_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

建立 DataFusion 例項

要建立 Data Fusion 例項,請使用:CloudDataFusionCreateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)

您可以將 Jinja 模板instance_name, instance, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

更新 DataFusion 例項

要更新 Data Fusion 例項,請使用:CloudDataFusionUpdateInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)

您可以將 Jinja 模板instance_name, instance, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

獲取 DataFusion 例項

要檢索 Data Fusion 例項,請使用:CloudDataFusionGetInstanceOperator

tests/system/google/cloud/datafusion/example_datafusion.py

get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)

您可以將 Jinja 模板instance_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

建立 DataFusion 管道

要建立 Data Fusion 管道,請使用:CloudDataFusionCreatePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)

您可以將 Jinja 模板instance_name, pipeline_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

啟動 DataFusion 管道

要使用同步模式啟動 Data Fusion 管道,請使用:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    pipeline_timeout=1000,
    task_id="start_pipeline",
)

要使用非同步模式啟動 Data Fusion 管道,請使用:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)

可以使用可延遲模式非同步啟動 Data Fusion 管道。雖然非同步引數可以使用同步的 sleep() 方法等待 DataFusion 管道達到終止狀態,但可延遲模式使用非同步呼叫檢查狀態。不能同時使用非同步和可延遲引數。請檢視使用可延遲模式的示例:CloudDataFusionStartPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline_def",
    deferrable=True,
)

您可以將 Jinja 模板instance_name, pipeline_name, runtime_args, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

停止 DataFusion 管道

要停止 Data Fusion 管道,請使用:CloudDataFusionStopPipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)

您可以將 Jinja 模板instance_name, pipeline_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

刪除 DataFusion 管道

要刪除 Data Fusion 管道,請使用:CloudDataFusionDeletePipelineOperator

tests/system/google/cloud/datafusion/example_datafusion.py

delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以將 Jinja 模板instance_name, version_id, pipeline_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

列出 DataFusion 管道

要列出 Data Fusion 管道,請使用:CloudDataFusionListPipelinesOperator

tests/system/google/cloud/datafusion/example_datafusion.py

list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)

您可以將 Jinja 模板instance_name, artifact_name, artifact_version, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。

感測器

當管道啟動被非同步觸發時,可以使用感測器來執行檢查並驗證管道是否處於正確狀態。

CloudDataFusionPipelineStateSensor.

tests/system/google/cloud/datafusion/example_datafusion.py

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)

CloudDataFusionPipelineStateSensor.

此條目有幫助嗎?