airflow.providers.google.cloud.transfers.gcs_to_bigquery

此模組包含一個 Google Cloud Storage 到 BigQuery 的運算子。

屬性

ALLOWED_FORMATS

GCSToBigQueryOperator

將檔案從 Google Cloud Storage 載入到 BigQuery 中。

模組內容

airflow.providers.google.cloud.transfers.gcs_to_bigquery.ALLOWED_FORMATS = ['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO', 'GOOGLE_SHEETS', 'DATASTORE_BACKUP', 'PARQUET', 'ORC'][原始碼]
class airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator(*, bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, schema_object_bucket=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=None, write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, encoding='UTF-8', max_id_key=None, gcp_conn_id='google_cloud_default', schema_update_options=(), src_fmt_configs=None, external_table=False, time_partitioning=None, cluster_fields=None, autodetect=True, encryption_configuration=None, location=None, impersonation_chain=None, labels=None, description=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), result_retry=DEFAULT_RETRY, result_timeout=None, cancel_on_kill=True, job_id=None, force_rerun=True, reattach_states=None, project_id=PROVIDE_PROJECT_ID, force_delete=False, **kwargs)[原始碼]

Bases: airflow.models.BaseOperator

將檔案從 Google Cloud Storage 載入到 BigQuery 中。

用於 BigQuery 表的模式可以透過兩種方式指定。你可以直接傳入模式欄位,或者指向 Google Cloud Storage 中的物件名稱。Google Cloud Storage 中的物件必須是包含模式欄位的 JSON 檔案。

另請參閱

有關如何使用此運算子的更多資訊,請查閱指南:運算子

引數:
  • bucket – 要從中載入的儲存桶。(模板化)

  • source_objects – 要從中載入的 Google Cloud Storage URI 字串或列表。(模板化) 如果 source_format 是 ‘DATASTORE_BACKUP’,列表必須只包含一個 URI。

  • destination_project_dataset_table – 用於載入資料的帶有 (<project>.|<project>:)<dataset>.<table> 點號格式的 BigQuery 表。如果未包含 <project>,則專案將是連線 json 中定義的專案。(模板化)

  • schema_fields – 如果設定,則為此處定義的模式欄位列表:https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load 當 source_format 為 ‘DATASTORE_BACKUP’ 時不應設定此項。如果 ‘schema_object’ 為 null 且 autodetect 為 False,則必須定義此引數。

  • schema_object – 如果設定,則為指向包含表模式的 .json 檔案的 GCS 物件路徑。(模板化) 如果 ‘schema_fields’ 為 null 且 autodetect 為 False,則必須定義此引數。

  • schema_object_bucket – [可選] 如果設定,則為儲存模式物件模板的 GCS 儲存桶。(模板化) (預設值: bucket 的值)

  • source_format – 要匯出的檔案格式。

  • compression – [可選] 資料來源的壓縮型別。可能的值包括 GZIP 和 NONE。預設值為 NONE。對於 Google Cloud Bigtable、Google Cloud Datastore 備份和 Avro 格式,此設定將被忽略。

  • create_disposition – 如果表不存在時的建立處置。

  • skip_leading_rows – 載入資料時 BigQuery 將跳過的 CSV 檔案頂部的行數。當 autodetect 開啟時,行為如下:skip_leading_rows 未指定 - 自動檢測嘗試檢測第一行中的標題。如果未檢測到,則該行作為資料讀取。否則,資料將從第二行開始讀取。skip_leading_rows 為 0 - 指示自動檢測沒有標題,資料應從第一行開始讀取。skip_leading_rows = N > 0 - 自動檢測跳過 N-1 行,並嘗試在第 N 行檢測標題。如果未檢測到標題,則跳過第 N 行。否則,第 N 行用於提取檢測到的模式的列名。預設值設定為 None,以便 autodetect 選項可以檢測模式欄位。

  • write_disposition – 如果表已存在時的寫入處置。

  • field_delimiter – 從 CSV 載入時使用的分隔符。

  • max_bad_records – BigQuery 在執行作業時可以忽略的最大錯誤記錄數。

  • quote_character – 用於引用 CSV 檔案中資料部分的字元。

  • ignore_unknown_values – [可選] 指示 BigQuery 是否應允許表中模式未表示的額外值。如果為 true,則忽略額外值。如果為 false,則帶有額外列的記錄將被視為錯誤記錄,如果錯誤記錄過多,則在作業結果中返回無效錯誤。

  • allow_quoted_newlines – 是否允許帶引號的換行符 (true) 或不允許 (false)。

  • allow_jagged_rows – 接受缺少尾隨可選列的行。缺失值將被視為 null。如果為 false,則缺少尾隨列的記錄將被視為錯誤記錄,如果錯誤記錄過多,則在作業結果中返回無效錯誤。僅適用於 CSV,對於其他格式將被忽略。

  • encoding – 資料的字元編碼。參見:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).csvOptions.encoding https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding

  • max_id_key – 如果設定,則為要載入的 BigQuery 表中列的名稱。載入完成後,此名稱將用於從 BigQuery 中選擇 MAX 值。結果將由 execute() 命令返回,並存儲在 XCom 中供後續運算子使用。這對於增量載入很有幫助——在未來的執行中,你可以從最大 ID 開始。

  • schema_update_options – 允許將目標表的模式作為載入作業的副作用進行更新。

  • src_fmt_configs – 配置特定於源格式的可選欄位

  • external_table – 標誌,用於指定目標表是否應為 BigQuery 外部表。預設值為 False。

  • time_partitioning – 配置可選的時間分割槽欄位,即根據 API 規範按欄位、型別和過期時間進行分割槽。請注意,‘field’ 不能與 dataset.table$partition 同時使用。

  • cluster_fields – 請求將此載入的結果按一個或多個列排序儲存。BigQuery 支援對分割槽表和非分割槽表進行聚類。給定的列順序決定了排序順序。不適用於外部表。

  • autodetect – [可選] 指示是否應自動推斷 CSV 和 JSON 源的選項和模式。(預設值: True)。如果未定義 ‘schema_fields’ 和 ‘schema_object’,則必須將引數設定為 True。如果在 Airflow 外部建立表,建議將其設定為 True。如果 autodetect 為 None 且未提供模式(無論是透過 schema_fields 還是 schema_object),則假定表已存在。

  • encryption_configuration

    [可選] 自定義加密配置(例如,Cloud KMS 金鑰)。

    encryption_configuration = {
        "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
    }
    

  • location – [可選] 作業的地理位置。除美國和歐盟外,必填。詳細資訊請參見 https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者獲取列表中最後一個賬號的 access_token 所需的賬號鏈列表,該賬號將在請求中被模擬。如果設定為字串,則該賬號必須授予發起賬號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須授予 Service Account Token Creator IAM 角色給直接前一個身份,列表中第一個賬號將此角色授予發起賬號(模板化)。

  • labels – [可選] BigQuery 表的標籤。

  • description – [可選] BigQuery 表的描述。僅在新建目標表時使用。如果表已存在且提供了與當前描述不同的值,則作業將失敗。

  • deferrable (bool) – 以可延遲模式執行運算子

  • force_delete (bool) – 如果目標表已存在,則強制刪除它。

