Google Cloud Storage to BigQuery Transfer Operator

Google Cloud Storage (GCS) is a managed service for storing unstructured data. Google Cloud BigQuery is Google Cloud's serverless data warehouse offering. This operator can be used to populate a BigQuery table with data from files stored in a Cloud Storage bucket.

Prerequisite Tasks

To use these operators, you must do a few things:

Operator

File transfer from GCS to BigQuery is performed by the GCSToBigQueryOperator operator.

Use Jinja templating with the bucket, source_objects, schema_object, schema_object_bucket, destination_project_dataset_table, impersonation_chain, and src_fmt_configs parameters to define values dynamically.

You can load multiple objects from a single bucket using the source_objects parameter. You can also define a schema, as well as other settings such as the compression format. For more information, please refer to the links above.

Transferring Files

The following operator transfers one or more files from GCS into a BigQuery table.

tests/system/google/cloud/gcs/example_gcs_to_bigquery.py

load_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
)

You can also use GCSToBigQueryOperator in deferrable mode:

tests/system/google/cloud/gcs/example_gcs_to_bigquery_async.py

load_string_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_str_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key="string_field_0",
    deferrable=True,
)

load_date_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states-by-date.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_DATE,
    deferrable=True,
)

load_json = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_json_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.json"],
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

load_csv_delimiter = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_delimiter_async",
    bucket="big-query-samples",
    source_objects=["employees-tabular.csv"],
    source_format="csv",
    destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    field_delimiter="\t",
    quote_character="",
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

References

For further information, see:
