airflow.providers.google.cloud.operators.bigquery¶
此模組包含 Google BigQuery 運算子。
屬性¶
類¶
BigQuery 運算子的十六進位制顏色。 |
|
如果資源存在要採取的操作。 |
|
對 BigQuery 執行檢查。 |
|
使用 SQL 程式碼執行簡單的值檢查。 |
|
檢查給定為 SQL 表示式的指標值是否在舊值的容差範圍內。 |
|
子類化 SQLColumnCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。 |
|
子類化 SQLTableCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。 |
|
獲取資料並返回,可以來自 BigQuery 表,也可以是查詢作業的結果。 |
|
在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。 |
|
在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。 |
|
使用來自 Google Cloud Storage 的資料建立一個新的外部表。 |
|
從 BigQuery 中的專案刪除現有資料集。 |
|
在 BigQuery 中的專案建立一個新資料集。 |
|
根據 ID 獲取指定的資料集。 |
|
檢索指定資料集中的表列表。 |
|
在 BigQuery 中的專案更新一個表。 |
|
在 BigQuery 中的專案更新一個數據集。 |
|
刪除一個 BigQuery 表。 |
|
對 BigQuery 表進行 upsert 操作。 |
|
更新 BigQuery 表模式。 |
|
執行一個 BigQuery 作業。 |
模組內容¶
- airflow.providers.google.cloud.operators.bigquery.BIGQUERY_JOB_DETAILS_LINK_FMT = 'https://console.cloud.google.com/bigquery?j={job_id}'[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUIColors[source]¶
基類:
enum.EnumBigQuery 運算子的十六進位制顏色。
- class airflow.providers.google.cloud.operators.bigquery.IfExistAction[source]¶
基類:
enum.Enum如果資源存在要採取的操作。
- class airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator(*, sql, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, query_params=None, **kwargs)[source]¶
基類:
_BigQueryDbHookMixin,airflow.providers.common.sql.operators.sql.SQLCheckOperator,_BigQueryOperatorsEncryptionConfigurationMixin對 BigQuery 執行檢查。
此運算子需要一個返回單行的 SQL 查詢。該行上的每個值都將使用 Python
bool強制轉換進行評估。如果任一值為假值,檢查將出錯。另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 檢查查詢結果是否有資料
請注意,Python bool 強制轉換將以下內容評估為 False
False0空字串 (
"")空列表 (
[])空字典或集合 (
{})
給定一個類似
SELECT COUNT(*) FROM foo的查詢,只有當計數等於零時才會失敗。您可以構建更復雜的查詢,例如檢查表是否與上游源表具有相同的行數,或者今天分割槽的計數是否大於昨天分割槽的計數,或者一組指標是否小於 7 天平均值的三倍標準差。此運算子可用作管道中的資料質量檢查。根據您在 DAG 中的位置,您可以選擇阻止關鍵路徑,防止釋出可疑資料,或者將其放在旁邊並接收電子郵件警報而不停止 DAG 的進度。
- 引數:
sql (str) – 要執行的 SQL。
gcp_conn_id (str) – Google Cloud 的連線 ID。
use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。
location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據模擬,或者獲取列表中最後一個賬號(將在請求中被模擬)的訪問令牌所需的鏈式賬號列表。如果設定為字串,該賬號必須授予發起賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予緊前身份 Service Account Token Creator IAM 角色,列表中第一個賬號將此角色授予發起賬號。(模板化)
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery。
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
deferrable (bool) – 以可延遲模式執行運算子。
poll_interval (float) – (僅限可延遲模式)檢查作業狀態的輪詢週期(秒)。
query_params (list | None) – 包含查詢引數型別和值的字典列表,傳遞給 BigQuery。字典的結構應類似於 Google BigQuery Jobs API 中的“queryParameters”:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs。例如,[{ ‘name’: ‘corpus’, ‘parameterType’: { ‘type’: ‘STRING’ }, ‘parameterValue’: { ‘value’: ‘romeoandjuliet’ } }]。(模板化)
- template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'impersonation_chain', 'labels', 'query_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator(*, sql, pass_value, tolerance=None, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]¶
基類:
_BigQueryDbHookMixin,airflow.providers.common.sql.operators.sql.SQLValueCheckOperator,_BigQueryOperatorsEncryptionConfigurationMixin使用 SQL 程式碼執行簡單的值檢查。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南: 將查詢結果與透過值進行比較
- 引數:
sql (str) – 要執行的 SQL。
use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據模擬,或者獲取列表中最後一個賬號(將在請求中被模擬)的訪問令牌所需的鏈式賬號列表。如果設定為字串,該賬號必須授予發起賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予緊前身份 Service Account Token Creator IAM 角色,列表中第一個賬號將此角色授予發起賬號。(模板化)
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery。
deferrable (bool) – 以可延遲模式執行運算子。
poll_interval (float) – (僅限可延遲模式)檢查作業狀態的輪詢週期(秒)。
- template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'pass_value', 'impersonation_chain', 'labels')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, encryption_configuration=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, project_id=PROVIDE_PROJECT_ID, **kwargs)[源]¶
基類:
_BigQueryDbHookMixin,airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator,_BigQueryOperatorsEncryptionConfigurationMixin檢查給定為 SQL 表示式的指標值是否在舊值的容差範圍內。
此方法構建查詢如下
SELECT {metrics_threshold_dict_key} FROM {table} WHERE {date_filter_column}=<date>
另請參閱
有關如何使用此 Operator 的更多資訊,請參閱指南:比較隨時間變化的指標
- 引數:
table (str) – 表名
days_back (SupportsAbs`[`int`]` ) – `ds` 與要對照檢查的 `ds` 之間的天數。預設為 7 天
metrics_thresholds (dict) – 按指標索引的比例字典,例如 `'COUNT(*)'`: `1.5` 要求當前日期與前 `days_back` 天之間的差異小於或等於 50%。
use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery
deferrable (bool) – 在 deferrable 模式下執行 Operator
poll_interval (float) – (僅在 deferrable 模式下) 檢查作業狀態的輪詢週期,單位為秒。預設為 4 秒。
project_id (str) – 表示 BigQuery 專案ID 的字串
- template_fields: collections.abc.Sequence[str] = ('table', 'gcp_conn_id', 'sql1', 'sql2', 'impersonation_chain', 'labels')[源]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryColumnCheckOperator(*, table, column_mapping, partition_clause=None, database=None, accept_none=True, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, **kwargs)[源]¶
基類:
_BigQueryDbHookMixin,airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator,_BigQueryOperatorsEncryptionConfigurationMixin子類化 SQLColumnCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。
用法請參閱基類文件字串。
另請參閱
有關如何使用此 Operator 的更多資訊,請參閱指南:使用預定義測試檢查列
- 引數:
table (str) – 表名
column_mapping (dict) – 一個字典,關聯列與其檢查規則
partition_clause (str | None) – 一個字串 SQL 語句,新增到 WHERE 子句以分割槽資料
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。
location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery
- template_fields: collections.abc.Sequence[str][源]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryTableCheckOperator(*, table, checks, partition_clause=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, **kwargs)[源]¶
基類:
_BigQueryDbHookMixin,airflow.providers.common.sql.operators.sql.SQLTableCheckOperator,_BigQueryOperatorsEncryptionConfigurationMixin子類化 SQLTableCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。
用法請參閱基類。
另請參閱
有關如何使用此 Operator 的更多資訊,請參閱指南:檢查表級別的資料質量
- 引數:
table (str) – 表名
checks (dict) – 一個字典,包含檢查名稱和布林型 SQL 語句
partition_clause (str | None) – 一個字串 SQL 語句,新增到 WHERE 子句以分割槽資料
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。
location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
- template_fields: collections.abc.Sequence[str][源]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator(*, dataset_id=None, table_id=None, table_project_id=None, job_id=None, job_project_id=None, project_id=PROVIDE_PROJECT_ID, max_results=100, selected_fields=None, gcp_conn_id='google_cloud_default', location=None, encryption_configuration=None, impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, as_dict=False, use_legacy_sql=True, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator,_BigQueryOperatorsEncryptionConfigurationMixin獲取資料並返回,可以來自 BigQuery 表,也可以是查詢作業的結果。
可以按特定列縮小資料範圍,或整體檢索資料。根據“as_dict”的值,資料以以下兩種格式之一返回:1. False(預設)- 一個 Python 列表的列表,巢狀列表的數量等於獲取的行數。每個巢狀列表代表一行,其中的元素對應於該行的列值。
示例結果:
[['Tony', 10], ['Mike', 20]2. True - 一個 Python 字典列表,其中每個字典代表一行。在每個字典中,鍵是列名,值是這些列對應的數值。
示例結果:
[{'name': 'Tony', 'age': 10}, {'name': 'Mike', 'age': 20}]另請參閱
有關如何使用此運算子的更多資訊,請參閱指南:從表中獲取資料
注意
如果您在
selected_fields中傳遞的欄位順序與 BQ 表格/作業中現有列的順序不同,資料仍將按照 BQ 表格的順序排列。例如,如果 BQ 表格有三列,順序為[A,B,C],而您在selected_fields中傳遞 'B,A',資料仍然會是'A,B'的形式。注意
在非可延遲模式下使用作業 ID 時,作業應處於 DONE(完成)狀態。
示例 - 使用表格從 BigQuery 檢索資料:
get_data = BigQueryGetDataOperator( task_id="get_data_from_bq", dataset_id="test_dataset", table_id="Transaction_partitions", table_project_id="internal-gcp-project", max_results=100, selected_fields="DATE", gcp_conn_id="airflow-conn-id", )
示例 - 使用作業 ID 從 BigQuery 檢索資料:
get_data = BigQueryGetDataOperator( job_id="airflow_8999918812727394_86a1cecc69c5e3028d28247affd7563", job_project_id="internal-gcp-project", max_results=100, selected_fields="DATE", gcp_conn_id="airflow-conn-id", )
- 引數:
dataset_id (str | None) – 請求的表格的資料集 ID。(模板化)
table_id (str | None) – 請求的表格的表格 ID。與 job_id 互斥。(模板化)
table_project_id (str | None) – (可選)請求的表格的專案 ID。如果為 None,將從 hook 的專案 ID 派生。(模板化)
job_id (str | None) – 從中檢索查詢結果的作業 ID。與 table_id 互斥。(模板化)
job_project_id (str | None) – (可選)作業執行所在的 Google Cloud 專案。如果為 None,將從 hook 的專案 ID 派生。(模板化)
project_id (str) – (已棄用)(可選)將從中返回資料的專案名稱。如果為 None,將從 hook 的專案 ID 派生。(模板化)
max_results (int) – 從表中獲取的最大記錄數(行數)。(模板化)
selected_fields (str | None) – 要返回的欄位列表(逗號分隔)。如果未指定,則返回所有欄位。
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
location (str | None) – 用於操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
deferrable (bool) – 在 deferrable 模式下執行 Operator
poll_interval (float) – (僅在 deferrable 模式下) 檢查作業狀態的輪詢週期,單位為秒。預設為 4 秒。
as_dict (bool) – 如果為 True,則結果返回為字典列表;否則返回為列表的列表(預設:False)。
use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_project_id', 'job_id', 'job_project_id', 'project_id',...[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateTableOperator(*, dataset_id, table_id, table_resource, project_id=PROVIDE_PROJECT_ID, location=None, gcs_schema_object=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', retry=DEFAULT, timeout=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。
用於 BigQuery 表的 schema 可以透過兩種方式指定。您可以直接傳入 schema 欄位,或者指向一個 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是一個包含 schema 欄位的 JSON 檔案。您也可以在沒有 schema 的情況下建立表。
另請參閱
有關如何使用此運算子的更多資訊,請參閱指南:建立表
- 引數:
project_id (str) – 可選。要在其中建立表的專案。
dataset_id (str) – 必需。要在其中建立表的資料集。
table_id (str) – 必需。要建立的表的名稱。
table_resource (dict[str, Any] | google.cloud.bigquery.table.Table | google.cloud.bigquery.table.TableReference | google.cloud.bigquery.table.TableListItem) – 必需。表資源,如文件所述:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果
table是引用,則將建立一個具有指定 ID 的空表。表所屬的資料集必須已經存在。if_exists (str) – 可選。如果表存在,Airflow 應該做什麼。如果設定為 log,則 TI 將成功,並記錄一條錯誤訊息。設定為 ignore 以忽略錯誤,設定為 fail 以使 TI 失敗,設定為 skip 以跳過。
gcs_schema_object (str | None) – 可選。包含 schema 的 JSON 檔案的完整路徑。例如:
gs://test-bucket/dir1/dir2/employee_schema.jsongcp_conn_id (str) – 可選。用於連線到 Google Cloud 並與 Bigquery 服務互動的連線 ID。
google_cloud_storage_conn_id (str) – 可選。用於連線到 Google Cloud 並與 Google Cloud Storage 服務互動的連線 ID。
location (str | None) – 可選。用於操作的位置。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 可選。用於重試請求的重試物件。如果指定 None,則不會重試請求。
timeout (float | None) – 可選。等待請求完成的時間(秒)。請注意,如果指定了 retry,則超時應用於每個單獨的嘗試。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選。使用短期憑據模擬的服務賬號,或獲取列表中最後一個賬號的 access_token 所需的賬號鏈,該賬號將在請求中被模擬。如果設定為字串,該賬號必須授予原始賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予直接前一個身份 Service Account Token Creator IAM 角色,列表中第一個賬號將此角色授予原始賬號。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'impersonation_chain')[source]¶
- 類 airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator(*, dataset_id, table_id, table_resource=None, project_id=PROVIDE_PROJECT_ID, schema_fields=None, gcs_schema_object=None, time_partitioning=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', labels=None, view=None, materialized_view=None, encryption_configuration=None, location=None, cluster_fields=None, impersonation_chain=None, if_exists='log', bigquery_conn_id=None, exists_ok=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。
用於 BigQuery 表的 schema 可以透過兩種方式指定。您可以直接傳入 schema 欄位,或者指向一個 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是一個包含 schema 欄位的 JSON 檔案。您也可以在沒有 schema 的情況下建立表。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:Create native table
- 引數:
project_id (str) – 要建立表的專案。(templated)
dataset_id (str) – 要在其中建立表的資料集。(templated)
table_id (str) – 要建立的表的名稱。(templated)
table_resource (dict[str, Any] | None) – 文件中描述的表資源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,則所有其他引數將被忽略。(templated)
schema_fields (list | None) –
如果設定,則為此處定義的 schema 欄位列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
示例:
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
gcs_schema_object (str | None) – 包含 schema 的 JSON 檔案的完整路徑 (templated)。例如:
gs://test-bucket/dir1/dir2/employee_schema.jsontime_partitioning (dict | None) –
根據 API 規範配置可選的時間分割槽欄位,即按欄位、型別和過期時間進行分割槽。
gcp_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Bigquery 服務互動的連線 ID。
google_cloud_storage_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Google Cloud Storage 服務互動的連線 ID。
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery
示例 (schema JSON 在 GCS 中):
CreateTable = BigQueryCreateEmptyTableOperator( task_id="BigQueryCreateEmptyTableOperator_task", dataset_id="ODS", table_id="Employees", project_id="internal-gcp-project", gcs_schema_object="gs://schema-bucket/employee_schema.json", gcp_conn_id="airflow-conn-id", google_cloud_storage_conn_id="airflow-conn-id", )
對應的 Schema 檔案 (
employee_schema.json)[ {"mode": "NULLABLE", "name": "emp_name", "type": "STRING"}, {"mode": "REQUIRED", "name": "salary", "type": "INTEGER"}, ]
示例 (schema 在 DAG 中):
CreateTable = BigQueryCreateEmptyTableOperator( task_id="BigQueryCreateEmptyTableOperator_task", dataset_id="ODS", table_id="Employees", project_id="internal-gcp-project", schema_fields=[ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ], gcp_conn_id="airflow-conn-id-account", google_cloud_storage_conn_id="airflow-conn-id", )
- 引數:
view (dict | None) –
(可選) 包含 view 定義的字典。如果設定,它將建立一個 view 而不是表。
materialized_view (dict | None) – (可選) 具化檢視 (materialized view) 定義。
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
location (str | None) – 用於操作的位置。
cluster_fields (list[str] | None) –
(可選) 用於聚簇 (clustering) 的欄位。BigQuery 支援對分割槽表和非分割槽表進行聚簇。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
if_exists (str) – 如果表已存在,Airflow 應該怎麼做。如果設定為 log,則 TI 將成功,並記錄錯誤訊息。設定為 ignore 則忽略錯誤,設定為 fail 則使 TI 失敗,設定為 skip 則跳過。
exists_ok (bool | None) – 已棄用 - 請改用 if_exists=”ignore”。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'labels',...[source]¶
- 類 airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator(*, bucket=None, source_objects=None, destination_project_dataset_table=None, table_resource=None, schema_fields=None, schema_object=None, gcs_schema_bucket=None, source_format=None, autodetect=False, compression=None, skip_leading_rows=None, field_delimiter=None, max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', src_fmt_configs=None, labels=None, encryption_configuration=None, location=None, impersonation_chain=None, bigquery_conn_id=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator使用來自 Google Cloud Storage 的資料建立一個新的外部表。
要用於 BigQuery 表的 schema 可以透過兩種方式指定。您可以直接傳入 schema 欄位,或者將 operator 指向 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是包含 schema 欄位的 JSON 檔案。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:Create external table
- 引數:
bucket (str | None) – 外部表指向的儲存桶 (bucket)。(templated)
source_objects (list[str] | None) – 外部表指向的 Google Cloud Storage URI 列表。如果 source_format 是 ‘DATASTORE_BACKUP’,則列表必須只包含一個 URI。
destination_project_dataset_table (str | None) – 要載入資料到的以點分隔的
(<project>.)<dataset>.<table>BigQuery 表 (templated)。如果未包含<project>,則專案將是連線 json 中定義的專案。schema_fields (list | None) –
如果設定,則為此處定義的 schema 欄位列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
示例:
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
當 source_format 是 ‘DATASTORE_BACKUP’ 時不應設定此項。
table_resource (dict[str, Any] | None) – 文件中描述的表資源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,則所有其他引數將被忽略。來自物件的外部 schema 將被解析。
schema_object (str | None) – 如果設定,則為指向包含表 schema 的 .json 檔案的 GCS 物件路徑。(templated)
gcs_schema_bucket (str | None) – 儲存 schema JSON 的 GCS 儲存桶名稱 (templated)。預設值是 self.bucket。
source_format (str | None) – 資料的檔案格式。
autodetect (bool) – 嘗試自動檢測 schema 和格式選項。明確指定 schema_fields 和 schema_object 選項時,將優先使用這些選項。https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources
compression (str | None) – (可選) 資料來源的壓縮型別。可能的值包括 GZIP 和 NONE。預設值為 NONE。Google Cloud Bigtable、Google Cloud Datastore 備份和 Avro 格式會忽略此設定。
skip_leading_rows (int | None) – 從 CSV 載入時要跳過的行數。
field_delimiter (str | None) – CSV 中使用的分隔符。
max_bad_records (int) – BigQuery 在執行作業時可以忽略的最大錯誤記錄數。
quote_character (str | None) – 用於引用 CSV 檔案中資料部分的字元。
allow_quoted_newlines (bool) – 是否允許帶引號的換行符 (true) 或不允許 (false)。
allow_jagged_rows (bool) – 接受缺少尾部可選列的行。缺失的值被視為 null。如果為 false,則缺少尾部列的記錄被視為錯誤記錄,如果錯誤記錄過多,則在作業結果中返回無效錯誤。僅適用於 CSV,對其他格式無效。
gcp_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Bigquery 服務互動的連線 ID。
google_cloud_storage_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Google Cloud Storage 服務互動的連線 ID。
src_fmt_configs (dict | None) – 配置特定於源格式的可選欄位。
labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery
encryption_configuration (dict | None) –
(可選)自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
location (str | None) – 用於操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'gcs_schema_bucket',...[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator從 BigQuery 中的專案刪除現有資料集。
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
另請參閱
有關如何使用此運算元的更多資訊,請參閱指南:刪除資料集
- 引數:
project_id (字串) – 資料集的專案ID。
dataset_id (字串) – 要刪除的資料集。
delete_contents (布林值) – (可選)即使資料集不為空,是否強制刪除。如果設定為 True,將刪除資料集中的所有表(如果有)。如果設定為 False 且資料集不為空,將丟擲 HttpError 400:“{dataset_id} is still in use”。預設值為 False。
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
示例:
delete_temp_data = BigQueryDeleteDatasetOperator( dataset_id="temp-dataset", project_id="temp-project", delete_contents=True, # Force the deletion of the dataset as well as its tables (if any). gcp_conn_id="_my_gcp_conn_", task_id="Deletetemp", dag=dag, )
- template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator(*, dataset_id=None, project_id=PROVIDE_PROJECT_ID, dataset_reference=None, location=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', exists_ok=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator在 BigQuery 中的專案建立一個新資料集。
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
另請參閱
有關如何使用此運算元的更多資訊,請參閱指南:建立資料集
- 引數:
project_id (字串) – 我們要建立資料集的專案名稱。
dataset_id (字串 | 無) – 資料集ID。如果 dataset_reference 中包含 datasetId,則無需提供。
location (字串 | 無) – 資料集應駐留的地理位置。
dataset_reference (字典 | 無) – 可在請求正文中提供的資料集引用。更多資訊:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
if_exists (字串) –
如果資料集存在,Airflow 應執行什麼操作。如果設定為 log,則 TI 將透過並記錄一條錯誤訊息。設定為 ignore 以忽略錯誤,設定為 fail 以使 TI 失敗,設定為 skip 以跳過。示例
create_new_dataset = BigQueryCreateEmptyDatasetOperator( dataset_id='new-dataset', project_id='my-project', dataset_reference={"friendlyName": "New Dataset"} gcp_conn_id='_my_gcp_conn_', task_id='newDatasetCreator', dag=dag)
exists_ok (bool | None) – 已棄用 - 請改用 if_exists=”ignore”。
- template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'dataset_reference', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator根據 ID 獲取指定的資料集。
另請參閱
有關如何使用此運算元的更多資訊,請參閱指南:獲取資料集詳情
- 引數:
dataset_id (字串) – 資料集ID。如果 dataset_reference 中包含 datasetId,則無需提供。
project_id (字串) – 我們要建立資料集的專案名稱。如果 dataset_reference 中包含 projectId,則無需提供。
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator檢索指定資料集中的表列表。
另請參閱
有關如何使用此運算元的更多資訊,請參閱指南:列出資料集中的表
- 引數:
dataset_id (字串) – 請求的資料集的資料集ID。
project_id (字串) – (可選)請求的資料集所屬專案。如果為 None,將使用 self.project_id。
max_results (整數 | 無) – (可選)返回的最大表數。
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableOperator(*, table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator在 BigQuery 中的專案更新一個表。
使用
fields指定要更新表的哪些欄位。如果某個欄位列在fields中,並且在表中為None,則該欄位將被刪除。另請參閱
有關如何使用此運算元的更多資訊,請參閱指南:更新表
- 引數:
dataset_id (字串 | 無) – 資料集ID。如果 table_reference 中包含 datasetId,則無需提供。
table_id (字串 | 無) – 表ID。如果 table_reference 中包含 tableId,則無需提供。
table_resource (dict[str, Any]) – 將隨請求體提供的資料集資源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
fields (list[str] | None) – 需要更改的
table的欄位,以表屬性命名 (例如 “friendly_name”)。project_id (str) – 我們要建立表的專案名稱。如果在 table_reference 中包含 projectId,則無需提供。
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateDatasetOperator(*, dataset_resource, fields=None, dataset_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator在 BigQuery 中的專案更新一個數據集。
使用
fields指定要更新的資料集欄位。如果欄位列在fields中且在資料集中為None,則會將其刪除。如果沒有提供fields,則將使用提供的dataset_resource的所有欄位。另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:更新資料集
- 引數:
dataset_id (字串 | 無) – 資料集ID。如果 dataset_reference 中包含 datasetId,則無需提供。
dataset_resource (dict[str, Any]) – 將隨請求體提供的資料集資源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
project_id (字串) – 我們要建立資料集的專案名稱。如果 dataset_reference 中包含 projectId,則無需提供。
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator(*, deletion_dataset_table, gcp_conn_id='google_cloud_default', ignore_if_missing=False, location=None, impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator刪除一個 BigQuery 表。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:刪除表
- 引數:
deletion_dataset_table (str) – 指示要刪除的表,格式為點分隔的
(<project>.|<project>:)<dataset>.<table>。(模板化)gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
ignore_if_missing (bool) – 如果為 True,則即使請求的表不存在也返回成功。
location (str | None) – 用於操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.Sequence[str] = ('deletion_dataset_table', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpsertTableOperator(*, dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', location=None, impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator對 BigQuery 表進行 upsert 操作。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:Upsert 表
- 引數:
dataset_id (str) – 指示要更新的資料集,格式為點分隔的
(<project>.|<project>:)<dataset>。(模板化)table_resource (dict) – 一個表資源。參見 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
project_id (str) – 我們要更新資料集的專案名稱。如果在 dataset_reference 中包含 projectId,則無需提供。
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
location (str | None) – 用於操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_resource', 'impersonation_chain', 'project_id')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableSchemaOperator(*, schema_fields_updates, dataset_id, table_id, include_policy_tags=False, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, location=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator更新 BigQuery 表模式。
根據提供的 schema_fields_updates 引數的內容更新表 schema 中的欄位。提供的 schema 不需要完整,如果欄位已存在於 schema 中,您只需提供要更新的專案的鍵和值,只需確保設定了“name”鍵。
另請參閱
有關如何使用此 operator 的更多資訊,請參閱指南:更新表 schema
- 引數:
schema_fields_updates (list[dict[str, Any]]) –
部分模式資源。請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
schema_fields_updates = [ {"name": "emp_name", "description": "Some New Description"}, { "name": "salary", "policyTags": {"names": ["some_new_policy_tag"]}, }, { "name": "departments", "fields": [ {"name": "name", "description": "Some New Description"}, {"name": "type", "description": "Some New Description"}, ], }, ]
include_policy_tags (bool) – (可選) 如果設定為 True,策略標籤將包含在更新請求中,這需要特殊許可權,即使標籤未更改(預設為 False)。請參閱 https://cloud.google.com/bigquery/docs/column-level-security#roles
dataset_id (str) – 指示要更新的資料集,格式為點分隔的
(<project>.|<project>:)<dataset>。(模板化)table_id (str) – 所請求表的表 ID。(支援模板)
project_id (str) – 我們要更新資料集的專案名稱。如果在 dataset_reference 中包含 projectId,則無需提供。
gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。
location (str | None) – 用於操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
- template_fields: collections.abc.Sequence[str] = ('schema_fields_updates', 'dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator(configuration, project_id=PROVIDE_PROJECT_ID, location=None, job_id=None, force_rerun=True, reattach_states=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, cancel_on_kill=True, result_retry=DEFAULT_RETRY, result_timeout=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]¶
基類:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator,airflow.providers.google.cloud.openlineage.mixins._BigQueryInsertJobOperatorOpenLineageMixin執行一個 BigQuery 作業。
等待作業完成並返回作業 ID。此 Operator 按以下方式工作:
如果
force_rerun為 True,則使用作業配置或 uuid 計算作業的唯一雜湊值。- 建立格式如下的
job_id: [提供的_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{唯一性字尾}
- 建立格式如下的
使用
job_id提交 BigQuery 作業。- 如果具有給定 ID 的作業已存在,則嘗試重新連線到該作業,前提是該作業尚未完成且其
狀態在
reattach_states中。如果作業已完成,則 Operator 將引發AirflowException。
使用
force_rerun將每次提交一個新作業,而不重新連線到已存在的作業。作業定義請參閱此處:
另請參閱
有關如何使用此 Operator 的更多資訊,請參閱指南:執行 BigQuery 作業
- 引數:
configuration (dict[str, Any]) – 配置引數直接對映到作業物件中的 BigQuery 配置欄位。有關更多詳細資訊,請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration
job_id (str | None) – 作業的 ID。除非
force_rerun為 True,否則將以作業配置的雜湊值作為字尾。ID 只能包含字母(a-z、A-Z)、數字(0-9)、下劃線 (_) 或破折號 (-)。最大長度為 1,024 個字元。如果未提供,將生成 uuid。force_rerun (bool) – 如果為 True,則 Operator 將使用 uuid 的雜湊值作為作業 ID 字尾。
reattach_states (set[str] | None) – 在這些狀態下,我們應該重新連線到 BigQuery 作業。應該是非最終狀態。
project_id (str) – 作業執行所在的 Google Cloud 專案
location (str | None) – 作業執行的位置
gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。
cancel_on_kill (bool) – 在呼叫 on_kill 時,指示是否取消 hook 作業的標誌。
result_retry (google.api_core.retry.Retry) – 如何重試檢索行的 result 呼叫。
result_timeout (float | None) – 在使用 result_retry 之前等待 result 方法的秒數。
deferrable (bool) – 在 deferrable 模式下執行 Operator
poll_interval (float) – (僅在 deferrable 模式下) 檢查作業狀態的輪詢週期,單位為秒。預設為 4 秒。
- template_fields: collections.abc.Sequence[str] = ('configuration', 'job_id', 'impersonation_chain', 'project_id')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json', '.sql')[source]¶