airflow.providers.google.cloud.hooks.bigquery¶
BigQuery Hook 以及一個非常基礎的 BigQuery 的 PEP 249 實現。
屬性¶
類¶
與 BigQuery 互動。 |
|
BigQuery 連線。 |
|
BigQuery 遊標。 |
|
一個非常基礎的 BigQuery 的 PEP 249 遊標實現。 |
|
使用 gcloud-aio 庫檢索作業詳情。 |
|
用於 BigQuery Table 的非同步 Hook。 |
函式¶
|
模組內容¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[source]¶
基類:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook,airflow.providers.common.sql.hooks.sql.DbApiHook與 BigQuery 互動。
此 Hook 使用 Google Cloud 連線。
- 引數:
gcp_conn_id – 用於 GCP 憑據的 Airflow 連線。
use_legacy_sql (bool) – 這指定是否使用舊版 SQL 方言。
location (str | None) – BigQuery 資源的位置。
priority (str) – 指定查詢的優先順序。可能的值包括 INTERACTIVE 和 BATCH。預設值為 INTERACTIVE。
api_resource_configs (dict | None) – 這包含應用於 Google BigQuery 作業的引數配置。
impersonation_chain – 這是可選的服務賬號,用於使用短期憑據進行模擬。
impersonation_scopes (str | collections.abc.Sequence[str] | None) – 被模擬賬號的可選作用域列表。將覆蓋連線中的作用域。
labels (dict | None) – BigQuery 資源標籤。
- get_sqlalchemy_engine(engine_kwargs=None)[source]¶
建立一個 SQLAlchemy 引擎物件。
- 引數:
engine_kwargs (dict | None) –
create_engine()中使用的 Kwargs。
- get_records(sql, parameters=None)[source]¶
執行 sql 並返回一組記錄。
- 引數:
sql – 要執行的 sql 語句(str)或要執行的 sql 語句列表
parameters – 用於渲染 SQL 查詢的引數。
- abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[source]¶
插入行。
當前不支援插入。理論上,您可以使用 BigQuery 的流式 API 將行插入到表中,但這尚未實現。
- get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[source]¶
獲取 BigQuery 結果的 Pandas DataFrame。
必須覆蓋 DbApiHook 方法,因為 Pandas 不支援 PEP 249 連線,SQLite 除外。
另請參閱
https://github.com/pandas-dev/pandas/blob/055d008615272a1ceca9720dc365a2abd316f353/pandas/io/sql.py#L415 https://github.com/pandas-dev/pandas/issues/6900
- 引數:
sql (str) – 要執行的 BigQuery SQL。
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用於渲染 SQL 查詢的引數(未使用,保留以覆蓋超類方法)
dialect (str | None) – BigQuery SQL 方言 – 舊版 SQL 或標準 SQL;如果未指定,則預設為使用 self.use_legacy_sql
kwargs – (可選) 傳遞到 pandas_gbq.read_gbq 方法中
- table_partition_exists(dataset_id, table_id, partition_id, project_id)[source]¶
檢查 Google BigQuery 中是否存在分割槽。
- create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[source]¶
在資料集中建立一個新的空表。
要建立一個由 SQL 查詢定義的檢視,請將字典解析到 view 引數。
- 引數:
project_id (str) – 要在其中建立表的專案。
dataset_id (str | None) – 要在其中建立表的資料集。
table_id (str | None) – 要建立的表的名稱。
table_resource (dict[str, Any] | None) – 文件中描述的表資源: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供此引數,將忽略所有其他引數。
schema_fields (list | None) –
如果設定,此處定義的 schema 欄位列表: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery
retry (google.api_core.retry.Retry) – 可選。如何重試 RPC。
time_partitioning (dict | None) –
配置可選的時間分割槽欄位,即根據 API 規範按欄位、型別和過期時間進行分割槽。
cluster_fields (list[str] | None) – [可選] 用於聚簇的欄位。BigQuery 支援對分割槽表和非分割槽表進行聚簇。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#clustering.fields
view (dict | None) –
[可選] 包含檢視定義的字典。如果設定,它將建立一個檢視而不是表: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
view = { "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000", "useLegacySql": False, }
materialized_view (dict | None) – [可選] 物化檢視定義。
encryption_configuration (dict | None) –
[可選] 自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key", }
num_retries – 連線問題時的最大重試次數。
location (str | None) – (可選) 表應駐留的地理位置。
exists_ok (bool) – 如果為
True,建立表時忽略“已存在”錯誤。
- 返回值:
建立的表
- 返回型別:
- create_table(dataset_id, table_id, table_resource, location=None, project_id=PROVIDE_PROJECT_ID, exists_ok=True, schema_fields=None, retry=DEFAULT_RETRY, timeout=None)[source]¶
在資料集中建立一個新的空表。
- 引數:
project_id (str) – 可選。要在其中建立表的專案。
dataset_id (str) – 必需。要在其中建立表的資料集。
table_id (str) – 必需。要建立的表的名稱。
table_resource (dict[str, Any] | google.cloud.bigquery.table.Table | google.cloud.bigquery.table.TableReference | google.cloud.bigquery.table.TableListItem) – 必需。文件中描述的表資源: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果
table是一個引用,將建立一個具有指定 ID 的空表。表所屬的資料集必須已存在。schema_fields (list | None) –
可選。如果設定,此處定義的 schema 欄位列表: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
location (str | None) – 可選。操作使用的位置。
exists_ok (bool) – 可選。如果為
True,建立表時忽略“已存在”錯誤。retry (google.api_core.retry.Retry) – 可選。用於重試請求的重試物件。如果指定 None,請求將不會被重試。
timeout (float | None) – 可選。等待請求完成的時間量,以秒為單位。請注意,如果指定了 retry,則超時適用於每次單獨的嘗試。
- create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]¶
建立一個新的空資料集。
- 引數:
project_id (str) – 我們要在其中建立空資料集的專案的名稱。如果 dataset_reference 中有 projectId,則無需提供此引數。
dataset_id (str | None) – 資料集的 ID。如果 dataset_reference 中有 datasetId,則無需提供此引數。
location (str | None) – (可選) 資料集應駐留的地理位置。沒有預設值,但如果未提供任何值,資料集將在美國建立。
dataset_reference (dict[str, Any] | None) – 可隨請求正文提供的 Dataset reference。更多資訊請參閱: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
exists_ok (bool) – 如果為
True,則在建立資料集時忽略“已存在”錯誤。
- get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]¶
獲取指定資料集的表列表。
更多資訊請參閱:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
- delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]¶
在您的專案中刪除一個 BigQuery 資料集。
- 引數:
project_id (str) – 資料集所在的專案名稱。
dataset_id (str) – 要刪除的資料集。
delete_contents (bool) – 如果為 True,則刪除資料集中的所有表。如果為 False 且資料集包含表,則請求將失敗。
retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。
- update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]¶
更改表的某些欄位。
使用
fields指定要更新的欄位。必須至少提供一個欄位。如果某個欄位列在fields中,但在table中為None,則該欄位值將被刪除。如果
table.etag不為None,則只有當伺服器上的表具有相同的 ETag 時,更新才會成功。因此,使用get_table讀取表,更改其欄位,然後將其傳遞給update_table,將確保更改僅在自讀取以來未發生對錶的修改時才會儲存。- 引數:
project_id (str) – 要在其中建立表的專案。
dataset_id (str | None) – 要在其中建立表的資料集。
table_id (str | None) – 要建立的表的名稱。
table_resource (dict[str, Any]) – 如文件中所述的表資源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 表必須包含
tableReference或必須提供project_id、dataset_id和table_id。fields (list[str] | None) – 要更改的
table欄位,拼寫與 Table 屬性相同(例如,“friendly_name”)。
- insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]¶
無需載入作業,即可一次一條記錄地將資料流式傳輸到 BigQuery 中。
- 引數:
project_id (str) – 表所在的專案名稱
dataset_id (str) – 表所在的資料集名稱
table_id (str) – 表名稱
rows (list) –
要插入的行
rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
ignore_unknown_values (bool) – [可選] 接受包含與架構不匹配的值的行。未知值將被忽略。預設值為 false,將未知值視為錯誤。
skip_invalid_rows (bool) – [可選] 插入請求中的所有有效行,即使存在無效行。預設值為 false,如果存在任何無效行,將導致整個請求失敗。
fail_on_error (bool) – [可選] 如果發生任何錯誤,強制任務失敗。預設值為 false,表示即使發生任何插入錯誤,任務也不應失敗。
- update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]¶
更改資料集的某些欄位。
使用
fields指定要更新的欄位。必須至少提供一個欄位。如果某個欄位列在fields中,但在dataset中為None,則該欄位將被刪除。如果
dataset.etag不為None,則只有當伺服器上的資料集具有相同的 ETag 時,更新才會成功。因此,使用get_dataset讀取資料集,更改其欄位,然後將其傳遞給update_dataset,將確保更改僅在自讀取以來未發生對資料集的修改時才會儲存。- 引數:
dataset_resource (dict[str, Any]) – 將在請求正文中提供的資料集資源。https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
dataset_id (str | None) – 資料集的 ID。
fields (collections.abc.Sequence[str]) – 要更改的
dataset屬性(例如,“friendly_name”)。project_id (str) – Google Cloud 專案 ID
retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。
- get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
獲取當前專案中的所有 BigQuery 資料集。
更多資訊請參閱:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
- 引數:
project_id (str) – 您嘗試獲取所有資料集的 Google Cloud 專案
include_all (bool) – 如果結果包含隱藏資料集,則為 True。預設為 False。
filter – 按標籤過濾結果的表示式。有關語法,請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter。
filter – 字串
max_results (int | None) – 要返回的最大資料集數。
max_results – 整型
page_token (str | None) – 表示資料集遊標的令牌。如果未傳遞,API 將返回第一頁資料集。該令牌標記要返回的迭代器的開頭,並且可以透過
HTTPIterator的next_page_token訪問page_token的值。page_token – 字串
retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。
return_iterator (bool) – 返回 HTTPIterator 而不是 list[Row],HTTPIterator 可用於獲取 next_page_token 屬性。
- get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]¶
獲取由 dataset_id 引用的資料集。
- 引數:
- 返回值:
資料集資源
- 返回型別:
另請參閱
更多資訊請參閱 Dataset Resource 內容:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
- run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]¶
將資料集的授權檢視訪問許可權授予視圖表。
如果此檢視已獲得資料集的訪問許可權,則不做任何操作。此方法不是原子性的。執行它可能會破壞同時進行的更新。
- run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]¶
如果表存在則更新,否則建立新表。
由於 BigQuery 本身不支援表 upsert,此操作不是原子性的。
- 引數:
dataset_id (str) – 要將表 upsert 到其中的資料集。
table_resource (dict[str, Any]) – 表資源。請參閱 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
project_id (str) – 要將表 upsert 到其中的專案。如果為 None,則使用 self.project_id。
- delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]¶
從資料集中刪除現有表。
如果表不存在,則除非將 not_found_ok 設定為 True,否則返回錯誤。
- list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
列出表中的行。
請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
- 引數:
dataset_id (str) – 請求的表的資料集 ID。
table_id (str) – 請求的表的表 ID。
max_results (int | None) – 要返回的最大結果數。
selected_fields (list[str] | str | None) – 要返回的欄位列表(逗號分隔)。如果未指定,則返回所有欄位。
page_token (str | None) – 分頁令牌,由先前的呼叫返回,用於標識結果集。
start_index (int | None) – 要讀取的起始行的從零開始的索引。
project_id (str) – 客戶端代表其執行操作的專案 ID。
location (str | None) – 作業的預設位置。
retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。
return_iterator (bool) – 返回 RowIterator 而不是 list[Row],RowIterator 可用於獲取 next_page_token 屬性。
- 返回值:
行列表
- 返回型別:
list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator
- update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]¶
更新指定資料集和表的架構中的欄位。
請注意,架構中的某些欄位是不可變的;嘗試更改它們將導致異常。
如果包含新欄位,則會插入該欄位,這要求設定所有必填欄位。
- 引數:
include_policy_tags (bool) – 如果設定為 True,策略標籤將包含在更新請求中,即使未更改,這也需要特殊許可權,請參閱 https://cloud.google.com/bigquery/docs/column-level-security#roles
dataset_id (str) – 要更新的表的 資料集 ID
table_id (str) – 要更新的表的 表 ID
schema_fields_updates (list[dict[str, Any]]) –
部分架構資源。請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
schema_fields_updates = [ {"name": "emp_name", "description": "Some New Description"}, {"name": "salary", "description": "Some New Description"}, { "name": "departments", "fields": [ {"name": "name", "description": "Some New Description"}, {"name": "type", "description": "Some New Description"}, ], }, ]
project_id (str) – 要更新表的專案名稱。
- poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]¶
檢查作業是否已完成。
- 引數:
job_id (str) – 作業 ID。
project_id (str) – 作業執行所在的 Google Cloud 專案
location (str | None) – 作業執行所在的位置
retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。
- insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]¶
執行 BigQuery 作業並等待其完成。
- 引數:
configuration (dict) – configuration 引數直接對映到 BigQuery 作業物件中的 configuration 欄位。有關詳細資訊,請參閱 https://cloud.google.com/bigquery/docs/reference/v2/jobs。
job_id (str | None) – 作業的 ID。ID 必須僅包含字母 (a-z, A-Z)、數字 (0-9)、下劃線 (_) 或短劃線 (-)。最大長度為 1,024 個字元。如果未提供,則將生成 uuid。
project_id (str) – 作業執行所在的 Google Cloud 專案。
location (str | None) – 作業執行的位置。
nowait (bool) – 是否在不等待結果的情況下插入作業。
retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。
timeout (float | None) – 在使用
retry之前等待底層 HTTP 傳輸的秒數。
- 返回值:
作業 ID。
- 返回型別:
BigQueryJob
- get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]¶
獲取給定 job_id 的查詢結果。
- 引數:
job_id (str) – 作業的 ID。ID 必須僅包含字母 (a-z, A-Z)、數字 (0-9)、下劃線 (_ 或短劃線 (-)。最大長度為 1,024 個字元。
location (str) – 操作使用的位置。
selected_fields (list[str] | str | None) – 要返回的欄位列表(逗號分隔)。如果未指定,則返回所有欄位。
max_results (int | None) – 從表中獲取的最大記錄(行)數。
project_id (str) – 作業執行所在的 Google Cloud 專案。
retry (google.api_core.retry.Retry) – 如何重試 RPC 呼叫。
job_retry (google.api_core.retry.Retry) – 如何重試失敗的作業。
- 返回值:
如果指定了 selected fields,則返回按這些欄位過濾的行列表
- 丟擲:
AirflowException
- 返回型別:
- property scopes: collections.abc.Sequence[str][source]¶
返回 OAuth 2.0 scopes。
- 返回值:
返回 impersonation_scopes 中定義的 scopes、連線配置中的 scopes 或預設 scopes。
- 返回型別:
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[source]¶
BigQuery 連線。
BigQuery 沒有持久連線的概念。因此,這些物件是遊標的無狀態小型工廠,所有實際工作都由遊標完成。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[source]¶
Bases:
airflow.utils.log.logging_mixin.LoggingMixinBigQuery 遊標。
BigQuery 基本遊標包含用於針對 BigQuery 執行查詢的輔助方法。在不需要 PEP 249 遊標的情況下,運算子可以直接使用這些方法。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[source]¶
Bases:
BigQueryBaseCursor一個非常基礎的 BigQuery 的 PEP 249 遊標實現。
參考使用了 PyHive PEP 249 實現
https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py
- airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[source]¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook使用 gcloud-aio 庫檢索作業詳情。
- async get_job_output(job_id, project_id=PROVIDE_PROJECT_ID)[source]¶
非同步獲取給定 job ID 的 BigQuery 作業輸出。
- async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]¶
使用 gcloud-aio 建立新作業並獲取 job_id。
- value_check(sql, pass_value, records, tolerance=None)[來源]¶
將單行查詢結果和容差與 pass_value 進行匹配。
- 丟擲:
AirflowException – 如果匹配失敗
- interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[來源]¶
檢查指標(SQL 表示式)的值是否在一定的容差範圍內。
- 引數:
row1 (str | None) – 第一個 SQL 查詢執行任務的第一個結果行
row2 (str | None) – 第二個 SQL 查詢執行任務的第一個結果行
metrics_thresholds (dict[str, Any]) – 一個以指標為鍵的比例字典,例如 'COUNT(*)': 1.5 要求當前日期的資料與 days_back 天之前的資料之間的差異小於或等於 50%。
ignore_zero (bool) – 是否應該忽略值為零的指標
ratio_formula (str) – 用於計算兩個指標之間比例的公式。假設 cur 是今天的指標,ref 是 days_back 天前的指標。 max_over_min: 計算 max(cur, ref) / min(cur, ref) relative_diff: 計算 abs(cur-ref) / ref
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[來源]¶
Bases:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook用於 BigQuery Table 的非同步 Hook。