airflow.providers.google.cloud.hooks.dataflow

此模組包含一個 Google Dataflow Hook。

屬性

DEFAULT_DATAFLOW_LOCATION

JOB_ID_PATTERN

T

DataflowJobStatus

Dataflow 作業狀態的幫助類。

DataflowJobType

Dataflow 作業型別的幫助類。

DataflowHook

用於 Google Dataflow 的 Hook。

AsyncDataflowHook

Dataflow 服務的非同步 hook 類。

函式

process_line_and_extract_dataflow_job_id_callback(...)

構建觸發指定函式的回撥。

模組內容

airflow.providers.google.cloud.hooks.dataflow.DEFAULT_DATAFLOW_LOCATION = 'us-central1'[原始碼]
airflow.providers.google.cloud.hooks.dataflow.JOB_ID_PATTERN[原始碼]
airflow.providers.google.cloud.hooks.dataflow.T[原始碼]
airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[原始碼]

構建觸發指定函式的回撥。

返回的回撥函式旨在用作 BeamCommandRunner 中的 process_line_callback

引數:

on_new_job_id_callback (Callable[[str], None] | None) – 當作業 ID 已知時呼叫的回撥函式

class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[原始碼]

Dataflow 作業狀態的幫助類。

參考: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState

JOB_STATE_DONE = 'JOB_STATE_DONE'[原始碼]
JOB_STATE_UNKNOWN = 'JOB_STATE_UNKNOWN'[原始碼]
JOB_STATE_STOPPED = 'JOB_STATE_STOPPED'[原始碼]
JOB_STATE_RUNNING = 'JOB_STATE_RUNNING'[原始碼]
JOB_STATE_FAILED = 'JOB_STATE_FAILED'[原始碼]
JOB_STATE_CANCELLED = 'JOB_STATE_CANCELLED'[原始碼]
JOB_STATE_UPDATED = 'JOB_STATE_UPDATED'[原始碼]
JOB_STATE_DRAINING = 'JOB_STATE_DRAINING'[原始碼]
JOB_STATE_DRAINED = 'JOB_STATE_DRAINED'[原始碼]
JOB_STATE_PENDING = 'JOB_STATE_PENDING'[原始碼]
JOB_STATE_CANCELLING = 'JOB_STATE_CANCELLING'[原始碼]
JOB_STATE_QUEUED = 'JOB_STATE_QUEUED'[原始碼]
FAILED_END_STATES[原始碼]
SUCCEEDED_END_STATES[原始碼]
TERMINAL_STATES[原始碼]
AWAITING_STATES[原始碼]
class airflow.providers.google.cloud.hooks.dataflow.DataflowJobType[原始碼]

Dataflow 作業型別的幫助類。

JOB_TYPE_UNKNOWN = 'JOB_TYPE_UNKNOWN'[原始碼]
JOB_TYPE_BATCH = 'JOB_TYPE_BATCH'[原始碼]
JOB_TYPE_STREAMING = 'JOB_TYPE_STREAMING'[原始碼]
class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[原始碼]

基類: airflow.providers.google.common.hooks.base_google.GoogleBaseHook

用於 Google Dataflow 的 Hook。

此 Hook 中所有使用 project_id 的方法必須使用關鍵字引數而非位置引數呼叫。

poll_sleep = 10[原始碼]
drain_pipeline = False[原始碼]
cancel_timeout = 300[原始碼]
wait_until_finished = None[原始碼]
job_id: str | None = None[原始碼]
beam_hook[原始碼]
expected_terminal_state = None[原始碼]
get_conn()[原始碼]

返回一個 Google Cloud Dataflow 服務物件。

get_pipelines_conn()[原始碼]

返回一個 Google Cloud Data Pipelines 服務物件。

start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[原始碼]

使用經典模板啟動一個 Dataflow 作業並等待其完成。

引數:
  • job_name (str) – 作業的名稱。

  • variables (dict) –

    作業執行時環境選項的對映。如果傳遞了 environment 引數,它將更新。

    參見

    有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – 模板的引數

  • dataflow_template (str) – 模板的 GCS 路徑。

  • project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • append_job_name (bool) – 如果需要在作業名稱後附加唯一字尾,則為 True。

  • on_new_job_id_callback (Callable[[str], None] | None) – (已棄用)當作業已知時呼叫的回撥函式。

  • on_new_job_callback (Callable[[dict], None] | None) – 當作業已知時呼叫的回撥函式。

  • location (str) –

    作業位置。

    參見

    有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[原始碼]

使用經典模板啟動一個 Dataflow 作業,並退出而不等待其完成。

引數:
  • job_name (str) – 作業的名稱。

  • variables (dict) –

    作業執行時環境選項的對映。如果傳遞了 environment 引數,它將更新。

    參見

    有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • parameters (dict) – 模板的引數

  • dataflow_template (str) – 模板的 GCS 路徑。

  • project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • append_job_name (bool) – 如果需要在作業名稱後附加唯一字尾,則為 True。

  • location (str) –

    作業位置。

    參見

    有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

