Google Cloud BigQuery 運算子¶
BigQuery 是 Google 提供的一款完全託管、PB 級、低成本的分析型資料倉庫。它是一款無需資料庫管理員的無伺服器軟體即服務 (SaaS)。它允許使用者專注於分析資料,使用熟悉的 SQL 找出有價值的洞察。
Airflow 提供運算子來管理資料集和表、執行查詢和驗證資料。
前提任務¶
要使用這些運算子,您必須執行以下幾項操作
使用 Cloud Console 選擇或建立一個 Cloud Platform 專案。
為您的專案啟用結算功能,如Google Cloud 文件中所述。
啟用 API,如Cloud Console 文件中所述。
透過 pip 安裝 API 庫。
pip install 'apache-airflow[google]'有關安裝的詳細資訊可在以下連結找到。
管理資料集¶
建立資料集¶
要在 BigQuery 資料庫中建立一個空資料集,可以使用 BigQueryCreateEmptyDatasetOperator。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
獲取資料集詳情¶
要獲取現有資料集的詳細資訊,可以使用 BigQueryGetDatasetOperator。
此運算子返回一個 資料集資源。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
列出資料集中的表¶
要檢索給定資料集中的表列表,請使用 BigQueryGetDatasetTablesOperator。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
更新表¶
要在 BigQuery 中更新表,可以使用 BigQueryUpdateTableOperator。
update 方法會替換整個表資源,而 patch 方法只替換提交的表資源中提供的欄位。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
update_table = BigQueryUpdateTableOperator(
task_id="update_table",
dataset_id=DATASET_NAME,
table_id="test_table",
fields=["friendlyName", "description"],
table_resource={
"friendlyName": "Updated Table",
"description": "Updated Table",
},
)
更新資料集¶
要在 BigQuery 中更新資料集,可以使用 BigQueryUpdateDatasetOperator。
update 方法會替換整個資料集資源,而 patch 方法只替換提交的資料集資源中提供的欄位。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"description": "Updated dataset"},
)
刪除資料集¶
要從 BigQuery 資料庫中刪除現有資料集,可以使用 BigQueryDeleteDatasetOperator。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
管理表¶
建立表¶
要在資料集中使用 Google Cloud Storage 中的資料建立新表,可以透過在 table_resource 欄位中提供表結構來使用 BigQueryCreateTableOperator。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_table = BigQueryCreateTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
table_resource={
"schema": {
"fields": [
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
},
},
)
您也可以指定 Google Cloud Storage 物件名稱作為指定 schema 的方式。Google Cloud Storage 中的物件必須是包含 schema 欄位的 JSON 檔案。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_table_schema_json = BigQueryCreateTableOperator(
task_id="create_table_schema_json",
dataset_id=DATASET_NAME,
table_id="test_table",
gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": "test_table",
},
},
)
您可以使用此運算子在現有表之上建立一個檢視。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_view = BigQueryCreateTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
table_resource={
"view": {
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False,
},
},
)
您還可以使用此運算子建立物化檢視,該檢視會定期快取查詢結果以提高效能和效率。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_materialized_view = BigQueryCreateTableOperator(
task_id="create_materialized_view",
dataset_id=DATASET_NAME,
table_id="test_materialized_view",
table_resource={
"materializedView": {
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"enableRefresh": True,
"refreshIntervalMs": 600000,
},
},
)
建立原生表¶
警告
此運算子已棄用,並將在 2025 年 7 月 30 日後移除。請使用 BigQueryCreateTableOperator。
要在給定的 BigQuery 資料集中建立一個新的空表,可以選擇使用 BigQueryCreateEmptyTableOperator 來指定 schema。
建立外部表¶
警告
此運算子已棄用,並將在 2025 年 7 月 30 日後移除。請使用 BigQueryCreateTableOperator。
要在資料集中使用 Google Cloud Storage 中的資料建立新的外部表,可以使用 BigQueryCreateExternalTableOperator。
從表中獲取資料¶
要從 BigQuery 表中獲取資料,可以使用 BigQueryGetDataOperator 。或者,如果您將欄位傳遞給 selected_fields,則可以獲取所選列的資料。
此運算子的結果可以根據 as_dict 引數的值以兩種不同的格式檢索:False(預設)- 一個 Python 列表的列表,其中巢狀列表中的元素數量將等於獲取的行數。巢狀中的每個元素將是一個巢狀列表,其中的元素代表該行的列值。True - 一個 Python 字典的列表,其中每個字典代表一行。在每個字典中,鍵是列名,值是這些列對應的數值。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
)
下面的示例展示瞭如何在非同步(可延遲)模式下使用 BigQueryGetDataOperator。請注意,可延遲任務需要在您的 Airflow 部署上執行 Triggerer。
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME_1,
use_legacy_sql=False,
max_results=10,
selected_fields="value",
location=LOCATION,
deferrable=True,
)
Upsert 表¶
要 upsert 表,可以使用 BigQueryUpsertTableOperator。
此運算子要麼更新現有表,要麼在給定資料集中建立一個新的空表。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
upsert_table = BigQueryUpsertTableOperator(
task_id="upsert_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
更新表 Schema¶
要更新表的 schema,可以使用 BigQueryUpdateTableSchemaOperator。
此運算子會更新提供的 schema 欄位值,同時保留其餘值不變。例如,這對於在現有表 schema 上設定新的欄位描述非常有用。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
update_table_schema = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields_updates=[
{"name": "emp_name", "description": "Name of employee"},
{"name": "salary", "description": "Monthly salary in USD"},
],
)
刪除表¶
要刪除現有表,可以使用 BigQueryDeleteTableOperator。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
您也可以使用此運算子刪除檢視。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)
您也可以使用此運算子刪除物化檢視。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
delete_materialized_view = BigQueryDeleteTableOperator(
task_id="delete_materialized_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
執行 BigQuery 作業¶
假設您想執行以下查詢。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
INSERT_ROWS_QUERY = (
f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
f"(42, 'monty python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
要在特定的 BigQuery 資料庫中執行 SQL 查詢,可以使用 BigQueryInsertJobOperator,並配置適當的查詢作業配置,該配置可以使用 Jinja 模板化。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
)
下面的示例展示瞭如何在非同步(可延遲)模式下使用 BigQueryInsertJobOperator。請注意,可延遲任務需要在您的 Airflow 部署上執行 Triggerer。
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
有關 BigQuery 作業型別的更多資訊,請檢視文件。
如果您想在配置中包含一些檔案,可以使用 Jinja 模板語言的 include 子句,如下所示
tests/system/google/cloud/bigquery/example_bigquery_queries.py
select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
"query": "{% include QUERY_SQL_PATH %}",
"useLegacySql": False,
}
},
)
包含的檔案也可以使用 Jinja 模板,這對於 .sql 檔案非常有用。
此外,您可以使用 BigQueryInsertJobOperator 的 job_id 引數來提高冪等性。如果未傳遞此引數,則將使用 uuid 作為 job_id。如果提供了此引數,則運算子將嘗試使用此 job_id` 提交新作業。如果已存在具有此類 job_id 的作業,則它將重新連線到現有作業。
此外,對於所有這些操作,您都可以在可延遲模式下使用運算子
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
驗證資料¶
檢查查詢結果是否包含資料¶
要對 BigQuery 執行檢查,可以使用 BigQueryCheckOperator
此運算子需要一個返回單行的 sql 查詢。該第一行的每個值都使用 Python 的 bool 強制型別轉換進行評估。如果任何值返回 False,則檢查失敗並報錯。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
use_legacy_sql=False,
)
您也可以在此運算子中使用可延遲模式
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
將查詢結果與透過值進行比較¶
要使用 sql 程式碼執行簡單的值檢查,可以使用 BigQueryValueCheckOperator
這些運算子需要一個返回單行的 sql 查詢。該第一行的每個值都會與 pass_value 進行比較,pass_value 可以是字串或數值。如果為數值,您還可以指定 tolerance。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
pass_value=4,
use_legacy_sql=False,
)
您也可以在此運算子中使用可延遲模式
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
比較隨時間變化的指標¶
要檢查作為 SQL 表示式給出的指標值是否在與 days_back 之前的值的特定 tolerance 範圍內,您可以使用 BigQueryIntervalCheckOperator 或 BigQueryIntervalCheckAsyncOperator
tests/system/google/cloud/bigquery/example_bigquery_queries.py
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
)
您也可以在此運算子中使用可延遲模式
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_NAME_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
使用預定義測試檢查列¶
要檢查列是否透過使用者可配置的測試,可以使用 BigQueryColumnCheckOperator
tests/system/google/cloud/bigquery/example_bigquery_queries.py
column_check = BigQueryColumnCheckOperator(
task_id="column_check",
table=f"{DATASET_NAME}.{TABLE_1}",
column_mapping={"value": {"null_check": {"equal_to": 0}}},
)
檢查表級別資料質量¶
要檢查表是否透過使用者定義的測試,可以使用 BigQueryTableCheckOperator
tests/system/google/cloud/bigquery/example_bigquery_queries.py
table_check = BigQueryTableCheckOperator(
task_id="table_check",
table=f"{DATASET_NAME}.{TABLE_1}",
checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)
感測器¶
檢查表是否存在¶
要檢查表是否存在,您可以定義一個感測器運算子。這允許延遲下游運算子的執行,直到表存在為止。如果表按日期分片,例如,您可以使用 {{ ds_nodash }} 宏作為表名字尾。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_exists = BigQueryTableExistenceSensor(
task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
如果您想在感測器執行時釋放工作節點槽位,您也可以在此運算子中使用可延遲模式。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_exists_def = BigQueryTableExistenceSensor(
task_id="check_table_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
deferrable=True,
)
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_exists_async = BigQueryTableExistenceSensor(
task_id="check_table_exists_async",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
檢查表分割槽是否存在¶
要檢查表是否存在並且包含分割槽,可以使用 BigQueryTablePartitionExistenceSensor。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
)
對於按 DAY 分割槽的表,partition_id 引數是一個格式為“%Y%m%d”的字串
如果您想在感測器執行時釋放工作節點槽位,您也可以在此運算子中使用可延遲模式。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
deferrable=True,
)
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_async",
partition_id=PARTITION_NAME,
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
參考資料¶
更多資訊,請參閱