Google Cloud BigQuery 運算子

BigQuery 是 Google 提供的一款完全託管、PB 級、低成本的分析型資料倉庫。它是一款無需資料庫管理員的無伺服器軟體即服務 (SaaS)。它允許使用者專注於分析資料,使用熟悉的 SQL 找出有價值的洞察。

Airflow 提供運算子來管理資料集和表、執行查詢和驗證資料。

前提任務

要使用這些運算子,您必須執行以下幾項操作

管理資料集

建立資料集

要在 BigQuery 資料庫中建立一個空資料集,可以使用 BigQueryCreateEmptyDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

獲取資料集詳情

要獲取現有資料集的詳細資訊,可以使用 BigQueryGetDatasetOperator

此運算子返回一個 資料集資源

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

列出資料集中的表

要檢索給定資料集中的表列表,請使用 BigQueryGetDatasetTablesOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py

get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)

更新表

要在 BigQuery 中更新表,可以使用 BigQueryUpdateTableOperator

update 方法會替換整個表資源,而 patch 方法只替換提交的表資源中提供的欄位。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)

更新資料集

要在 BigQuery 中更新資料集,可以使用 BigQueryUpdateDatasetOperator

update 方法會替換整個資料集資源,而 patch 方法只替換提交的資料集資源中提供的欄位。

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)

刪除資料集

要從 BigQuery 資料庫中刪除現有資料集,可以使用 BigQueryDeleteDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

管理表

建立表

要在資料集中使用 Google Cloud Storage 中的資料建立新表,可以透過在 table_resource 欄位中提供表結構來使用 BigQueryCreateTableOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_table = BigQueryCreateTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    table_resource={
        "schema": {
            "fields": [
                {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
            ],
        },
    },
)

您也可以指定 Google Cloud Storage 物件名稱作為指定 schema 的方式。Google Cloud Storage 中的物件必須是包含 schema 欄位的 JSON 檔案。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_table_schema_json = BigQueryCreateTableOperator(
    task_id="create_table_schema_json",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": "test_table",
        },
    },
)

您可以使用此運算子在現有表之上建立一個檢視。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_view = BigQueryCreateTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    table_resource={
        "view": {
            "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
            "useLegacySql": False,
        },
    },
)

您還可以使用此運算子建立物化檢視,該檢視會定期快取查詢結果以提高效能和效率。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_materialized_view = BigQueryCreateTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    table_resource={
        "materializedView": {
            "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
            "enableRefresh": True,
            "refreshIntervalMs": 600000,
        },
    },
)

建立原生表

警告

此運算子已棄用,並將在 2025 年 7 月 30 日後移除。請使用 BigQueryCreateTableOperator

要在給定的 BigQuery 資料集中建立一個新的空表,可以選擇使用 BigQueryCreateEmptyTableOperator 來指定 schema。

建立外部表

警告

此運算子已棄用,並將在 2025 年 7 月 30 日後移除。請使用 BigQueryCreateTableOperator

要在資料集中使用 Google Cloud Storage 中的資料建立新的外部表,可以使用 BigQueryCreateExternalTableOperator

從表中獲取資料

要從 BigQuery 表中獲取資料,可以使用 BigQueryGetDataOperator 。或者,如果您將欄位傳遞給 selected_fields,則可以獲取所選列的資料。

此運算子的結果可以根據 as_dict 引數的值以兩種不同的格式檢索:False(預設)- 一個 Python 列表的列表,其中巢狀列表中的元素數量將等於獲取的行數。巢狀中的每個元素將是一個巢狀列表,其中的元素代表該行的列值。True - 一個 Python 字典的列表,其中每個字典代表一行。在每個字典中,鍵是列名,值是這些列對應的數值。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
)

下面的示例展示瞭如何在非同步(可延遲)模式下使用 BigQueryGetDataOperator。請注意,可延遲任務需要在您的 Airflow 部署上執行 Triggerer。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME_1,
    use_legacy_sql=False,
    max_results=10,
    selected_fields="value",
    location=LOCATION,
    deferrable=True,
)

Upsert 表

要 upsert 表,可以使用 BigQueryUpsertTableOperator

此運算子要麼更新現有表,要麼在給定資料集中建立一個新的空表。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)

更新表 Schema

要更新表的 schema,可以使用 BigQueryUpdateTableSchemaOperator

此運算子會更新提供的 schema 欄位值,同時保留其餘值不變。例如,這對於在現有表 schema 上設定新的欄位描述非常有用。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)