返回:

Dataflow 作業響應

返回型別:

dict[str, str]

send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[原始碼]
start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[原始碼]

使用 Flex 模板啟動一個 Dataflow 作業並等待其完成。

引數:
返回:

作業

返回型別:

dict[str, str]

launch_job_with_flex_template(body, location, project_id)[原始碼]

使用 Flex 模板啟動一個 Dataflow 作業,並退出而不等待作業完成。

引數:
返回:

Dataflow 作業響應

返回型別:

dict[str, str]

launch_beam_yaml_job(*, job_name, yaml_pipeline_file, append_job_name, jinja_variables, options, project_id, location=DEFAULT_DATAFLOW_LOCATION)[原始碼]

啟動一個 Dataflow YAML 作業並執行直到完成。

引數:
  • job_name (str) – 分配給 Cloud Dataflow 作業的唯一名稱。

  • yaml_pipeline_file (str) – 定義要執行的 YAML pipeline 的檔案路徑。必須是本地檔案或以 ‘gs://’ 開頭的 URL。

  • append_job_name (bool) – 如果必須在 job_name 後附加唯一字尾,則設定為 True。

  • jinja_variables (dict[str, str] | None) – 用於具體化 yaml pipeline 檔案的 Jinja2 變數字典。

  • options (dict[str, Any] | None) – 額外的 gcloud 或 Beam 作業引數。它必須是一個字典,其鍵與 gcloud 中的可選標誌名稱匹配。支援的標誌列表可以在以下連結找到:https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。請注意,如果某個標誌不需要值,則其字典值必須為 True 或 None。例如,–log-http 標誌可以作為 {‘log-http’: True} 傳遞。

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • location (str) – 作業區域端點的區域 ID。預設為 ‘us-central1’。

  • on_new_job_callback – 一旦知道作業後,將作業傳遞給 operator 的回撥函式。

返回:

作業 ID。

返回型別:

str

static extract_job_id(job)[原始碼]
static build_dataflow_job_name(job_name, append_job_name=True)[原始碼]

構建 Dataflow 作業名稱。

is_job_dataflow_running(name, project_id, location=None, variables=None)[原始碼]

檢查 Dataflow 作業是否仍在執行。

引數:
  • name (str) – 作業的名稱。

  • project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • location (str | None) – 作業位置。

返回:

如果作業正在執行,則為 True。

返回型別:

bool

cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[原始碼]

使用指定的名稱字首或作業 ID 取消作業。

namejob_id 引數是互斥的。

引數:
  • job_name (str | None) – 指定要取消哪些作業的名稱字首。

  • job_id (str | None) – 指定要取消哪些作業的作業 ID。

  • location (str) – 作業位置。

  • project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[source]

啟動 Dataflow SQL 查詢。

引數:
  • job_name (str) – 分配給 Cloud Dataflow 作業的唯一名稱。

  • query (str) – 要執行的 SQL 查詢。

  • options (dict[str, Any]) – 要執行的作業引數。更多資訊,請參考:https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令參考

  • location (str) – Dataflow 作業的位置(例如 europe-west1)

  • project_id (str) – 擁有該作業的 GCP 專案 ID。如果設定為 None 或缺失,則使用 GCP 連線中的預設 project_id。

  • on_new_job_id_callback (Callable[[str], None] | None) – (已棄用)當作業 ID 已知時呼叫的回撥函式。

  • on_new_job_callback (Callable[[dict], None] | None) – 當作業已知時呼叫的回撥函式。

返回:

新的作業物件

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[source]

獲取指定作業 ID 的作業。

引數:
返回:

作業

返回型別:

dict

fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

獲取指定作業 ID 的作業指標。

引數:
返回:

JobMetrics。請參考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics

返回型別:

dict

fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

獲取指定作業 ID 的作業訊息。

引數:
  • job_id (str) – 要獲取的作業 ID。

  • project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • location (str) – 作業位置。

返回:

JobMessages 列表。請參考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#JobMessage

返回型別:

list[dict]

fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

獲取指定作業 ID 的作業自動擴縮事件。

引數:
  • job_id (str) – 要獲取的作業 ID。

  • project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • location (str) – 作業位置。

返回:

AutoscalingEvents 列表。請參考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/ListJobMessagesResponse#autoscalingevent

返回型別:

list[dict]

wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[source]

等待 Dataflow 作業完成。

引數:
  • job_name (str) – 執行 DataFlow 作業時使用的“jobName”(模板化)。這將最終在管道選項中設定,因此 options 中任何鍵為 'jobName' 的條目都將被覆蓋。

  • location (str) – 作業執行的位置

  • project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • job_id (str | None) – Dataflow 作業 ID

  • multiple_jobs (bool) – 如果管道建立多個作業,則監控所有作業

is_job_done(location, project_id, job_id)[source]

