Google Cloud BigQuery 資料遷移服務 Operator¶
藉助 BigQuery 資料遷移服務,您可以按計劃、託管的方式自動將資料從 SaaS 應用傳輸到 Google BigQuery。您的分析團隊無需編寫任何程式碼即可奠定資料倉庫的基礎。BigQuery 資料遷移服務最初支援 Google 應用來源,例如 Google Ads、Campaign Manager、Google Ad Manager 和 YouTube。透過 BigQuery 資料遷移服務,使用者還可以訪問資料聯結器,方便地將資料從 Teradata 和 Amazon S3 傳輸到 BigQuery。
前置任務¶
要使用這些 Operator,您必須做幾件事:
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
按照 Google Cloud 文件 中的說明,為您的專案啟用結算功能。
按照 Cloud Console 文件 中的說明,啟用 API。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關 安裝 的詳細資訊,請查閱相關文件。
建立傳輸配置¶
要建立 DTS 傳輸配置,您可以使用 BigQueryCreateDataTransferOperator。
在 Airflow 中,客戶需要建立一個停用自動排程的傳輸配置,然後使用專門的 Airflow Operator 呼叫 StartManualTransferRuns API 來觸發傳輸執行,例如 BigQueryDataTransferServiceStartTransferRunsOperator。BigQueryCreateDataTransferOperator 會檢查傳入的配置中是否存在自動排程選項。如果存在,則不執行任何操作;否則,其值將設定為 True。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
TRANSFER_CONFIG = {
"destination_dataset_id": DATASET_NAME,
"display_name": "test data transfer",
"data_source_id": "google_cloud_storage",
"schedule_options": {"disable_auto_scheduling": True},
"params": {
"field_delimiter": ",",
"max_bad_records": "0",
"skip_leading_rows": "1",
"data_path_template": BUCKET_URI,
"destination_table_name_template": DTS_BQ_TABLE,
"file_format": "CSV",
},
}
您可以建立帶或不帶專案 ID 的 Operator。如果專案 ID 缺失,將從使用的 Google Cloud 連線中檢索。Operator 的基本用法如下:
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT_ID,
task_id="gcp_bigquery_create_transfer",
)
transfer_config_id = cast("str", XComArg(gcp_bigquery_create_transfer, key="transfer_config_id"))
您可以使用 Jinja 模板 配合引數 transfer_config、project_id、authorization_code、gcp_conn_id、impersonation_chain 來動態確定值。結果會儲存到 XCom,供其他 Operator 使用。此外,新配置的 ID 可透過 transfer_config_id 鍵在 XCom 中訪問。
刪除傳輸配置¶
要刪除 DTS 傳輸配置,您可以使用 BigQueryDeleteDataTransferConfigOperator。
Operator 的基本用法如下:
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)
您可以使用 Jinja 模板 配合引數 transfer_config、project_id、authorization_code、gcp_conn_id、impersonation_chain 來動態確定值。
手動啟動傳輸執行¶
啟動手動傳輸執行,其 schedule_time 等於當前時間,立即執行。使用 BigQueryDataTransferServiceStartTransferRunsOperator。
Operator 的基本用法如下:
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="gcp_bigquery_start_transfer",
project_id=PROJECT_ID,
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
您可以使用 Jinja 模板 配合引數 transfer_config_id、project_id、requested_time_range、requested_run_time、gcp_conn_id、impersonation_chain 來動態確定值。
要檢查操作是否成功,您可以使用 BigQueryDataTransferServiceTransferRunSensor。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
task_id="gcp_run_sensor",
transfer_config_id=transfer_config_id,
run_id=cast("str", XComArg(gcp_bigquery_start_transfer, key="run_id")),
expected_statuses={"SUCCEEDED"},
)
您可以使用 Jinja 模板 配合引數 run_id、transfer_config_id、expected_statuses、project_id、impersonation_chain 來動態確定值。
參考¶
欲瞭解更多資訊,請參閱: