Google Cloud BigQuery 到 Postgres 的傳輸 Operator¶
Google Cloud BigQuery 是 Google Cloud 的無伺服器資料倉庫服務。PostgreSQL 是一種開源關係型資料庫管理系統。此 operator 可用於將資料從 BigQuery 表複製到 PostgreSQL。
先決條件任務¶
要使用這些 operators,您必須執行以下操作:
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用計費,如 Google Cloud 文件 中所述。
啟用 API,如 Cloud Console 文件 中所述。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關詳細資訊,請參閱 安裝。
Operator¶
使用 BigQueryToPostgresOperator operator 執行將資料從 BigQuery 表複製到 Postgres 表的操作。
使用 Jinja 模板 配合 target_table_name, impersonation_chain, dataset_id, table_id 動態定義值。
您可以使用引數 selected_fields 來限制要複製的欄位(預設為所有欄位),以及引數 replace 來覆蓋目標表而不是追加資料。如果使用引數 replace,則需要同時指定 selected_fields 和 replace_index 引數,這是由於 PostgreSQL 底層 INSERT 命令中的 ON CONFLICT 子句的限制所致。
更多資訊請參考以上鍊接。
傳輸資料¶
以下 Operator 將資料從 BigQuery 表複製到 PostgreSQL。
tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py
bigquery_to_postgres = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=False,
)
此 Operator 也可以使用來自 BigQuery 表的匹配資料替換 PostgreSQL 表中的資料。
tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py
bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres_upsert",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=True,
selected_fields=["emp_name", "salary"],
replace_index=["emp_name", "salary"],
)
參考資料¶
有關進一步資訊,請查閱: