airflow.providers.google.cloud.hooks.bigquery

BigQuery Hook 以及一個非常基礎的 BigQuery 的 PEP 249 實現。

屬性

log

BigQueryJob

BigQueryHook

與 BigQuery 互動。

BigQueryConnection

BigQuery 連線。

BigQueryBaseCursor

BigQuery 遊標。

BigQueryCursor

一個非常基礎的 BigQuery 的 PEP 249 遊標實現。

BigQueryAsyncHook

使用 gcloud-aio 庫檢索作業詳情。

BigQueryTableAsyncHook

用於 BigQuery Table 的非同步 Hook。

函式

split_tablename(table_input, default_project_id[, ...])

模組內容

airflow.providers.google.cloud.hooks.bigquery.log[source]
airflow.providers.google.cloud.hooks.bigquery.BigQueryJob[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[source]

基類: airflow.providers.google.common.hooks.base_google.GoogleBaseHook, airflow.providers.common.sql.hooks.sql.DbApiHook

與 BigQuery 互動。

此 Hook 使用 Google Cloud 連線。

引數:
  • gcp_conn_id – 用於 GCP 憑據的 Airflow 連線。

  • use_legacy_sql (bool) – 這指定是否使用舊版 SQL 方言。

  • location (str | None) – BigQuery 資源的位置。

  • priority (str) – 指定查詢的優先順序。可能的值包括 INTERACTIVE 和 BATCH。預設值為 INTERACTIVE。

  • api_resource_configs (dict | None) – 這包含應用於 Google BigQuery 作業的引數配置。

  • impersonation_chain – 這是可選的服務賬號,用於使用短期憑據進行模擬。

  • impersonation_scopes (str | collections.abc.Sequence[str] | None) – 被模擬賬號的可選作用域列表。將覆蓋連線中的作用域。

  • labels (dict | None) – BigQuery 資源標籤。

conn_name_attr = 'gcp_conn_id'[source]
default_conn_name = 'google_cloud_bigquery_default'[source]
conn_type = 'gcpbigquery'[source]
hook_name = 'Google Bigquery'[source]
classmethod get_connection_form_widgets()[source]

返回要新增到連線表單的連線小部件。

classmethod get_ui_field_behaviour()[source]

返回自定義欄位行為。

use_legacy_sql: bool[source]
location: str | None[source]
priority: str[source]
running_job_id: str | None = None[source]
api_resource_configs: dict[source]
labels[source]
impersonation_scopes: str | collections.abc.Sequence[str] | None = None[source]
get_conn()[source]

獲取 BigQuery PEP 249 連線物件。

get_client(project_id=PROVIDE_PROJECT_ID, location=None)[source]

獲取已認證的 BigQuery Client。

引數:
  • project_id (str) – 客戶端代表其執行操作的專案 ID。

  • location (str | None) – 作業 / 資料集 / 表格的預設位置。

get_uri()[source]

覆蓋自 DbApiHook 用於 get_sqlalchemy_engine()

get_sqlalchemy_engine(engine_kwargs=None)[source]

建立一個 SQLAlchemy 引擎物件。

引數:

engine_kwargs (dict | None) – create_engine() 中使用的 Kwargs。

get_records(sql, parameters=None)[source]

執行 sql 並返回一組記錄。

引數:
  • sql – 要執行的 sql 語句(str)或要執行的 sql 語句列表

  • parameters – 用於渲染 SQL 查詢的引數。

abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[source]

插入行。

當前不支援插入。理論上,您可以使用 BigQuery 的流式 API 將行插入到表中,但這尚未實現。

get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[source]

獲取 BigQuery 結果的 Pandas DataFrame。

必須覆蓋 DbApiHook 方法,因為 Pandas 不支援 PEP 249 連線,SQLite 除外。

引數:
  • sql (str) – 要執行的 BigQuery SQL。

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用於渲染 SQL 查詢的引數(未使用,保留以覆蓋超類方法)

  • dialect (str | None) – BigQuery SQL 方言 – 舊版 SQL 或標準 SQL;如果未指定,則預設為使用 self.use_legacy_sql

  • kwargs – (可選) 傳遞到 pandas_gbq.read_gbq 方法中

table_exists(dataset_id, table_id, project_id)[source]

檢查 Google BigQuery 中是否存在表。

引數:
  • project_id (str) – 要查詢表的 Google Cloud 專案。提供給 Hook 的連線必須提供對指定專案的訪問許可權。

  • dataset_id (str) – 要查詢表的所在資料集的名稱。

  • table_id (str) – 要檢查其是否存在的表的名稱。

table_partition_exists(dataset_id, table_id, partition_id, project_id)[source]

檢查 Google BigQuery 中是否存在分割槽。

引數:
  • project_id (str) – 要查詢表的 Google Cloud 專案。提供給 Hook 的連線必須提供對指定專案的訪問許可權。

  • dataset_id (str) – 要查詢表的所在資料集的名稱。

  • table_id (str) – 要檢查其是否存在的表的名稱。

  • partition_id (str) – 要檢查其是否存在的分割槽的名稱。

create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[source]

在資料集中建立一個新的空表。

要建立一個由 SQL 查詢定義的檢視,請將字典解析到 view 引數。

引數:
返回值:

建立的表

返回型別:

google.cloud.bigquery.table.Table

create_table(dataset_id, table_id, table_resource, location=None, project_id=PROVIDE_PROJECT_ID, exists_ok=True, schema_fields=None, retry=DEFAULT_RETRY, timeout=None)[source]

在資料集中建立一個新的空表。

引數:
create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]

