Google Cloud BigQuery 到 Postgres 的傳輸 Operator

Google Cloud BigQuery 是 Google Cloud 的無伺服器資料倉庫服務。PostgreSQL 是一種開源關係型資料庫管理系統。此 operator 可用於將資料從 BigQuery 表複製到 PostgreSQL。

先決條件任務

要使用這些 operators,您必須執行以下操作:

Operator

使用 BigQueryToPostgresOperator operator 執行將資料從 BigQuery 表複製到 Postgres 表的操作。

使用 Jinja 模板 配合 target_table_name, impersonation_chain, dataset_id, table_id 動態定義值。

您可以使用引數 selected_fields 來限制要複製的欄位(預設為所有欄位),以及引數 replace 來覆蓋目標表而不是追加資料。如果使用引數 replace,則需要同時指定 selected_fieldsreplace_index 引數,這是由於 PostgreSQL 底層 INSERT 命令中的 ON CONFLICT 子句的限制所致。

更多資訊請參考以上鍊接。

傳輸資料

以下 Operator 將資料從 BigQuery 表複製到 PostgreSQL。

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py

bigquery_to_postgres = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=False,
)

此 Operator 也可以使用來自 BigQuery 表的匹配資料替換 PostgreSQL 表中的資料。

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py

bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres_upsert",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=True,
    selected_fields=["emp_name", "salary"],
    replace_index=["emp_name", "salary"],
)

參考資料

有關進一步資訊,請查閱:

此條目有幫助嗎?