airflow.providers.google.cloud.operators.bigquery

此模組包含 Google BigQuery 運算子。

屬性

BIGQUERY_JOB_DETAILS_LINK_FMT

LABEL_REGEX

BigQueryUIColors

BigQuery 運算子的十六進位制顏色。

IfExistAction

如果資源存在要採取的操作。

BigQueryCheckOperator

對 BigQuery 執行檢查。

BigQueryValueCheckOperator

使用 SQL 程式碼執行簡單的值檢查。

BigQueryIntervalCheckOperator

檢查給定為 SQL 表示式的指標值是否在舊值的容差範圍內。

BigQueryColumnCheckOperator

子類化 SQLColumnCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。

BigQueryTableCheckOperator

子類化 SQLTableCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。

BigQueryGetDataOperator

獲取資料並返回,可以來自 BigQuery 表,也可以是查詢作業的結果。

BigQueryCreateTableOperator

在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。

BigQueryCreateEmptyTableOperator

在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。

BigQueryCreateExternalTableOperator

使用來自 Google Cloud Storage 的資料建立一個新的外部表。

BigQueryDeleteDatasetOperator

從 BigQuery 中的專案刪除現有資料集。

BigQueryCreateEmptyDatasetOperator

在 BigQuery 中的專案建立一個新資料集。

BigQueryGetDatasetOperator

根據 ID 獲取指定的資料集。

BigQueryGetDatasetTablesOperator

檢索指定資料集中的表列表。

BigQueryUpdateTableOperator

在 BigQuery 中的專案更新一個表。

BigQueryUpdateDatasetOperator

在 BigQuery 中的專案更新一個數據集。

BigQueryDeleteTableOperator

刪除一個 BigQuery 表。

BigQueryUpsertTableOperator

對 BigQuery 表進行 upsert 操作。

BigQueryUpdateTableSchemaOperator

更新 BigQuery 表模式。

BigQueryInsertJobOperator

執行一個 BigQuery 作業。

模組內容