建立一個新的空資料集。

引數:
  • project_id (str) – 我們要在其中建立空資料集的專案的名稱。如果 dataset_reference 中有 projectId,則無需提供此引數。

  • dataset_id (str | None) – 資料集的 ID。如果 dataset_reference 中有 datasetId,則無需提供此引數。

  • location (str | None) – (可選) 資料集應駐留的地理位置。沒有預設值,但如果未提供任何值,資料集將在美國建立。

  • dataset_reference (dict[str, Any] | None) – 可隨請求正文提供的 Dataset reference。更多資訊請參閱: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • exists_ok (bool) – 如果為 True,則在建立資料集時忽略“已存在”錯誤。

get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]

獲取指定資料集的表列表。

更多資訊請參閱:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list

引數:
  • dataset_id (str) – 請求的資料集的 ID。

  • project_id (str) – (可選) 請求的資料集所屬的專案。如果為 None,則使用 self.project_id。

  • max_results (int | None) – (可選) 要返回的最大表數。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。

返回值:

與資料集關聯的表列表。

返回型別:

list[dict[str, Any]]

delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]

在您的專案中刪除一個 BigQuery 資料集。

引數:
  • project_id (str) – 資料集所在的專案名稱。

  • dataset_id (str) – 要刪除的資料集。

  • delete_contents (bool) – 如果為 True,則刪除資料集中的所有表。如果為 False 且資料集包含表,則請求將失敗。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。

update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]

更改表的某些欄位。

使用 fields 指定要更新的欄位。必須至少提供一個欄位。如果某個欄位列在 fields 中,但在 table 中為 None,則該欄位值將被刪除。

如果 table.etag 不為 None,則只有當伺服器上的表具有相同的 ETag 時,更新才會成功。因此,使用 get_table 讀取表,更改其欄位,然後將其傳遞給 update_table,將確保更改僅在自讀取以來未發生對錶的修改時才會儲存。

引數:
  • project_id (str) – 要在其中建立表的專案。

  • dataset_id (str | None) – 要在其中建立表的資料集。

  • table_id (str | None) – 要建立的表的名稱。

  • table_resource (dict[str, Any]) – 如文件中所述的表資源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 表必須包含 tableReference 或必須提供 project_iddataset_idtable_id

  • fields (list[str] | None) – 要更改的 table 欄位,拼寫與 Table 屬性相同(例如,“friendly_name”)。

insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]

無需載入作業,即可一次一條記錄地將資料流式傳輸到 BigQuery 中。

引數:
  • project_id (str) – 表所在的專案名稱

  • dataset_id (str) – 表所在的資料集名稱

  • table_id (str) – 表名稱

  • rows (list) –

    要插入的行

    rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
    

  • ignore_unknown_values (bool) – [可選] 接受包含與架構不匹配的值的行。未知值將被忽略。預設值為 false,將未知值視為錯誤。

  • skip_invalid_rows (bool) – [可選] 插入請求中的所有有效行,即使存在無效行。預設值為 false,如果存在任何無效行,將導致整個請求失敗。

  • fail_on_error (bool) – [可選] 如果發生任何錯誤,強制任務失敗。預設值為 false,表示即使發生任何插入錯誤,任務也不應失敗。

update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]

更改資料集的某些欄位。

使用 fields 指定要更新的欄位。必須至少提供一個欄位。如果某個欄位列在 fields 中,但在 dataset 中為 None,則該欄位將被刪除。

如果 dataset.etag 不為 None,則只有當伺服器上的資料集具有相同的 ETag 時,更新才會成功。因此,使用 get_dataset 讀取資料集,更改其欄位,然後將其傳遞給 update_dataset,將確保更改僅在自讀取以來未發生對資料集的修改時才會儲存。

引數:
get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

獲取當前專案中的所有 BigQuery 資料集。

更多資訊請參閱:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

引數:
  • project_id (str) – 您嘗試獲取所有資料集的 Google Cloud 專案

  • include_all (bool) – 如果結果包含隱藏資料集,則為 True。預設為 False。

  • filter – 按標籤過濾結果的表示式。有關語法,請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter

  • filter – 字串

  • max_results (int | None) – 要返回的最大資料集數。

  • max_results – 整型

  • page_token (str | None) – 表示資料集遊標的令牌。如果未傳遞,API 將返回第一頁資料集。該令牌標記要返回的迭代器的開頭,並且可以透過 HTTPIteratornext_page_token 訪問 page_token 的值。

  • page_token – 字串

  • retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。

  • return_iterator (bool) – 返回 HTTPIterator 而不是 list[Row],HTTPIterator 可用於獲取 next_page_token 屬性。

get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]

獲取由 dataset_id 引用的資料集。

引數:
  • dataset_id (str) – BigQuery 資料集 ID

  • project_id (str) – Google Cloud 專案 ID

返回值:

資料集資源

返回型別:

google.cloud.bigquery.dataset.Dataset

另請參閱

更多資訊請參閱 Dataset Resource 內容:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]

將資料集的授權檢視訪問許可權授予視圖表。

如果此檢視已獲得資料集的訪問許可權,則不做任何操作。此方法不是原子性的。執行它可能會破壞同時進行的更新。

引數:
  • source_dataset (str) – 源資料集

  • view_dataset (str) – 檢視所在的資料集

  • view_table (str) – 視圖表

  • project_id (str) – 源資料集所屬的專案。如果為 None,則使用 self.project_id。

  • view_project (str | None) – 檢視所在的專案。如果為 None,則使用 self.project_id。

返回值:

源資料集的資料集資源。

返回型別:

dict[str, Any]

run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]

如果表存在則更新,否則建立新表。

由於 BigQuery 本身不支援表 upsert,此操作不是原子性的。

引數:
delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]

從資料集中刪除現有表。

如果表不存在,則除非將 not_found_ok 設定為 True,否則返回錯誤。

引數:
  • table_id (str) – 用點號分隔的 (<project>.|<project>:)<dataset>.<table> 格式,指示要刪除哪個表。

  • not_found_ok (bool) – 如果為 True,則即使請求的表不存在也返回成功。

  • project_id (str) – 用於執行請求的專案

list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

列出表中的行。

請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list

引數:
  • dataset_id (str) – 請求的表的資料集 ID。

  • table_id (str) – 請求的表的表 ID。

  • max_results (int | None) – 要返回的最大結果數。

  • selected_fields (list[str] | str | None) – 要返回的欄位列表(逗號分隔)。如果未指定,則返回所有欄位。

  • page_token (str | None) – 分頁令牌,由先前的呼叫返回,用於標識結果集。

  • start_index (int | None) – 要讀取的起始行的從零開始的索引。

  • project_id (str) – 客戶端代表其執行操作的專案 ID。

  • location (str | None) – 作業的預設位置。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。

  • return_iterator (bool) – 返回 RowIterator 而不是 list[Row],RowIterator 可用於獲取 next_page_token 屬性。

返回值:

行列表

返回型別:

list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator

get_schema(dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

獲取指定資料集和表的架構。

引數:
  • dataset_id (str) – 請求的表的資料集 ID

  • table_id (str) – 請求的表的表 ID

  • project_id (str) – 請求的表的可選專案 ID。如果未提供,將使用聯結器配置的專案。

返回值:

表架構

返回型別:

dict

update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

更新指定資料集和表的架構中的欄位。

請注意,架構中的某些欄位是不可變的;嘗試更改它們將導致異常。

如果包含新欄位,則會插入該欄位,這要求設定所有必填欄位。

引數:
  • include_policy_tags (bool) – 如果設定為 True,策略標籤將包含在更新請求中,即使未更改,這也需要特殊許可權,請參閱 https://cloud.google.com/bigquery/docs/column-level-security#roles

  • dataset_id (str) – 要更新的表的 資料集 ID

  • table_id (str) – 要更新的表的 表 ID

  • schema_fields_updates (list[dict[str, Any]]) –

    部分架構資源。請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

    schema_fields_updates = [
        {"name": "emp_name", "description": "Some New Description"},
        {"name": "salary", "description": "Some New Description"},
        {
            "name": "departments",
            "fields": [
                {"name": "name", "description": "Some New Description"},
                {"name": "type", "description": "Some New Description"},
            ],
        },
    ]
    

  • project_id (str) – 要更新表的專案名稱。

poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]

檢查作業是否已完成。

引數:
  • job_id (str) – 作業 ID。

  • project_id (str) – 作業執行所在的 Google Cloud 專案

  • location (str | None) – 作業執行所在的位置

  • retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。

cancel_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

取消作業並等待取消完成。

引數:
  • job_id (str) – 作業 ID。

  • project_id (str) – 作業執行所在的 Google Cloud 專案

  • location (str | None) – 作業執行所在的位置

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

獲取 BigQuery 作業。

引數:
  • job_id (str) – 作業的 ID。ID 必須僅包含字母 (a-z, A-Z)、數字 (0-9)、下劃線 (_ 或短劃線 (-)。最大長度為 1,024 個字元。

  • project_id (str) – 作業執行所在的 Google Cloud 專案。

  • location (str | None) – 作業執行所在的位置。

insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]

執行 BigQuery 作業並等待其完成。

引數:
  • configuration (dict) – configuration 引數直接對映到 BigQuery 作業物件中的 configuration 欄位。有關詳細資訊,請參閱 https://cloud.google.com/bigquery/docs/reference/v2/jobs

  • job_id (str | None) – 作業的 ID。ID 必須僅包含字母 (a-z, A-Z)、數字 (0-9)、下劃線 (_) 或短劃線 (-)。最大長度為 1,024 個字元。如果未提供,則將生成 uuid。

  • project_id (str) – 作業執行所在的 Google Cloud 專案。

  • location (str | None) – 作業執行的位置。

  • nowait (bool) – 是否在不等待結果的情況下插入作業。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。

  • timeout (float | None) – 在使用 retry 之前等待底層 HTTP 傳輸的秒數。

返回值:

作業 ID。

返回型別:

BigQueryJob

generate_job_id(job_id, dag_id, task_id, logical_date, configuration, force_rerun=False)[source]
split_tablename(table_input, default_project_id, var_name=None)[source]
get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]

獲取給定 job_id 的查詢結果。

引數:
  • job_id (str) – 作業的 ID。ID 必須僅包含字母 (a-z, A-Z)、數字 (0-9)、下劃線 (_ 或短劃線 (-)。最大長度為 1,024 個字元。

  • location (str) – 操作使用的位置。

  • selected_fields (list[str] | str | None) – 要返回的欄位列表(逗號分隔)。如果未指定,則返回所有欄位。

  • max_results (int | None) – 從表中獲取的最大記錄(行)數。

  • project_id (str) – 作業執行所在的 Google Cloud 專案。

  • retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。

  • job_retry (google.api_core.retry.Retry) – 如何重試失敗的作業。

返回值:

如果指定了 selected fields,則返回按這些欄位過濾的行列表

丟擲:

AirflowException

返回型別:

list[dict[str, Any]]

property scopes: collections.abc.Sequence[str][source]

返回 OAuth 2.0 scopes。

返回值:

返回 impersonation_scopes 中定義的 scopes、連線配置中的 scopes 或預設 scopes。

返回型別:

collections.abc.Sequence[str]

class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[source]

BigQuery 連線。

BigQuery 沒有持久連線的概念。因此,這些物件是遊標的無狀態小型工廠,所有實際工作都由遊標完成。

close()[source]

不執行任何操作。BigQueryConnection 不需要此操作。

commit()[source]

不執行任何操作。BigQueryConnection 不支援事務。

cursor()[source]

使用連線返回一個新的 Cursor 物件。

abstract rollback()[source]

不執行任何操作。BigQueryConnection 不支援事務。

class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

BigQuery 遊標。

BigQuery 基本遊標包含用於針對 BigQuery 執行查詢的輔助方法。在不需要 PEP 249 遊標的情況下,運算子可以直接使用這些方法。

service[source]
project_id[source]
use_legacy_sql = True[source]
api_resource_configs: dict[source]
running_job_id: str | None = None[source]
location = None[source]
num_retries = 5[source]
labels = None[source]
hook[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[source]

Bases: BigQueryBaseCursor

一個非常基礎的 BigQuery 的 PEP 249 遊標實現。

參考使用了 PyHive PEP 249 實現

https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py

buffersize: int | None = None[source]
page_token: str | None = None[source]
job_id: str | None = None[source]
buffer: list = [][source]
all_pages_loaded: bool = False[source]
property description: list[source]

返回遊標描述。

close()[source]

預設情況下,不執行任何操作。

property rowcount: int[source]

預設情況下,返回 -1 表示不支援此功能。

execute(operation, parameters=None)[source]

執行 BigQuery 查詢,並更新 BigQueryCursor 描述。

引數:
  • operation (str) – 要執行的查詢。

  • parameters (dict | None) – 要替換到查詢中的引數。

executemany(operation, seq_of_parameters)[source]

使用不同的引數多次執行 BigQuery 查詢。

引數:
  • operation (str) – 要執行的查詢。

  • seq_of_parameters (list) – 要替換到查詢中的字典引數列表。

flush_results()[source]

重新整理與結果相關的遊標屬性。

fetchone()[source]

獲取查詢結果集的下一行。

next()[source]

從緩衝區返回下一行。

fetchone 的輔助方法。

如果緩衝區為空,則嘗試透過結果集分頁獲取下一頁,並將其載入到緩衝區中。

fetchmany(size=None)[source]

獲取查詢結果的下一組行。

這將返回一個序列的序列(例如,一個元組列表)。當沒有更多行可用時,返回一個空序列。

每次呼叫獲取的行數由引數指定。如果未給定,則由遊標的 arraysize 確定要獲取的行數。

此方法嘗試獲取由 size 引數指示的儘可能多的行。如果由於指定的行數不可用而無法做到,則可能會返回較少的行。

如果之前呼叫 execute() 未產生任何結果集,或者尚未發出任何呼叫,則會引發 Error(或其子類)異常。

fetchall()[source]

獲取查詢結果的所有(剩餘)行。

返回一個序列的序列(例如,一個元組列表)。

get_arraysize()[source]

獲取每次獲取的行數。

另請參閱

fetchmany()

set_arraysize(arraysize)[source]

設定每次獲取的行數。

另請參閱

fetchmany()

arraysize[source]
setinputsizes(sizes)[source]

預設情況下,不執行任何操作。

setoutputsize(size, column=None)[source]

預設情況下,不執行任何操作。

airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

使用 gcloud-aio 庫檢索作業詳情。

sync_hook_class[source]
async get_job_instance(project_id, job_id, session)[source]

根據 job ID 和 project ID 獲取指定的作業資源。

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]
async get_job_output(job_id, project_id=PROVIDE_PROJECT_ID)[source]

非同步獲取給定 job ID 的 BigQuery 作業輸出。

async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]

使用 gcloud-aio 建立新作業並獲取 job_id。

async cancel_job(job_id, project_id, location)[source]

取消一個 BigQuery 作業。

引數:
  • job_id (str) – 要取消的任務的 ID。

  • project_id (str | None) – 任務執行所在的 Google Cloud 專案。

  • location (str | None) – 任務執行所在的區域。

get_records(query_results, as_dict=False, selected_fields=None)[來源]

將 BigQuery 的響應轉換為記錄。

引數:
  • query_results (dict[str, Any]) – SQL 查詢的結果

  • as_dict (bool) – 如果為 True,則將結果作為字典列表返回;否則作為列表的列表返回。

  • selected_fields (str | list[str] | None)

value_check(sql, pass_value, records, tolerance=None)[來源]

將單行查詢結果和容差與 pass_value 進行匹配。

丟擲:

AirflowException – 如果匹配失敗

interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[來源]

檢查指標(SQL 表示式)的值是否在一定的容差範圍內。

引數:
  • row1 (str | None) – 第一個 SQL 查詢執行任務的第一個結果行

  • row2 (str | None) – 第二個 SQL 查詢執行任務的第一個結果行

  • metrics_thresholds (dict[str, Any]) – 一個以指標為鍵的比例字典,例如 'COUNT(*)': 1.5 要求當前日期的資料與 days_back 天之前的資料之間的差異小於或等於 50%。

  • ignore_zero (bool) – 是否應該忽略值為零的指標

  • ratio_formula (str) – 用於計算兩個指標之間比例的公式。假設 cur 是今天的指標,ref 是 days_back 天前的指標。 max_over_min: 計算 max(cur, ref) / min(cur, ref) relative_diff: 計算 abs(cur-ref) / ref

class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[來源]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

用於 BigQuery Table 的非同步 Hook。

sync_hook_class[來源]
async get_table_client(dataset, table_id, project_id, session)[來源]

獲取 Google Big Query 表物件。

引數:
  • dataset (str) – 查詢表儲存桶的資料集名稱。

  • table_id (str) – 要檢查其是否存在的表的名稱。

  • project_id (str) – 要查詢表的 Google Cloud 專案。提供給 Hook 的連線必須提供對指定專案的訪問許可權。

  • session (aiohttp.ClientSession) – aiohttp ClientSession

此條目有幫助嗎?