Google Cloud Storage 到 BigQuery 傳輸 Operator¶
Google Cloud Storage (GCS) 是一種用於儲存非結構化資料的託管服務。Google Cloud BigQuery 是 Google Cloud 的無伺服器資料倉庫服務。此 Operator 可用於將儲存在 Cloud Storage 儲存桶中的檔案資料填充到 BigQuery 表中。
先決條件任務¶
要使用這些 Operator,您必須完成以下事項
使用 Cloud 控制檯選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算功能,具體請參閱 Google Cloud 文件中的描述。
啟用 API,具體請參閱 Cloud 控制檯文件中的描述。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關 安裝的詳細資訊,請參閱。
Operator¶
從 GCS 到 BigQuery 的檔案傳輸由 GCSToBigQueryOperator Operator 執行。
使用 Jinja 模板化,並透過 bucket, source_objects, schema_object, schema_object_bucket, destination_project_dataset_table, impersonation_chain, src_fmt_configs 引數動態定義值。
您可以使用 source_objects 引數從單個儲存桶載入多個物件。您還可以定義 schema 以及其他設定,例如壓縮格式。有關更多資訊,請參閱上方連結。
傳輸檔案¶
以下 Operator 將一個或多個檔案從 GCS 傳輸到 BigQuery 表中。
tests/system/google/cloud/gcs/example_gcs_to_bigquery.py
load_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
],
write_disposition="WRITE_TRUNCATE",
)
您也可以在可延遲模式下使用 GCSToBigQueryOperator
tests/system/google/cloud/gcs/example_gcs_to_bigquery_async.py
load_string_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_str_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key="string_field_0",
deferrable=True,
)
load_date_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states-by-date.csv"],
destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_DATE,
deferrable=True,
)
load_json = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_json_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.json"],
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_STR,
deferrable=True,
)
load_csv_delimiter = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_delimiter_async",
bucket="big-query-samples",
source_objects=["employees-tabular.csv"],
source_format="csv",
destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
field_delimiter="\t",
quote_character="",
max_id_key=MAX_ID_STR,
deferrable=True,
)
參考資料¶
如需更多資訊,請參閱