airflow.providers.google.cloud.operators.bigquery.LABEL_REGEX[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryUIColors[source]

基類: enum.Enum

BigQuery 運算子的十六進位制顏色。

CHECK = '#C0D7FF'[source]
QUERY = '#A1BBFF'[source]
TABLE = '#81A0FF'[source]
DATASET = '#5F86FF'[source]
class airflow.providers.google.cloud.operators.bigquery.IfExistAction[source]

基類: enum.Enum

如果資源存在要採取的操作。

IGNORE = 'ignore'[source]
LOG = 'log'[source]
FAIL = 'fail'[source]
SKIP = 'skip'[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator(*, sql, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, query_params=None, **kwargs)[source]

基類: _BigQueryDbHookMixin, airflow.providers.common.sql.operators.sql.SQLCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin

對 BigQuery 執行檢查。

此運算子需要一個返回單行的 SQL 查詢。該行上的每個值都將使用 Python bool 強制轉換進行評估。如果任一值為假值,檢查將出錯。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: 檢查查詢結果是否有資料

請注意,Python bool 強制轉換將以下內容評估為 False

  • False

  • 0

  • 空字串 ("")

  • 空列表 ([])

  • 空字典或集合 ({})

給定一個類似 SELECT COUNT(*) FROM foo 的查詢,只有當計數等於零時才會失敗。您可以構建更復雜的查詢,例如檢查表是否與上游源表具有相同的行數,或者今天分割槽的計數是否大於昨天分割槽的計數,或者一組指標是否小於 7 天平均值的三倍標準差。

此運算子可用作管道中的資料質量檢查。根據您在 DAG 中的位置,您可以選擇阻止關鍵路徑,防止釋出可疑資料,或者將其放在旁邊並接收電子郵件警報而不停止 DAG 的進度。

引數:
  • sql (str) – 要執行的 SQL。

  • gcp_conn_id (str) – Google Cloud 的連線 ID。

  • use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。

  • location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據模擬,或者獲取列表中最後一個賬號(將在請求中被模擬)的訪問令牌所需的鏈式賬號列表。如果設定為字串,該賬號必須授予發起賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予緊前身份 Service Account Token Creator IAM 角色,列表中第一個賬號將此角色授予發起賬號。(模板化)

  • labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery。

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • deferrable (bool) – 以可延遲模式執行運算子。

  • poll_interval (float) – (僅限可延遲模式)檢查作業狀態的輪詢週期(秒)。

  • query_params (list | None) – 包含查詢引數型別和值的字典列表,傳遞給 BigQuery。字典的結構應類似於 Google BigQuery Jobs API 中的“queryParameters”:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs。例如,[{ ‘name’: ‘corpus’, ‘parameterType’: { ‘type’: ‘STRING’ }, ‘parameterValue’: { ‘value’: ‘romeoandjuliet’ } }]。(模板化)

template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'impersonation_chain', 'labels', 'query_params')[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
ui_color = '#C0D7FF'[source]
conn_id_field = 'gcp_conn_id'[source]
gcp_conn_id = 'google_cloud_default'[source]
use_legacy_sql = True[source]
location = None[source]
impersonation_chain = None[source]
labels = None[source]
encryption_configuration = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
query_params = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

execute_complete(context, event)[source]

在觸發器觸發時作為回撥函式執行。

此方法立即返回。它依賴於觸發器丟擲異常,否則假定執行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator(*, sql, pass_value, tolerance=None, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]

基類: _BigQueryDbHookMixin, airflow.providers.common.sql.operators.sql.SQLValueCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin

使用 SQL 程式碼執行簡單的值檢查。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南: 將查詢結果與透過值進行比較

引數:
  • sql (str) – 要執行的 SQL。

  • use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據模擬,或者獲取列表中最後一個賬號(將在請求中被模擬)的訪問令牌所需的鏈式賬號列表。如果設定為字串,該賬號必須授予發起賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予緊前身份 Service Account Token Creator IAM 角色,列表中第一個賬號將此角色授予發起賬號。(模板化)

  • labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery。

  • deferrable (bool) – 以可延遲模式執行運算子。

  • poll_interval (float) – (僅限可延遲模式)檢查作業狀態的輪詢週期(秒)。

template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'pass_value', 'impersonation_chain', 'labels')[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
ui_color = '#C0D7FF'[source]
conn_id_field = 'gcp_conn_id'[source]
location = None[source]
gcp_conn_id = 'google_cloud_default'[source]
use_legacy_sql = True[source]
encryption_configuration = None[source]
impersonation_chain = None[source]
labels = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

execute_complete(context, event)[source]

在觸發器觸發時作為回撥函式執行。

此方法立即返回。它依賴於觸發器丟擲異常,否則假定執行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, encryption_configuration=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, project_id=PROVIDE_PROJECT_ID, **kwargs)[源]

基類:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

檢查給定為 SQL 表示式的指標值是否在舊值的容差範圍內。

此方法構建查詢如下

SELECT {metrics_threshold_dict_key} FROM {table}
WHERE {date_filter_column}=<date>

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南:比較隨時間變化的指標

引數:
  • table (str) – 表名

  • days_back (SupportsAbs`[`int`]` ) – `ds` 與要對照檢查的 `ds` 之間的天數。預設為 7 天

  • metrics_thresholds (dict) – 按指標索引的比例字典,例如 `'COUNT(*)'`: `1.5` 要求當前日期與前 `days_back` 天之間的差異小於或等於 50%。

  • use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

  • labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery

  • deferrable (bool) – 在 deferrable 模式下執行 Operator

  • poll_interval (float) – (僅在 deferrable 模式下) 檢查作業狀態的輪詢週期,單位為秒。預設為 4 秒。

  • project_id (str) – 表示 BigQuery 專案ID 的字串

template_fields: collections.abc.Sequence[str] = ('table', 'gcp_conn_id', 'sql1', 'sql2', 'impersonation_chain', 'labels')[源]
ui_color = '#C0D7FF'[源]
conn_id_field = 'gcp_conn_id'[源]
gcp_conn_id = 'google_cloud_default'[源]
use_legacy_sql = True[源]
location = None[源]
encryption_configuration = None[源]
impersonation_chain = None[源]
labels = None[源]
project_id = None[源]
deferrable = True[源]
poll_interval = 4.0[源]
execute(context)[源]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

execute_complete(context, event)[源]

在觸發器觸發時作為回撥函式執行。

此方法立即返回。它依賴於觸發器丟擲異常,否則假定執行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryColumnCheckOperator(*, table, column_mapping, partition_clause=None, database=None, accept_none=True, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, **kwargs)[源]

基類:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLColumnCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

子類化 SQLColumnCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。

用法請參閱基類文件字串。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南:使用預定義測試檢查列

引數:
  • table (str) – 表名

  • column_mapping (dict) – 一個字典,關聯列與其檢查規則

  • partition_clause (str | None) – 一個字串 SQL 語句,新增到 WHERE 子句以分割槽資料

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。

  • location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

  • labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery

template_fields: collections.abc.Sequence[str][源]
conn_id_field = 'gcp_conn_id'[源]
table[源]
column_mapping[源]
partition_clause = None[源]
database = None[源]
accept_none = True[源]
gcp_conn_id = 'google_cloud_default'[源]
encryption_configuration = None[源]
use_legacy_sql = True[源]
location = None[源]
impersonation_chain = None[源]
labels = None[源]
execute(context=None)[源]

對給定的列執行檢查。

class airflow.providers.google.cloud.operators.bigquery.BigQueryTableCheckOperator(*, table, checks, partition_clause=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, **kwargs)[源]

基類:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLTableCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

子類化 SQLTableCheckOperator,以便為 OpenLineage 提供可解析的作業 ID。

用法請參閱基類。

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南:檢查表級別的資料質量

引數:
  • table (str) – 表名

  • checks (dict) – 一個字典,包含檢查名稱和布林型 SQL 語句

  • partition_clause (str | None) – 一個字串 SQL 語句,新增到 WHERE 子句以分割槽資料

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。

  • location (str | None) – 作業的地理位置。詳情請參見: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

  • labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

template_fields: collections.abc.Sequence[str][源]
conn_id_field = 'gcp_conn_id'[源]
gcp_conn_id = 'google_cloud_default'[源]
use_legacy_sql = True[源]
location = None[源]
impersonation_chain = None[源]
labels = None[源]
encryption_configuration = None[源]
execute(context=None)[源]

對錶執行給定的檢查。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator(*, dataset_id=None, table_id=None, table_project_id=None, job_id=None, job_project_id=None, project_id=PROVIDE_PROJECT_ID, max_results=100, selected_fields=None, gcp_conn_id='google_cloud_default', location=None, encryption_configuration=None, impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, as_dict=False, use_legacy_sql=True, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator, _BigQueryOperatorsEncryptionConfigurationMixin

獲取資料並返回,可以來自 BigQuery 表,也可以是查詢作業的結果。

可以按特定列縮小資料範圍,或整體檢索資料。根據“as_dict”的值,資料以以下兩種格式之一返回:1. False(預設)- 一個 Python 列表的列表,巢狀列表的數量等於獲取的行數。每個巢狀列表代表一行,其中的元素對應於該行的列值。

示例結果: [['Tony', 10], ['Mike', 20]

2. True - 一個 Python 字典列表,其中每個字典代表一行。在每個字典中,鍵是列名,值是這些列對應的數值。

示例結果: [{'name': 'Tony', 'age': 10}, {'name': 'Mike', 'age': 20}]

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:從表中獲取資料

注意

如果您在 selected_fields 中傳遞的欄位順序與 BQ 表格/作業中現有列的順序不同,資料仍將按照 BQ 表格的順序排列。例如,如果 BQ 表格有三列,順序為 [A,B,C],而您在 selected_fields 中傳遞 'B,A',資料仍然會是 'A,B' 的形式。

注意

在非可延遲模式下使用作業 ID 時,作業應處於 DONE(完成)狀態。

示例 - 使用表格從 BigQuery 檢索資料:

get_data = BigQueryGetDataOperator(
    task_id="get_data_from_bq",
    dataset_id="test_dataset",
    table_id="Transaction_partitions",
    table_project_id="internal-gcp-project",
    max_results=100,
    selected_fields="DATE",
    gcp_conn_id="airflow-conn-id",
)

示例 - 使用作業 ID 從 BigQuery 檢索資料:

get_data = BigQueryGetDataOperator(
    job_id="airflow_8999918812727394_86a1cecc69c5e3028d28247affd7563",
    job_project_id="internal-gcp-project",
    max_results=100,
    selected_fields="DATE",
    gcp_conn_id="airflow-conn-id",
)
引數:
  • dataset_id (str | None) – 請求的表格的資料集 ID。(模板化)

  • table_id (str | None) – 請求的表格的表格 ID。與 job_id 互斥。(模板化)

  • table_project_id (str | None) – (可選)請求的表格的專案 ID。如果為 None,將從 hook 的專案 ID 派生。(模板化)

  • job_id (str | None) – 從中檢索查詢結果的作業 ID。與 table_id 互斥。(模板化)

  • job_project_id (str | None) – (可選)作業執行所在的 Google Cloud 專案。如果為 None,將從 hook 的專案 ID 派生。(模板化)

  • project_id (str) – (已棄用)(可選)將從中返回資料的專案名稱。如果為 None,將從 hook 的專案 ID 派生。(模板化)

  • max_results (int) – 從表中獲取的最大記錄數(行數)。(模板化)

  • selected_fields (str | None) – 要返回的欄位列表(逗號分隔)。如果未指定,則返回所有欄位。

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • location (str | None) – 用於操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

  • deferrable (bool) – 在 deferrable 模式下執行 Operator

  • poll_interval (float) – (僅在 deferrable 模式下) 檢查作業狀態的輪詢週期,單位為秒。預設為 4 秒。

  • as_dict (bool) – 如果為 True,則結果返回為字典列表;否則返回為列表的列表(預設:False)。

  • use_legacy_sql (bool) – 是否使用傳統 SQL (true) 或標準 SQL (false)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_project_id', 'job_id', 'job_project_id', 'project_id',...[source]
ui_color = '#A1BBFF'[source]
table_project_id = None[source]
dataset_id = None[source]
table_id = None[source]
job_project_id = None[source]
job_id = None[source]
max_results = 100[source]
selected_fields = None[source]
gcp_conn_id = 'google_cloud_default'[source]
location = None[source]
impersonation_chain = None[source]
encryption_configuration = None[source]
project_id = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
as_dict = False[source]
use_legacy_sql = True[source]
generate_query(hook)[source]

為給定的資料集和表 ID 生成 SELECT 查詢。

execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

execute_complete(context, event)[source]

在觸發器觸發時作為回撥函式執行。

此方法立即返回。它依賴於觸發器丟擲異常,否則假定執行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateTableOperator(*, dataset_id, table_id, table_resource, project_id=PROVIDE_PROJECT_ID, location=None, gcs_schema_object=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', retry=DEFAULT, timeout=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。

用於 BigQuery 表的 schema 可以透過兩種方式指定。您可以直接傳入 schema 欄位,或者指向一個 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是一個包含 schema 欄位的 JSON 檔案。您也可以在沒有 schema 的情況下建立表。

另請參閱

有關如何使用此運算子的更多資訊,請參閱指南:建立表

引數:
  • project_id (str) – 可選。要在其中建立表的專案。

  • dataset_id (str) – 必需。要在其中建立表的資料集。

  • table_id (str) – 必需。要建立的表的名稱。

  • table_resource (dict[str, Any] | google.cloud.bigquery.table.Table | google.cloud.bigquery.table.TableReference | google.cloud.bigquery.table.TableListItem) – 必需。表資源,如文件所述:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果 table 是引用,則將建立一個具有指定 ID 的空表。表所屬的資料集必須已經存在。

  • if_exists (str) – 可選。如果表存在,Airflow 應該做什麼。如果設定為 log,則 TI 將成功,並記錄一條錯誤訊息。設定為 ignore 以忽略錯誤,設定為 fail 以使 TI 失敗,設定為 skip 以跳過。

  • gcs_schema_object (str | None) – 可選。包含 schema 的 JSON 檔案的完整路徑。例如:gs://test-bucket/dir1/dir2/employee_schema.json

  • gcp_conn_id (str) – 可選。用於連線到 Google Cloud 並與 Bigquery 服務互動的連線 ID。

  • google_cloud_storage_conn_id (str) – 可選。用於連線到 Google Cloud 並與 Google Cloud Storage 服務互動的連線 ID。

  • location (str | None) – 可選。用於操作的位置。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 可選。用於重試請求的重試物件。如果指定 None,則不會重試請求。

  • timeout (float | None) – 可選。等待請求完成的時間(秒)。請注意,如果指定了 retry,則超時應用於每個單獨的嘗試。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選。使用短期憑據模擬的服務賬號,或獲取列表中最後一個賬號的 access_token 所需的賬號鏈,該賬號將在請求中被模擬。如果設定為字串,該賬號必須授予原始賬號 Service Account Token Creator IAM 角色。如果設定為序列,列表中的身份必須授予直接前一個身份 Service Account Token Creator IAM 角色,列表中第一個賬號將此角色授予原始賬號。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
project_id = None[source]
location = None[source]
dataset_id[source]
table_id[source]
table_resource[source]
if_exists[source]
gcs_schema_object = None[source]
gcp_conn_id = 'google_cloud_default'[source]
google_cloud_storage_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
retry[source]
timeout = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

get_openlineage_facets_on_complete(_)[source]

實現 _on_complete,因為我們將使用 create 方法返回的表資源。

airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator(*, dataset_id, table_id, table_resource=None, project_id=PROVIDE_PROJECT_ID, schema_fields=None, gcs_schema_object=None, time_partitioning=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', labels=None, view=None, materialized_view=None, encryption_configuration=None, location=None, cluster_fields=None, impersonation_chain=None, if_exists='log', bigquery_conn_id=None, exists_ok=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的 BigQuery 資料集中建立一個新表,可選擇帶上模式。

用於 BigQuery 表的 schema 可以透過兩種方式指定。您可以直接傳入 schema 欄位,或者指向一個 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是一個包含 schema 欄位的 JSON 檔案。您也可以在沒有 schema 的情況下建立表。

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:Create native table

引數:
  • project_id (str) – 要建立表的專案。(templated)

  • dataset_id (str) – 要在其中建立表的資料集。(templated)

  • table_id (str) – 要建立的表的名稱。(templated)

  • table_resource (dict[str, Any] | None) – 文件中描述的表資源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,則所有其他引數將被忽略。(templated)

  • schema_fields (list | None) –

    如果設定,則為此處定義的 schema 欄位列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    示例:

    schema_fields = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    

  • gcs_schema_object (str | None) – 包含 schema 的 JSON 檔案的完整路徑 (templated)。例如:gs://test-bucket/dir1/dir2/employee_schema.json

  • time_partitioning (dict | None) –

    根據 API 規範配置可選的時間分割槽欄位,即按欄位、型別和過期時間進行分割槽。

  • gcp_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Bigquery 服務互動的連線 ID。

  • google_cloud_storage_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Google Cloud Storage 服務互動的連線 ID。

  • labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery

示例 (schema JSON 在 GCS 中):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id="BigQueryCreateEmptyTableOperator_task",
    dataset_id="ODS",
    table_id="Employees",
    project_id="internal-gcp-project",
    gcs_schema_object="gs://schema-bucket/employee_schema.json",
    gcp_conn_id="airflow-conn-id",
    google_cloud_storage_conn_id="airflow-conn-id",
)

對應的 Schema 檔案 (employee_schema.json)

[
    {"mode": "NULLABLE", "name": "emp_name", "type": "STRING"},
    {"mode": "REQUIRED", "name": "salary", "type": "INTEGER"},
]

示例 (schema 在 DAG 中):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id="BigQueryCreateEmptyTableOperator_task",
    dataset_id="ODS",
    table_id="Employees",
    project_id="internal-gcp-project",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
    gcp_conn_id="airflow-conn-id-account",
    google_cloud_storage_conn_id="airflow-conn-id",
)
引數:
  • view (dict | None) –

    (可選) 包含 view 定義的字典。如果設定,它將建立一個 view 而不是表。

  • materialized_view (dict | None) – (可選) 具化檢視 (materialized view) 定義。

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • location (str | None) – 用於操作的位置。

  • cluster_fields (list[str] | None) –

    (可選) 用於聚簇 (clustering) 的欄位。BigQuery 支援對分割槽表和非分割槽表進行聚簇。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

  • if_exists (str) – 如果表已存在,Airflow 應該怎麼做。如果設定為 log,則 TI 將成功,並記錄錯誤訊息。設定為 ignore 則忽略錯誤,設定為 fail 則使 TI 失敗,設定為 skip 則跳過。

  • exists_ok (bool | None) – 已棄用 - 請改用 if_exists=”ignore”

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'labels',...[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
project_id = None[source]
dataset_id[source]
table_id[source]
schema_fields = None[source]
gcs_schema_object = None[source]
gcp_conn_id = 'google_cloud_default'[source]
google_cloud_storage_conn_id = 'google_cloud_default'[source]
time_partitioning[source]
labels = None[source]
view = None[source]
materialized_view = None[source]
encryption_configuration : dict | None = None[source]
location = None[source]
cluster_fields = None[source]
table_resource = None[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

get_openlineage_facets_on_complete(_)[source]

實現 _on_complete,因為我們將使用 create 方法返回的表資源。

airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator(*, bucket=None, source_objects=None, destination_project_dataset_table=None, table_resource=None, schema_fields=None, schema_object=None, gcs_schema_bucket=None, source_format=None, autodetect=False, compression=None, skip_leading_rows=None, field_delimiter=None, max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', src_fmt_configs=None, labels=None, encryption_configuration=None, location=None, impersonation_chain=None, bigquery_conn_id=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

使用來自 Google Cloud Storage 的資料建立一個新的外部表。

要用於 BigQuery 表的 schema 可以透過兩種方式指定。您可以直接傳入 schema 欄位,或者將 operator 指向 Google Cloud Storage 物件名稱。Google Cloud Storage 中的物件必須是包含 schema 欄位的 JSON 檔案。

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:Create external table

引數:
  • bucket (str | None) – 外部表指向的儲存桶 (bucket)。(templated)

  • source_objects (list[str] | None) – 外部表指向的 Google Cloud Storage URI 列表。如果 source_format 是 ‘DATASTORE_BACKUP’,則列表必須只包含一個 URI。

  • destination_project_dataset_table (str | None) – 要載入資料到的以點分隔的 (<project>.)<dataset>.<table> BigQuery 表 (templated)。如果未包含 <project>,則專案將是連線 json 中定義的專案。

  • schema_fields (list | None) –

    如果設定,則為此處定義的 schema 欄位列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    示例:

    schema_fields = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    

    當 source_format 是 ‘DATASTORE_BACKUP’ 時不應設定此項。

  • table_resource (dict[str, Any] | None) – 文件中描述的表資源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,則所有其他引數將被忽略。來自物件的外部 schema 將被解析。

  • schema_object (str | None) – 如果設定,則為指向包含表 schema 的 .json 檔案的 GCS 物件路徑。(templated)

  • gcs_schema_bucket (str | None) – 儲存 schema JSON 的 GCS 儲存桶名稱 (templated)。預設值是 self.bucket。

  • source_format (str | None) – 資料的檔案格式。

  • autodetect (bool) – 嘗試自動檢測 schema 和格式選項。明確指定 schema_fields 和 schema_object 選項時,將優先使用這些選項。https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources

  • compression (str | None) – (可選) 資料來源的壓縮型別。可能的值包括 GZIP 和 NONE。預設值為 NONE。Google Cloud Bigtable、Google Cloud Datastore 備份和 Avro 格式會忽略此設定。

  • skip_leading_rows (int | None) – 從 CSV 載入時要跳過的行數。

  • field_delimiter (str | None) – CSV 中使用的分隔符。

  • max_bad_records (int) – BigQuery 在執行作業時可以忽略的最大錯誤記錄數。

  • quote_character (str | None) – 用於引用 CSV 檔案中資料部分的字元。

  • allow_quoted_newlines (bool) – 是否允許帶引號的換行符 (true) 或不允許 (false)。

  • allow_jagged_rows (bool) – 接受缺少尾部可選列的行。缺失的值被視為 null。如果為 false,則缺少尾部列的記錄被視為錯誤記錄,如果錯誤記錄過多,則在作業結果中返回無效錯誤。僅適用於 CSV,對其他格式無效。

  • gcp_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Bigquery 服務互動的連線 ID。

  • google_cloud_storage_conn_id (str) – (可選) 用於連線到 Google Cloud 並與 Google Cloud Storage 服務互動的連線 ID。

  • src_fmt_configs (dict | None) – 配置特定於源格式的可選欄位。

  • labels (dict | None) – 包含表標籤的字典,傳遞給 BigQuery

  • encryption_configuration (dict | None) –

    (可選)自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • location (str | None) – 用於操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'gcs_schema_bucket',...[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
table_resource = None[source]
bucket = ''[source]
source_objects = [][source]
schema_object = None[source]
gcs_schema_bucket = ''[source]
destination_project_dataset_table = ''[source]
max_bad_records = 0[source]
quote_character = None[source]
allow_quoted_newlines = False[source]
allow_jagged_rows = False[source]
gcp_conn_id = 'google_cloud_default'[source]
google_cloud_storage_conn_id = 'google_cloud_default'[source]
autodetect = False[source]
src_fmt_configs[source]
labels = None[source]
encryption_configuration = None[source]
location = None[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

get_openlineage_facets_on_complete(_)[source]

實現 _on_complete,因為我們將使用 create 方法返回的表資源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

從 BigQuery 中的專案刪除現有資料集。

https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:刪除資料集

引數:
  • project_id (字串) – 資料集的專案ID。

  • dataset_id (字串) – 要刪除的資料集。

  • delete_contents (布林值) – (可選)即使資料集不為空,是否強制刪除。如果設定為 True,將刪除資料集中的所有表(如果有)。如果設定為 False 且資料集不為空,將丟擲 HttpError 400:“{dataset_id} is still in use”。預設值為 False。

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

示例:

delete_temp_data = BigQueryDeleteDatasetOperator(
    dataset_id="temp-dataset",
    project_id="temp-project",
    delete_contents=True,  # Force the deletion of the dataset as well as its tables (if any).
    gcp_conn_id="_my_gcp_conn_",
    task_id="Deletetemp",
    dag=dag,
)
template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color = '#5F86FF'[source]
dataset_id[source]
project_id = None[source]
delete_contents = False[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator(*, dataset_id=None, project_id=PROVIDE_PROJECT_ID, dataset_reference=None, location=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', exists_ok=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 BigQuery 中的專案建立一個新資料集。

https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:建立資料集

引數:
  • project_id (字串) – 我們要建立資料集的專案名稱。

  • dataset_id (字串 | ) – 資料集ID。如果 dataset_reference 中包含 datasetId,則無需提供。

  • location (字串 | ) – 資料集應駐留的地理位置。

  • dataset_reference (字典 | ) – 可在請求正文中提供的資料集引用。更多資訊:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

  • if_exists (字串) –

    如果資料集存在,Airflow 應執行什麼操作。如果設定為 log,則 TI 將透過並記錄一條錯誤訊息。設定為 ignore 以忽略錯誤,設定為 fail 以使 TI 失敗,設定為 skip 以跳過。示例

    create_new_dataset = BigQueryCreateEmptyDatasetOperator(
        dataset_id='new-dataset',
        project_id='my-project',
        dataset_reference={"friendlyName": "New Dataset"}
        gcp_conn_id='_my_gcp_conn_',
        task_id='newDatasetCreator',
        dag=dag)
    

  • exists_ok (bool | None) – 已棄用 - 請改用 if_exists=”ignore”

template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'dataset_reference', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#5F86FF'[source]
dataset_id = None[source]
project_id = None[source]
location = None[source]
gcp_conn_id = 'google_cloud_default'[source]
dataset_reference[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

根據 ID 獲取指定的資料集。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:獲取資料集詳情

引數:
  • dataset_id (字串) – 資料集ID。如果 dataset_reference 中包含 datasetId,則無需提供。

  • project_id (字串) – 我們要建立資料集的專案名稱。如果 dataset_reference 中包含 projectId,則無需提供。

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color = '#5F86FF'[source]
dataset_id[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

檢索指定資料集中的表列表。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:列出資料集中的表

引數:
  • dataset_id (字串) – 請求的資料集的資料集ID。

  • project_id (字串) – (可選)請求的資料集所屬專案。如果為 None,將使用 self.project_id。

  • max_results (整數 | ) – (可選)返回的最大表數。

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.序列[字串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color = '#5F86FF'[source]
dataset_id[source]
project_id = None[source]
max_results = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableOperator(*, table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 BigQuery 中的專案更新一個表。

使用 fields 指定要更新表的哪些欄位。如果某個欄位列在 fields 中,並且在表中為 None,則該欄位將被刪除。

另請參閱

有關如何使用此運算元的更多資訊,請參閱指南:更新表

引數:
  • dataset_id (字串 | ) – 資料集ID。如果 table_reference 中包含 datasetId,則無需提供。

  • table_id (字串 | ) – 表ID。如果 table_reference 中包含 tableId,則無需提供。

  • table_resource (dict[str, Any]) – 將隨請求體提供的資料集資源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource

  • fields (list[str] | None) – 需要更改的 table 的欄位,以表屬性命名 (例如 “friendly_name”)。

  • project_id (str) – 我們要建立表的專案名稱。如果在 table_reference 中包含 projectId,則無需提供。

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
dataset_id = None[source]
table_id = None[source]
project_id = None[source]
fields = None[source]
gcp_conn_id = 'google_cloud_default'[source]
table_resource[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

get_openlineage_facets_on_complete(_)[source]

實現 _on_complete,因為我們將使用 update 方法返回的表資源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateDatasetOperator(*, dataset_resource, fields=None, dataset_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 BigQuery 中的專案更新一個數據集。

使用 fields 指定要更新的資料集欄位。如果欄位列在 fields 中且在資料集中為 None,則會將其刪除。如果沒有提供 fields,則將使用提供的 dataset_resource 的所有欄位。

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:更新資料集

引數:
  • dataset_id (字串 | ) – 資料集ID。如果 dataset_reference 中包含 datasetId,則無需提供。

  • dataset_resource (dict[str, Any]) – 將隨請求體提供的資料集資源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • fields (list[str] | None) – 要更改的資料集屬性 (例如 “friendly_name”)。

  • project_id (字串) – 我們要建立資料集的專案名稱。如果 dataset_reference 中包含 projectId,則無需提供。

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#5F86FF'[source]
dataset_id = None[source]
project_id = None[source]
fields = None[source]
gcp_conn_id = 'google_cloud_default'[source]
dataset_resource[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator(*, deletion_dataset_table, gcp_conn_id='google_cloud_default', ignore_if_missing=False, location=None, impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

刪除一個 BigQuery 表。

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:刪除表

引數:
  • deletion_dataset_table (str) – 指示要刪除的表,格式為點分隔的 (<project>.|<project>:)<dataset>.<table>。(模板化)

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • ignore_if_missing (bool) – 如果為 True,則即使請求的表不存在也返回成功。

  • location (str | None) – 用於操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.Sequence[str] = ('deletion_dataset_table', 'impersonation_chain')[source]
ui_color = '#81A0FF'[source]
deletion_dataset_table[source]
gcp_conn_id = 'google_cloud_default'[source]
ignore_if_missing = False[source]
location = None[source]
impersonation_chain = None[source]
hook: airflow.providers.google.cloud.hooks.bigquery.BigQueryHook | None = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

get_openlineage_facets_on_complete(_)[source]

實現 _on_complete,因為我們需要從 hook 獲取預設 project_id。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpsertTableOperator(*, dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', location=None, impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

對 BigQuery 表進行 upsert 操作。

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:Upsert 表

引數:
  • dataset_id (str) – 指示要更新的資料集,格式為點分隔的 (<project>.|<project>:)<dataset>。(模板化)

  • table_resource (dict) – 一個表資源。參見 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource

  • project_id (str) – 我們要更新資料集的專案名稱。如果在 dataset_reference 中包含 projectId,則無需提供。

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • location (str | None) – 用於操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_resource', 'impersonation_chain', 'project_id')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
dataset_id[source]
table_resource[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
location = None[source]
impersonation_chain = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

get_openlineage_facets_on_complete(_)[source]

實現 _on_complete,因為我們將使用 upsert 方法返回的表資源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableSchemaOperator(*, schema_fields_updates, dataset_id, table_id, include_policy_tags=False, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, location=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

更新 BigQuery 表模式。

根據提供的 schema_fields_updates 引數的內容更新表 schema 中的欄位。提供的 schema 不需要完整,如果欄位已存在於 schema 中,您只需提供要更新的專案的鍵和值,只需確保設定了“name”鍵。

另請參閱

有關如何使用此 operator 的更多資訊,請參閱指南:更新表 schema

引數:
  • schema_fields_updates (list[dict[str, Any]]) –

    部分模式資源。請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

    schema_fields_updates = [
        {"name": "emp_name", "description": "Some New Description"},
        {
            "name": "salary",
            "policyTags": {"names": ["some_new_policy_tag"]},
        },
        {
            "name": "departments",
            "fields": [
                {"name": "name", "description": "Some New Description"},
                {"name": "type", "description": "Some New Description"},
            ],
        },
    ]
    

  • include_policy_tags (bool) – (可選) 如果設定為 True,策略標籤將包含在更新請求中,這需要特殊許可權,即使標籤未更改(預設為 False)。請參閱 https://cloud.google.com/bigquery/docs/column-level-security#roles

  • dataset_id (str) – 指示要更新的資料集,格式為點分隔的 (<project>.|<project>:)<dataset>。(模板化)

  • table_id (str) – 所請求表的表 ID。(支援模板)

  • project_id (str) – 我們要更新資料集的專案名稱。如果在 dataset_reference 中包含 projectId,則無需提供。

  • gcp_conn_id (str) – (可選)用於連線到 Google Cloud 的連線 ID。

  • location (str | None) – 用於操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

template_fields: collections.abc.Sequence[str] = ('schema_fields_updates', 'dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
schema_fields_updates[source]
include_policy_tags = False[source]
table_id[source]
dataset_id[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
location = None[source]
execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

get_openlineage_facets_on_complete(_)[source]

實現 _on_complete,因為我們將使用 update 方法返回的表資源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator(configuration, project_id=PROVIDE_PROJECT_ID, location=None, job_id=None, force_rerun=True, reattach_states=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, cancel_on_kill=True, result_retry=DEFAULT_RETRY, result_timeout=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]

基類: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator, airflow.providers.google.cloud.openlineage.mixins._BigQueryInsertJobOperatorOpenLineageMixin

執行一個 BigQuery 作業。

等待作業完成並返回作業 ID。此 Operator 按以下方式工作:

  • 如果 force_rerun 為 True,則使用作業配置或 uuid 計算作業的唯一雜湊值。

  • 建立格式如下的 job_id

    [提供的_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{唯一性字尾}

  • 使用 job_id 提交 BigQuery 作業。

  • 如果具有給定 ID 的作業已存在,則嘗試重新連線到該作業,前提是該作業尚未完成且其

    狀態在 reattach_states 中。如果作業已完成,則 Operator 將引發 AirflowException

使用 force_rerun 將每次提交一個新作業,而不重新連線到已存在的作業。

作業定義請參閱此處:

另請參閱

有關如何使用此 Operator 的更多資訊,請參閱指南:執行 BigQuery 作業

引數:
  • configuration (dict[str, Any]) – 配置引數直接對映到作業物件中的 BigQuery 配置欄位。有關更多詳細資訊,請參閱 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration

  • job_id (str | None) – 作業的 ID。除非 force_rerun 為 True,否則將以作業配置的雜湊值作為字尾。ID 只能包含字母(a-z、A-Z)、數字(0-9)、下劃線 (_) 或破折號 (-)。最大長度為 1,024 個字元。如果未提供,將生成 uuid。

  • force_rerun (bool) – 如果為 True,則 Operator 將使用 uuid 的雜湊值作為作業 ID 字尾。

  • reattach_states (set[str] | None) – 在這些狀態下,我們應該重新連線到 BigQuery 作業。應該是非最終狀態。

  • project_id (str) – 作業執行所在的 Google Cloud 專案

  • location (str | None) – 作業執行的位置

  • gcp_conn_id (str) – 用於連線到 Google Cloud 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者一個賬戶鏈,需要透過該鏈獲取列表中最後一個賬戶的 `access_token`,該賬戶將在請求中被模擬。如果設定為字串,該賬戶必須授予起始賬戶 Service Account Token Creator IAM 角色。如果設定為序列,則列表中的身份必須將其 Service Account Token Creator IAM 角色授予直接前一個身份,列表中的第一個賬戶將此角色授予起始賬戶(模板化)。

  • cancel_on_kill (bool) – 在呼叫 on_kill 時,指示是否取消 hook 作業的標誌。

  • result_retry (google.api_core.retry.Retry) – 如何重試檢索行的 result 呼叫。

  • result_timeout (float | None) – 在使用 result_retry 之前等待 result 方法的秒數。

  • deferrable (bool) – 在 deferrable 模式下執行 Operator

  • poll_interval (float) – (僅在 deferrable 模式下) 檢查作業狀態的輪詢週期,單位為秒。預設為 4 秒。

template_fields: collections.abc.Sequence[str] = ('configuration', 'job_id', 'impersonation_chain', 'project_id')[source]
template_ext: collections.abc.Sequence[str] = ('.json', '.sql')[source]
template_fields_renderers[source]
ui_color = '#A1BBFF'[source]
configuration[source]
location = None[source]
job_id = None[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
force_rerun = True[source]
reattach_states: set[str][source]
impersonation_chain = None[source]
cancel_on_kill = True[source]
result_retry[source]
result_timeout = None[source]
hook: airflow.providers.google.cloud.hooks.bigquery.BigQueryHook | None = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
property sql: str | None[source]
prepare_template()[source]

在模板欄位被其內容替換後執行。

如果您的物件需要在模板渲染之前更改檔案內容,則應覆蓋此方法。

execute(context)[source]

建立運算子時派生。

Context 與渲染 jinja 模板時使用的字典相同。

有關更多上下文,請參閱 get_template_context。

execute_complete(context, event)[source]

在觸發器觸發時作為回撥函式執行。

此方法立即返回。它依賴於觸發器丟擲異常,否則假定執行成功。

on_kill()[source]

覆蓋此方法可在任務例項被 kill 時清理子程序。

在 Operator 中使用 threading、subprocess 或 multiprocessing 模組時,需要進行清理,否則會留下殭屍程序。

此條目是否有幫助?