檢查 Dataflow 作業是否已啟動(對於流處理作業)或已完成(對於批處理作業)。

引數:
  • location (str) – 作業執行的位置

  • project_id (str) – 要在其中啟動作業的 Google Cloud 專案 ID

  • job_id (str) – Dataflow 作業 ID

create_data_pipeline(body, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

建立一個新的 Dataflow Data Pipelines 例項。

引數:

以 JSON 格式返回建立的 Data Pipelines 例項。

get_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

檢索 Dataflow Data Pipelines 例項。

引數:
  • pipeline_name (str) – 管道的顯示名稱。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • location (str) – Data Pipelines 例項的目標位置(例如 us-central1)。

以 JSON 格式返回建立的 Data Pipelines 例項。

run_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

執行 Dataflow Data Pipeline 例項。

引數:
  • pipeline_name (str) – 管道的顯示名稱。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • location (str) – Data Pipelines 例項的目標位置(例如 us-central1)。

以 JSON 格式返回建立的作業。

delete_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]

刪除 Dataflow Data Pipelines 例項。

引數:
  • pipeline_name (str) – 管道的顯示名稱。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • location (str) – Data Pipelines 例項的目標位置(例如 us-central1)。

以 JSON 格式返回建立的作業。

static build_parent_name(project_id, location)[source]
class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]

基類:airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

Dataflow 服務的非同步 hook 類。

sync_hook_class[source]
async initialize_client(client_class)[source]

初始化給定類的物件。

此方法用於初始化非同步客戶端。由於 Dataflow 服務使用了大量類,因此決定以相同的方式初始化它們,並使用從 GoogleBaseHook 類方法接收到的憑據。 :param client_class: Google Cloud SDK 的類

async get_project_id()[source]
async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

獲取指定作業 ID 的作業。

引數:
  • job_id (str) – 要獲取的作業 ID。

  • project_id (str) – 要在其中啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • job_view (int) – 可選。JobView 物件,用於確定返回資料的表示形式

  • location (str) – 可選。Dataflow 作業的位置(例如 europe-west1)。請參考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]

獲取指定作業 ID 的作業狀態。

引數:
  • job_id (str) – 要獲取的作業 ID。

  • project_id (str) – 要在其中啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • job_view (int) – 可選。JobView 物件,用於確定返回資料的表示形式

  • location (str) – 可選。Dataflow 作業的位置(例如 europe-west1)。請參考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints

async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]

列出作業。

詳情請參考:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.ListJobsRequest

引數:
  • jobs_filter (int | None) – 可選。此欄位過濾並返回指定作業狀態的作業。

  • project_id (str | None) – 可選。要啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • location (str | None) – 可選。Dataflow 作業的位置(例如 europe-west1)。

  • page_size (int | None) – 可選。如果有很多作業,則最多將響應限制為此數量。

  • page_token (str | None) – 可選。將其設定為先前響應的“next_page_token”欄位,以請求長列表中的其他結果。

async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

從 MessagesV1Beta3AsyncClient 返回 ListJobMessagesAsyncPager 物件。

此方法封裝了 MessagesV1Beta3AsyncClient 的一個類似方法。ListJobMessagesAsyncPager 可以迭代以提取與特定作業 ID 關聯的訊息。

有關更多詳細資訊,請參閱 MessagesV1Beta3AsyncClient 方法說明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient

引數:
  • job_id (str) – 要獲取訊息的 Dataflow 作業 ID。

  • project_id (str | None) – 可選。要啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • minimum_importance (int) – 可選。過濾只獲取重要性 >= level 的訊息。有關更多詳細資訊,請參閱說明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance

  • page_size (int | None) – 可選。如果指定,則確定要返回的最大訊息數。如果未指定,服務可能會選擇適當的預設值,或返回任意大量結果。

  • page_token (str | None) – 可選。如果提供,這應該是先前呼叫返回的 next_page_token 的值。這將導致返回下一頁結果。

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可選。如果指定,則僅返回時間戳 >= start_time 的訊息。預設值為作業建立時間(即訊息的開始)。

  • end_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可選。如果指定,則僅返回時間戳 < end_time 的訊息。預設值為當前時間。

  • location (str | None) – 可選。包含由 job_id 指定的作業的[區域端點](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。

async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]

從 MetricsV1Beta3AsyncClient 返回 JobMetrics 物件。

此方法封裝了 MetricsV1Beta3AsyncClient 的一個類似方法。

有關更多詳細資訊,請參閱 MetricsV1Beta3AsyncClient 方法說明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient

引數:
  • job_id (str) – 要獲取指標的 Dataflow 作業 ID。

  • project_id (str | None) – 可選。要啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可選。僅返回自此時間以來發生變化的指標資料。預設是返回關於作業所有指標的所有資訊。

  • location (str | None) – 可選。包含由 job_id 指定的作業的[區域端點](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。

此條目有幫助嗎?