刪除表

要刪除現有表,可以使用 BigQueryDeleteTableOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

您也可以使用此運算子刪除檢視。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)

您也可以使用此運算子刪除物化檢視。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)

執行 BigQuery 作業

假設您想執行以下查詢。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
    f"(42, 'monty python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)

要在特定的 BigQuery 資料庫中執行 SQL 查詢,可以使用 BigQueryInsertJobOperator,並配置適當的查詢作業配置,該配置可以使用 Jinja 模板化。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
)

下面的示例展示瞭如何在非同步(可延遲)模式下使用 BigQueryInsertJobOperator。請注意,可延遲任務需要在您的 Airflow 部署上執行 Triggerer。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

有關 BigQuery 作業型別的更多資訊,請檢視文件

如果您想在配置中包含一些檔案,可以使用 Jinja 模板語言的 include 子句,如下所示

tests/system/google/cloud/bigquery/example_bigquery_queries.py

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include QUERY_SQL_PATH %}",
            "useLegacySql": False,
        }
    },
)

包含的檔案也可以使用 Jinja 模板,這對於 .sql 檔案非常有用。

此外,您可以使用 BigQueryInsertJobOperatorjob_id 引數來提高冪等性。如果未傳遞此引數,則將使用 uuid 作為 job_id。如果提供了此引數,則運算子將嘗試使用此 job_id` 提交新作業。如果已存在具有此類 job_id 的作業,則它將重新連線到現有作業。

此外,對於所有這些操作,您都可以在可延遲模式下使用運算子

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

驗證資料

檢查查詢結果是否包含資料

要對 BigQuery 執行檢查,可以使用 BigQueryCheckOperator

此運算子需要一個返回單行的 sql 查詢。該第一行的每個值都使用 Python 的 bool 強制型別轉換進行評估。如果任何值返回 False,則檢查失敗並報錯。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    use_legacy_sql=False,
)

您也可以在此運算子中使用可延遲模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

將查詢結果與透過值進行比較

要使用 sql 程式碼執行簡單的值檢查,可以使用 BigQueryValueCheckOperator

這些運算子需要一個返回單行的 sql 查詢。該第一行的每個值都會與 pass_value 進行比較,pass_value 可以是字串或數值。如果為數值,您還可以指定 tolerance

tests/system/google/cloud/bigquery/example_bigquery_queries.py

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
)

您也可以在此運算子中使用可延遲模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    pass_value=2,
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

比較隨時間變化的指標

要檢查作為 SQL 表示式給出的指標值是否在與 days_back 之前的值的特定 tolerance 範圍內,您可以使用 BigQueryIntervalCheckOperatorBigQueryIntervalCheckAsyncOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
)

您也可以在此運算子中使用可延遲模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_NAME_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

使用預定義測試檢查列

要檢查列是否透過使用者可配置的測試,可以使用 BigQueryColumnCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py

column_check = BigQueryColumnCheckOperator(
    task_id="column_check",
    table=f"{DATASET_NAME}.{TABLE_1}",
    column_mapping={"value": {"null_check": {"equal_to": 0}}},
)

檢查表級別資料質量

要檢查表是否透過使用者定義的測試,可以使用 BigQueryTableCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py

table_check = BigQueryTableCheckOperator(
    task_id="table_check",
    table=f"{DATASET_NAME}.{TABLE_1}",
    checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)

感測器

檢查表是否存在

要檢查表是否存在,您可以定義一個感測器運算子。這允許延遲下游運算子的執行,直到表存在為止。如果表按日期分片,例如,您可以使用 {{ ds_nodash }} 宏作為表名字尾。

BigQueryTableExistenceSensor.

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_exists = BigQueryTableExistenceSensor(
    task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)

如果您想在感測器執行時釋放工作節點槽位,您也可以在此運算子中使用可延遲模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_exists_def = BigQueryTableExistenceSensor(
    task_id="check_table_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_exists_async = BigQueryTableExistenceSensor(
    task_id="check_table_exists_async",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

檢查表分割槽是否存在

要檢查表是否存在並且包含分割槽,可以使用 BigQueryTablePartitionExistenceSensor

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)

對於按 DAY 分割槽的表,partition_id 引數是一個格式為“%Y%m%d”的字串

如果您想在感測器執行時釋放工作節點槽位,您也可以在此運算子中使用可延遲模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_async",
    partition_id=PARTITION_NAME,
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

參考資料

更多資訊,請參閱

此條目是否有幫助?