Trino 到 Google Cloud Storage 傳輸運算子¶
Trino 是一個開源、快速、分散式的 SQL 查詢引擎,用於對從吉位元組到拍位元組的各種大小資料來源執行互動式分析查詢。Trino 允許在資料所在位置進行查詢,包括 Hive、Cassandra、關係資料庫甚至專有資料儲存。一個 Trino 查詢可以合併來自多個源的資料,從而實現對整個組織的分析。
Google Cloud Storage 允許在任何時間在全球範圍記憶體儲和檢索任何數量的資料。您可以使用它來儲存備份和歸檔資料,也可以作為 BigQuery 的資料來源。
資料傳輸¶
Trino 和 Google Storage 之間的資料傳輸透過 TrinoToGCSOperator 運算子執行。
此運算子有 3 個必需引數
sql- 要執行的 SQL。bucket- 要上傳到的 bucket。filename- 上傳到 Google Cloud Storage 時用作物件名稱的檔名。檔名中應指定{},以便運算子在檔案因大小而被分割的情況下注入檔案編號。
所有引數都在參考文件中描述 - TrinoToGCSOperator。
一個示例運算子呼叫可能如下所示
tests/system/google/cloud/gcs/example_trino_to_gcs.py
trino_to_gcs_basic = TrinoToGCSOperator( # TODO
task_id="trino_to_gcs_basic",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
)
資料格式選擇¶
運算子支援兩種輸出格式
json- JSON Lines(預設)csv
您可以透過 export_format 引數指定這些選項。
如果您希望建立 CSV 檔案,您的運算子呼叫可能如下所示
tests/system/google/cloud/gcs/example_trino_to_gcs.py
trino_to_gcs_csv = TrinoToGCSOperator(
task_id="trino_to_gcs_csv",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.csv",
schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
export_format="csv",
)
生成 BigQuery schema¶
如果您設定了 schema_filename 引數,將從資料庫匯出包含表 BigQuery schema 欄位的 .json 檔案並上傳到 bucket。
如果您想建立 schema 檔案,則示例運算子呼叫可能如下所示
tests/system/google/cloud/gcs/example_trino_to_gcs.py
trino_to_gcs_multiple_types = TrinoToGCSOperator( # TODO
task_id="trino_to_gcs_multiple_types",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
gzip=False,
)
有關 BigQuery schema 的更多資訊,請參閱 Big Query 文件中的指定 schema。
將結果分割成多個檔案¶
此運算子支援將大型結果分割成多個檔案的功能。approx_max_file_size_bytes 引數允許開發者指定分割檔案的檔案大小。預設情況下,檔案大小不超過 1 900 000 000 位元組(1900 MB)。
檢視 Google Cloud Storage 中的配額與限制,瞭解單個物件允許的最大檔案大小。
如果您想建立 10 MB 的檔案,您的程式碼可能如下所示
tests/system/google/cloud/gcs/example_trino_to_gcs.py
read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
task_id="read_data_from_gcs_many_chunks",
configuration={
"query": {
"query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
"useLegacySql": False,
}
},
)
使用 BigQuery 查詢資料¶
Google Cloud Storage 中可用的資料可以由 BigQuery 使用。您可以將資料載入到 BigQuery,或在查詢中直接引用 GCS 資料。有關將資料載入到 BigQuery 的資訊,請參閱 BigQuery 文件中的從 Cloud Storage 載入資料簡介。有關查詢 GCS 資料的資訊,請參閱 BigQuery 文件中的查詢 Cloud Storage 資料。
Airflow 也有許多運算子允許您使用 BigQuery。例如,如果您想建立一個外部表,以便您可以建立直接讀取 GCS 資料的查詢,那麼您可以使用 BigQueryCreateExternalTableOperator。使用此運算子如下所示
tests/system/google/cloud/gcs/example_trino_to_gcs.py
create_external_table_multiple_types = BigQueryCreateTableOperator(
task_id="create_external_table_multiple_types",
dataset_id=DATASET_NAME,
table_id=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}",
table_resource={
"schema": {
"fields": [
{"name": "table_catalog", "type": "STRING"},
{"name": "table_schema", "type": "STRING"},
{"name": "table_name", "type": "STRING"},
{"name": "column_name", "type": "STRING"},
{"name": "ordinal_position", "type": "INT64"},
{"name": "column_default", "type": "STRING"},
{"name": "is_nullable", "type": "STRING"},
{"name": "data_type", "type": "STRING"},
],
},
"externalDataConfiguration": {
"sourceFormat": "NEWLINE_DELIMITED_JSON",
"compression": "NONE",
"sourceUris": [f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
},
},
gcs_schema_object=f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
)
有關 Airflow 和 BigQuery 整合的更多資訊,請參閱 Python API 參考 - bigquery。
參考¶
有關更多資訊,請參閱