airflow.providers.google.cloud.hooks.dataflow¶
此模組包含一個 Google Dataflow Hook。
屬性¶
類¶
Dataflow 作業狀態的幫助類。 |
|
Dataflow 作業型別的幫助類。 |
|
用於 Google Dataflow 的 Hook。 |
|
Dataflow 服務的非同步 hook 類。 |
函式¶
構建觸發指定函式的回撥。 |
模組內容¶
- airflow.providers.google.cloud.hooks.dataflow.process_line_and_extract_dataflow_job_id_callback(on_new_job_id_callback)[原始碼]¶
構建觸發指定函式的回撥。
返回的回撥函式旨在用作
BeamCommandRunner中的process_line_callback。- 引數:
on_new_job_id_callback (Callable[[str], None] | None) – 當作業 ID 已知時呼叫的回撥函式
- class airflow.providers.google.cloud.hooks.dataflow.DataflowJobStatus[原始碼]¶
Dataflow 作業狀態的幫助類。
參考: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
- class airflow.providers.google.cloud.hooks.dataflow.DataflowHook(gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, expected_terminal_state=None, **kwargs)[原始碼]¶
基類:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook用於 Google Dataflow 的 Hook。
此 Hook 中所有使用 project_id 的方法必須使用關鍵字引數而非位置引數呼叫。
- start_template_dataflow(job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, on_new_job_id_callback=None, on_new_job_callback=None, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[原始碼]¶
使用經典模板啟動一個 Dataflow 作業並等待其完成。
- 引數:
job_name (str) – 作業的名稱。
variables (dict) –
作業執行時環境選項的對映。如果傳遞了 environment 引數,它將更新。
參見
有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 模板的引數
dataflow_template (str) – 模板的 GCS 路徑。
project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
append_job_name (bool) – 如果需要在作業名稱後附加唯一字尾,則為 True。
on_new_job_id_callback (Callable[[str], None] | None) – (已棄用)當作業已知時呼叫的回撥函式。
on_new_job_callback (Callable[[dict], None] | None) – 當作業已知時呼叫的回撥函式。
location (str) –
作業位置。
參見
有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- launch_job_with_template(*, job_name, variables, parameters, dataflow_template, project_id, append_job_name=True, location=DEFAULT_DATAFLOW_LOCATION, environment=None)[原始碼]¶
使用經典模板啟動一個 Dataflow 作業,並退出而不等待其完成。
- 引數:
job_name (str) – 作業的名稱。
variables (dict) –
作業執行時環境選項的對映。如果傳遞了 environment 引數,它將更新。
參見
有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
parameters (dict) – 模板的引數
dataflow_template (str) – 模板的 GCS 路徑。
project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
append_job_name (bool) – 如果需要在作業名稱後附加唯一字尾,則為 True。
location (str) –
作業位置。
參見
有關可能配置的更多資訊,請查閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params
- 返回:
Dataflow 作業響應
- 返回型別:
- send_launch_template_request(*, project_id, location, gcs_path, job_name, parameters, environment)[原始碼]¶
- start_flex_template(body, location, project_id, on_new_job_id_callback=None, on_new_job_callback=None)[原始碼]¶
使用 Flex 模板啟動一個 Dataflow 作業並等待其完成。
- 引數:
body (dict) – 請求體。參見: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 作業的位置(例如 europe-west1)
project_id (str) – 擁有該作業的 GCP 專案 ID。如果設定為
None或缺失,則使用 GCP 連線中的預設 project_id。on_new_job_id_callback (Callable[[str], None] | None) – (已棄用)檢測到作業 ID 時呼叫的回撥函式。
on_new_job_callback (Callable[[dict], None] | None) – 檢測到作業時呼叫的回撥函式。
- 返回:
作業
- 返回型別:
- launch_job_with_flex_template(body, location, project_id)[原始碼]¶
使用 Flex 模板啟動一個 Dataflow 作業,並退出而不等待作業完成。
- 引數:
body (dict) – 請求體。參見: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
location (str) – Dataflow 作業的位置(例如 europe-west1)
project_id (str) – 擁有該作業的 GCP 專案 ID。如果設定為
None或缺失,則使用 GCP 連線中的預設 project_id。
- 返回:
Dataflow 作業響應
- 返回型別:
- launch_beam_yaml_job(*, job_name, yaml_pipeline_file, append_job_name, jinja_variables, options, project_id, location=DEFAULT_DATAFLOW_LOCATION)[原始碼]¶
啟動一個 Dataflow YAML 作業並執行直到完成。
- 引數:
job_name (str) – 分配給 Cloud Dataflow 作業的唯一名稱。
yaml_pipeline_file (str) – 定義要執行的 YAML pipeline 的檔案路徑。必須是本地檔案或以 ‘gs://’ 開頭的 URL。
append_job_name (bool) – 如果必須在 job_name 後附加唯一字尾,則設定為 True。
jinja_variables (dict[str, str] | None) – 用於具體化 yaml pipeline 檔案的 Jinja2 變數字典。
options (dict[str, Any] | None) – 額外的 gcloud 或 Beam 作業引數。它必須是一個字典,其鍵與 gcloud 中的可選標誌名稱匹配。支援的標誌列表可以在以下連結找到:https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。請注意,如果某個標誌不需要值,則其字典值必須為 True 或 None。例如,–log-http 標誌可以作為 {‘log-http’: True} 傳遞。
project_id (str) – 擁有該作業的 GCP 專案 ID。
location (str) – 作業區域端點的區域 ID。預設為 ‘us-central1’。
on_new_job_callback – 一旦知道作業後,將作業傳遞給 operator 的回撥函式。
- 返回:
作業 ID。
- 返回型別:
- is_job_dataflow_running(name, project_id, location=None, variables=None)[原始碼]¶
檢查 Dataflow 作業是否仍在執行。
- cancel_job(project_id, job_name=None, job_id=None, location=DEFAULT_DATAFLOW_LOCATION)[原始碼]¶
使用指定的名稱字首或作業 ID 取消作業。
name和job_id引數是互斥的。
- start_sql_job(job_name, query, options, project_id, location=DEFAULT_DATAFLOW_LOCATION, on_new_job_id_callback=None, on_new_job_callback=None)[source]¶
啟動 Dataflow SQL 查詢。
- 引數:
job_name (str) – 分配給 Cloud Dataflow 作業的唯一名稱。
query (str) – 要執行的 SQL 查詢。
options (dict[str, Any]) – 要執行的作業引數。更多資訊,請參考:https://cloud.google.com/sdk/gcloud/reference/beta/dataflow/sql/query 命令參考
location (str) – Dataflow 作業的位置(例如 europe-west1)
project_id (str) – 擁有該作業的 GCP 專案 ID。如果設定為
None或缺失,則使用 GCP 連線中的預設 project_id。on_new_job_id_callback (Callable[[str], None] | None) – (已棄用)當作業 ID 已知時呼叫的回撥函式。
on_new_job_callback (Callable[[dict], None] | None) – 當作業已知時呼叫的回撥函式。
- 返回:
新的作業物件
- get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
獲取指定作業 ID 的作業。
- 引數:
job_id (str) – 要獲取的作業 ID。
project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
location (str) – Dataflow 作業的位置(例如 europe-west1)。請參考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 返回:
作業
- 返回型別:
- fetch_job_metrics_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
獲取指定作業 ID 的作業指標。
- 引數:
job_id (str) – 要獲取的作業 ID。
project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
location (str) – Dataflow 作業的位置(例如 europe-west1)。請參考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- 返回:
JobMetrics。請參考:https://cloud.google.com/dataflow/docs/reference/rest/v1b3/JobMetrics
- 返回型別:
- fetch_job_messages_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
獲取指定作業 ID 的作業訊息。
- fetch_job_autoscaling_events_by_id(job_id, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
獲取指定作業 ID 的作業自動擴縮事件。
- wait_for_done(job_name, location, project_id, job_id=None, multiple_jobs=False)[source]¶
等待 Dataflow 作業完成。
- 引數:
job_name (str) – 執行 DataFlow 作業時使用的“jobName”(模板化)。這將最終在管道選項中設定,因此
options中任何鍵為'jobName'的條目都將被覆蓋。location (str) – 作業執行的位置
project_id (str) – 可選,用於啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
job_id (str | None) – Dataflow 作業 ID
multiple_jobs (bool) – 如果管道建立多個作業,則監控所有作業
- create_data_pipeline(body, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
建立一個新的 Dataflow Data Pipelines 例項。
- 引數:
body (dict) – 請求體(包含 Pipeline 例項)。請參考:https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body
project_id (str) – 擁有該作業的 GCP 專案 ID。
location (str) – Data Pipelines 例項的目標位置(例如 us-central1)。
以 JSON 格式返回建立的 Data Pipelines 例項。
- get_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
檢索 Dataflow Data Pipelines 例項。
- 引數:
以 JSON 格式返回建立的 Data Pipelines 例項。
- run_data_pipeline(pipeline_name, project_id, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
執行 Dataflow Data Pipeline 例項。
- 引數:
以 JSON 格式返回建立的作業。
- class airflow.providers.google.cloud.hooks.dataflow.AsyncDataflowHook(**kwargs)[source]¶
基類:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHookDataflow 服務的非同步 hook 類。
- async initialize_client(client_class)[source]¶
初始化給定類的物件。
此方法用於初始化非同步客戶端。由於 Dataflow 服務使用了大量類,因此決定以相同的方式初始化它們,並使用從 GoogleBaseHook 類方法接收到的憑據。 :param client_class: Google Cloud SDK 的類
- async get_job(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
獲取指定作業 ID 的作業。
- 引數:
job_id (str) – 要獲取的作業 ID。
project_id (str) – 要在其中啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
job_view (int) – 可選。JobView 物件,用於確定返回資料的表示形式
location (str) – 可選。Dataflow 作業的位置(例如 europe-west1)。請參考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, job_view=JobView.JOB_VIEW_SUMMARY, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
獲取指定作業 ID 的作業狀態。
- 引數:
job_id (str) – 要獲取的作業 ID。
project_id (str) – 要在其中啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
job_view (int) – 可選。JobView 物件,用於確定返回資料的表示形式
location (str) – 可選。Dataflow 作業的位置(例如 europe-west1)。請參考:https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
- async list_jobs(jobs_filter=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, page_size=None, page_token=None)[source]¶
列出作業。
- 引數:
jobs_filter (int | None) – 可選。此欄位過濾並返回指定作業狀態的作業。
project_id (str | None) – 可選。要啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
location (str | None) – 可選。Dataflow 作業的位置(例如 europe-west1)。
page_size (int | None) – 可選。如果有很多作業,則最多將響應限制為此數量。
page_token (str | None) – 可選。將其設定為先前響應的“next_page_token”欄位,以請求長列表中的其他結果。
- async list_job_messages(job_id, project_id=PROVIDE_PROJECT_ID, minimum_importance=JobMessageImportance.JOB_MESSAGE_BASIC, page_size=None, page_token=None, start_time=None, end_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
從 MessagesV1Beta3AsyncClient 返回 ListJobMessagesAsyncPager 物件。
此方法封裝了 MessagesV1Beta3AsyncClient 的一個類似方法。ListJobMessagesAsyncPager 可以迭代以提取與特定作業 ID 關聯的訊息。
有關更多詳細資訊,請參閱 MessagesV1Beta3AsyncClient 方法說明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.messages_v1_beta3.MessagesV1Beta3AsyncClient
- 引數:
job_id (str) – 要獲取訊息的 Dataflow 作業 ID。
project_id (str | None) – 可選。要啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
minimum_importance (int) – 可選。過濾只獲取重要性 >= level 的訊息。有關更多詳細資訊,請參閱說明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.types.JobMessageImportance
page_size (int | None) – 可選。如果指定,則確定要返回的最大訊息數。如果未指定,服務可能會選擇適當的預設值,或返回任意大量結果。
page_token (str | None) – 可選。如果提供,這應該是先前呼叫返回的 next_page_token 的值。這將導致返回下一頁結果。
start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可選。如果指定,則僅返回時間戳 >= start_time 的訊息。預設值為作業建立時間(即訊息的開始)。
end_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可選。如果指定,則僅返回時間戳 < end_time 的訊息。預設值為當前時間。
location (str | None) – 可選。包含由 job_id 指定的作業的[區域端點](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。
- async get_job_metrics(job_id, project_id=PROVIDE_PROJECT_ID, start_time=None, location=DEFAULT_DATAFLOW_LOCATION)[source]¶
從 MetricsV1Beta3AsyncClient 返回 JobMetrics 物件。
此方法封裝了 MetricsV1Beta3AsyncClient 的一個類似方法。
有關更多詳細資訊,請參閱 MetricsV1Beta3AsyncClient 方法說明:https://cloud.google.com/python/docs/reference/dataflow/latest/google.cloud.dataflow_v1beta3.services.metrics_v1_beta3.MetricsV1Beta3AsyncClient
- 引數:
job_id (str) – 要獲取指標的 Dataflow 作業 ID。
project_id (str | None) – 可選。要啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。
start_time (google.protobuf.timestamp_pb2.Timestamp | None) – 可選。僅返回自此時間以來發生變化的指標資料。預設是返回關於作業所有指標的所有資訊。
location (str | None) – 可選。包含由 job_id 指定的作業的[區域端點](https://cloud.google.com/dataflow/docs/concepts/regional-endpoints)。