airflow.providers.google.cloud.transfers.gcs_to_bigquery¶
此模組包含一個 Google Cloud Storage 到 BigQuery 的運算子。
屬性¶
類¶
將檔案從 Google Cloud Storage 載入到 BigQuery 中。 |
模組內容¶
- airflow.providers.google.cloud.transfers.gcs_to_bigquery.ALLOWED_FORMATS = ['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO', 'GOOGLE_SHEETS', 'DATASTORE_BACKUP', 'PARQUET', 'ORC'][原始碼]¶
- class airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator(*, bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, schema_object_bucket=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=None, write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, encoding='UTF-8', max_id_key=None, gcp_conn_id='google_cloud_default', schema_update_options=(), src_fmt_configs=None, external_table=False, time_partitioning=None, cluster_fields=None, autodetect=True, encryption_configuration=None, location=None, impersonation_chain=None, labels=None, description=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), result_retry=DEFAULT_RETRY, result_timeout=None, cancel_on_kill=True, job_id=None, force_rerun=True, reattach_states=None, project_id=PROVIDE_PROJECT_ID, force_delete=False, **kwargs)[原始碼]¶
Bases:
airflow.models.BaseOperator將檔案從 Google Cloud Storage 載入到 BigQuery 中。
用於 BigQuery 表的模式可以透過兩種方式指定。你可以直接傳入模式欄位,或者指向 Google Cloud Storage 中的物件名稱。Google Cloud Storage 中的物件必須是包含模式欄位的 JSON 檔案。
另請參閱
有關如何使用此運算子的更多資訊,請查閱指南:運算子
- 引數:
bucket – 要從中載入的儲存桶。(模板化)
source_objects – 要從中載入的 Google Cloud Storage URI 字串或列表。(模板化) 如果 source_format 是 ‘DATASTORE_BACKUP’,列表必須只包含一個 URI。
destination_project_dataset_table – 用於載入資料的帶有
(<project>.|<project>:)<dataset>.<table>點號格式的 BigQuery 表。如果未包含<project>,則專案將是連線 json 中定義的專案。(模板化)schema_fields – 如果設定,則為此處定義的模式欄位列表:https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load 當 source_format 為 ‘DATASTORE_BACKUP’ 時不應設定此項。如果 ‘schema_object’ 為 null 且 autodetect 為 False,則必須定義此引數。
schema_object – 如果設定,則為指向包含表模式的 .json 檔案的 GCS 物件路徑。(模板化) 如果 ‘schema_fields’ 為 null 且 autodetect 為 False,則必須定義此引數。
schema_object_bucket – [可選] 如果設定,則為儲存模式物件模板的 GCS 儲存桶。(模板化) (預設值:
bucket的值)source_format – 要匯出的檔案格式。
compression – [可選] 資料來源的壓縮型別。可能的值包括 GZIP 和 NONE。預設值為 NONE。對於 Google Cloud Bigtable、Google Cloud Datastore 備份和 Avro 格式,此設定將被忽略。
create_disposition – 如果表不存在時的建立處置。
skip_leading_rows – 載入資料時 BigQuery 將跳過的 CSV 檔案頂部的行數。當 autodetect 開啟時,行為如下:skip_leading_rows 未指定 - 自動檢測嘗試檢測第一行中的標題。如果未檢測到,則該行作為資料讀取。否則,資料將從第二行開始讀取。skip_leading_rows 為 0 - 指示自動檢測沒有標題,資料應從第一行開始讀取。skip_leading_rows = N > 0 - 自動檢測跳過 N-1 行,並嘗試在第 N 行檢測標題。如果未檢測到標題,則跳過第 N 行。否則,第 N 行用於提取檢測到的模式的列名。預設值設定為 None,以便 autodetect 選項可以檢測模式欄位。
write_disposition – 如果表已存在時的寫入處置。
field_delimiter – 從 CSV 載入時使用的分隔符。
max_bad_records – BigQuery 在執行作業時可以忽略的最大錯誤記錄數。
quote_character – 用於引用 CSV 檔案中資料部分的字元。
ignore_unknown_values – [可選] 指示 BigQuery 是否應允許表中模式未表示的額外值。如果為 true,則忽略額外值。如果為 false,則帶有額外列的記錄將被視為錯誤記錄,如果錯誤記錄過多,則在作業結果中返回無效錯誤。
allow_quoted_newlines – 是否允許帶引號的換行符 (true) 或不允許 (false)。
allow_jagged_rows – 接受缺少尾隨可選列的行。缺失值將被視為 null。如果為 false,則缺少尾隨列的記錄將被視為錯誤記錄,如果錯誤記錄過多,則在作業結果中返回無效錯誤。僅適用於 CSV,對於其他格式將被忽略。
encoding – 資料的字元編碼。參見:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).csvOptions.encoding https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding
max_id_key – 如果設定,則為要載入的 BigQuery 表中列的名稱。載入完成後,此名稱將用於從 BigQuery 中選擇 MAX 值。結果將由 execute() 命令返回,並存儲在 XCom 中供後續運算子使用。這對於增量載入很有幫助——在未來的執行中,你可以從最大 ID 開始。
schema_update_options – 允許將目標表的模式作為載入作業的副作用進行更新。
src_fmt_configs – 配置特定於源格式的可選欄位
external_table – 標誌,用於指定目標表是否應為 BigQuery 外部表。預設值為 False。
time_partitioning – 配置可選的時間分割槽欄位,即根據 API 規範按欄位、型別和過期時間進行分割槽。請注意,‘field’ 不能與 dataset.table$partition 同時使用。
cluster_fields – 請求將此載入的結果按一個或多個列排序儲存。BigQuery 支援對分割槽表和非分割槽表進行聚類。給定的列順序決定了排序順序。不適用於外部表。
autodetect – [可選] 指示是否應自動推斷 CSV 和 JSON 源的選項和模式。(預設值:
True)。如果未定義 ‘schema_fields’ 和 ‘schema_object’,則必須將引數設定為 True。如果在 Airflow 外部建立表,建議將其設定為 True。如果 autodetect 為 None 且未提供模式(無論是透過 schema_fields 還是 schema_object),則假定表已存在。encryption_configuration –
[可選] 自定義加密配置(例如,Cloud KMS 金鑰)。
encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key", }
location – [可選] 作業的地理位置。除美國和歐盟外,必填。詳細資訊請參見 https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務賬號,用於使用短期憑據進行模擬,或者獲取列表中最後一個賬號的 access_token 所需的賬號鏈列表,該賬號將在請求中被模擬。如果設定為字串,則該賬號必須授予發起賬號 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須授予 Service Account Token Creator IAM 角色給直接前一個身份,列表中第一個賬號將此角色授予發起賬號(模板化)。
labels – [可選] BigQuery 表的標籤。
description – [可選] BigQuery 表的描述。僅在新建目標表時使用。如果表已存在且提供了與當前描述不同的值,則作業將失敗。
deferrable (bool) – 以可延遲模式執行運算子
force_delete (bool) – 如果目標表已存在,則強制刪除它。
- template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'schema_object_bucket',...[原始碼]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[原始碼]¶
- execute(context)[source]¶
在建立 Operator 時派生。
Context 是與渲染 Jinja 模板時使用的相同的字典。
請參閱 get_template_context 以獲取更多上下文資訊。
- execute_complete(context, event)[source]¶
立即返回,並依賴 trigger 丟擲成功事件。trigger 的回撥函式。
依賴 trigger 丟擲異常,否則假定執行成功。