Google DataFusion 運算子¶
Cloud Data Fusion 是一項完全託管的雲原生資料整合服務,可幫助使用者高效地構建和管理 ETL/ELT 資料管道。Cloud Data Fusion 提供圖形介面以及包含預配置聯結器和轉換的廣泛開源庫,使組織能夠將重心從程式碼和整合轉移到洞察和行動上。
先決條件任務¶
要使用這些運算子,您必須完成以下幾件事
使用 Cloud Console 選擇或建立 Cloud Platform 專案。
為您的專案啟用結算功能,詳情請參閱 Google Cloud 文件。
啟用 API,詳情請參閱 Cloud Console 文件。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關詳細資訊,請參見 安裝。
重啟 DataFusion 例項¶
要重啟 Data Fusion 例項,請使用:CloudDataFusionRestartInstanceOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
restart_instance = CloudDataFusionRestartInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)
您可以將 Jinja 模板 與 instance_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
刪除 DataFusion 例項¶
要刪除 Data Fusion 例項,請使用:CloudDataFusionDeleteInstanceOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
delete_instance = CloudDataFusionDeleteInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
task_id="delete_instance",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以將 Jinja 模板 與 instance_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
建立 DataFusion 例項¶
要建立 Data Fusion 例項,請使用:CloudDataFusionCreateInstanceOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
create_instance = CloudDataFusionCreateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
task_id="create_instance",
)
您可以將 Jinja 模板 與 instance_name, instance, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
更新 DataFusion 例項¶
要更新 Data Fusion 例項,請使用:CloudDataFusionUpdateInstanceOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
update_instance = CloudDataFusionUpdateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
update_mask="",
task_id="update_instance",
)
您可以將 Jinja 模板 與 instance_name, instance, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
獲取 DataFusion 例項¶
要檢索 Data Fusion 例項,請使用:CloudDataFusionGetInstanceOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
get_instance = CloudDataFusionGetInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)
您可以將 Jinja 模板 與 instance_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
建立 DataFusion 管道¶
要建立 Data Fusion 管道,請使用:CloudDataFusionCreatePipelineOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
create_pipeline = CloudDataFusionCreatePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
pipeline=PIPELINE,
instance_name=INSTANCE_NAME,
task_id="create_pipeline",
)
您可以將 Jinja 模板 與 instance_name, pipeline_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
啟動 DataFusion 管道¶
要使用同步模式啟動 Data Fusion 管道,請使用:CloudDataFusionStartPipelineOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
pipeline_timeout=1000,
task_id="start_pipeline",
)
要使用非同步模式啟動 Data Fusion 管道,請使用:CloudDataFusionStartPipelineOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline_async = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
asynchronous=True,
task_id="start_pipeline_async",
)
可以使用可延遲模式非同步啟動 Data Fusion 管道。雖然非同步引數可以使用同步的 sleep() 方法等待 DataFusion 管道達到終止狀態,但可延遲模式使用非同步呼叫檢查狀態。不能同時使用非同步和可延遲引數。請檢視使用可延遲模式的示例:CloudDataFusionStartPipelineOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline_def = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="start_pipeline_def",
deferrable=True,
)
您可以將 Jinja 模板 與 instance_name, pipeline_name, runtime_args, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
停止 DataFusion 管道¶
要停止 Data Fusion 管道,請使用:CloudDataFusionStopPipelineOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
stop_pipeline = CloudDataFusionStopPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="stop_pipeline",
)
您可以將 Jinja 模板 與 instance_name, pipeline_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
刪除 DataFusion 管道¶
要刪除 Data Fusion 管道,請使用:CloudDataFusionDeletePipelineOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
delete_pipeline = CloudDataFusionDeletePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以將 Jinja 模板 與 instance_name, version_id, pipeline_name, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
列出 DataFusion 管道¶
要列出 Data Fusion 管道,請使用:CloudDataFusionListPipelinesOperator。
tests/system/google/cloud/datafusion/example_datafusion.py
list_pipelines = CloudDataFusionListPipelinesOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)
您可以將 Jinja 模板 與 instance_name, artifact_name, artifact_version, impersonation_chain 引數一起使用,以動態確定值。結果會儲存到 XCom 中,其他運算子可以使用該結果。
感測器¶
當管道啟動被非同步觸發時,可以使用感測器來執行檢查並驗證管道是否處於正確狀態。
CloudDataFusionPipelineStateSensor.
tests/system/google/cloud/datafusion/example_datafusion.py
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
task_id="pipeline_state_sensor",
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_async.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION,
)