Presto 到 Google Cloud Storage 傳輸運算子¶
Presto 是一個開源的分散式 SQL 查詢引擎,用於對從千兆位元組到拍位元組的各種大小資料來源執行互動式分析查詢。Presto 允許在資料所在位置進行查詢,包括 Hive、Cassandra、關係型資料庫甚至專有資料儲存。單個 Presto 查詢可以組合來自多個來源的資料,從而實現對整個組織的資料分析。
Google Cloud Storage 允許在全球範圍內隨時儲存和檢索任意量的資料。您可以使用它儲存備份資料和歸檔資料,也可以作為BigQuery 的資料來源。
資料傳輸¶
使用 PrestoToGCSOperator 運算子在 Presto 和 Google Storage 之間傳輸檔案。
此運算子有 3 個必需引數
sql- 要執行的 SQL。bucket- 要上傳到的儲存桶。filename- 上傳到 Google Cloud Storage 時用作物件名稱的檔名。在檔名中應指定{},以便運算子在檔案因大小而分割時注入檔案編號。
所有引數都在參考文件中進行了描述 - PrestoToGCSOperator。
示例運算子呼叫可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_basic = PrestoToGCSOperator(
task_id="presto_to_gcs_basic",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)
資料格式選擇¶
此運算子支援兩種輸出格式
json- JSON Lines(預設)csv
您可以透過 export_format 引數指定這些選項。
如果您想建立 CSV 檔案,您的運算子呼叫可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_csv = PrestoToGCSOperator(
task_id="presto_to_gcs_csv",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
export_format="csv",
)
生成 BigQuery schema¶
如果您設定 schema_filename 引數,一個包含表對應 BigQuery schema 欄位的 .json 檔案將從資料庫匯出並上傳到儲存桶。
如果您想建立 schema 檔案,那麼示例運算子呼叫可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_multiple_types = PrestoToGCSOperator(
task_id="presto_to_gcs_multiple_types",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
gzip=False,
)
有關 BigQuery schema 的更多資訊,請參閱 Big Query 文件中的 指定 schema。
將結果分割成多個檔案¶
此運算子支援將大型結果分割成多個檔案。`approx_max_file_size_bytes` 引數允許開發者指定分割檔案的大小。預設情況下,檔案大小不超過 1 900 000 000 位元組(1900 MB)
檢視 Google Cloud Storage 中的配額與限制,瞭解單個物件的最大允許檔案大小。
如果您想建立 10 MB 的檔案,您的程式碼可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_many_chunks = PrestoToGCSOperator(
task_id="presto_to_gcs_many_chunks",
sql=f"select * from {SOURCE_CUSTOMER_TABLE}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
approx_max_file_size_bytes=10_000_000,
gzip=False,
)
使用 BigQuery 查詢資料¶
Google Cloud Storage 中的資料可供 BigQuery 使用。您可以將資料載入到 BigQuery,或在查詢中直接引用 GCS 資料。有關將資料載入到 BigQuery 的資訊,請查閱 BigQuery 文件中的 從 Cloud Storage 載入資料簡介。有關查詢 GCS 資料的資訊,請查閱 BigQuery 文件中的 查詢 Cloud Storage 資料。
Airflow 還提供了許多允許您使用 BigQuery 的運算子。例如,如果您想建立一個外部表,以便您可以建立直接從 GCS 讀取資料的查詢,那麼您可以使用 BigQueryCreateExternalTableOperator。使用此運算子的程式碼如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
create_external_table_multiple_types = BigQueryCreateTableOperator(
task_id="create_external_table_multiple_types",
dataset_id=DATASET_NAME,
table_id=f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
},
"schema": {
"fields": [
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
]
},
"externalDataConfiguration": {
"sourceFormat": "NEWLINE_DELIMITED_JSON",
"compression": "NONE",
"csvOptions": {"skipLeadingRows": 1},
"sourceUris": [f"gs://{BUCKET_NAME}/{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
},
},
gcs_schema_object=f"gs://{BUCKET_NAME}/{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)
有關 Airflow 和 BigQuery 整合的更多資訊,請查閱 Python API 參考 - bigquery。
參考¶
欲瞭解更多資訊,請查閱