template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'schema_object_bucket',...[原始碼]
template_ext: collections.abc.Sequence[str] = ('.sql',)[原始碼]
ui_color = '#f0eee4'[原始碼]
hook: airflow.providers.google.cloud.hooks.bigquery.BigQueryHook | None = None[原始碼]
configuration: dict[str, Any][原始碼]
bucket[原始碼]
source_objects[原始碼]
schema_object = None[原始碼]
schema_object_bucket = None[原始碼]
destination_project_dataset_table[原始碼]
project_id = None[原始碼]
schema_fields = None[原始碼]
source_format = ''[原始碼]
compression = 'NONE'[原始碼]
create_disposition = 'CREATE_IF_NEEDED'[原始碼]
skip_leading_rows = None[原始碼]
write_disposition = 'WRITE_EMPTY'[原始碼]
field_delimiter = ','[原始碼]
max_bad_records = 0[原始碼]
quote_character = None[原始碼]
ignore_unknown_values = False[原始碼]
allow_quoted_newlines = False[原始碼]
allow_jagged_rows = False[原始碼]
external_table = False[原始碼]
encoding : str = 'UTF-8'[原始碼]
max_id_key = None[原始碼]
gcp_conn_id = 'google_cloud_default'[原始碼]
schema_update_options = ()[原始碼]
src_fmt_configs = None[原始碼]
time_partitioning = None[原始碼]
cluster_fields = None[原始碼]
autodetect : bool | None = True[原始碼]
encryption_configuration = None[source]
location = None[source]
impersonation_chain = None[source]
labels = None[source]
description = None[source]
job_id = None[source]
deferrable = True[source]
result_retry[source]
result_timeout = None[source]
force_rerun = True[source]
reattach_states: set[str][source]
cancel_on_kill = True[source]
force_delete = False[source]
source_uris: list[str] = [][source]
execute(context)[source]

在建立 Operator 時派生。

Context 是與渲染 Jinja 模板時使用的相同的字典。

請參閱 get_template_context 以獲取更多上下文資訊。

execute_complete(context, event)[source]

立即返回,並依賴 trigger 丟擲成功事件。trigger 的回撥函式。

依賴 trigger 丟擲異常,否則假定執行成功。

on_kill()[source]

重寫此方法,以便在 task instance 被終止時清理子程序。

Operator 內使用 threading、subprocess 或 multiprocessing 模組的任何地方都需要清理,否則會留下殭屍程序。

get_openlineage_facets_on_complete(task_instance)[source]

實現 on_complete,因為我們將包含最終的 BQ 作業 ID。

此條目有幫助嗎?