airflow.providers.google.cloud.operators.dataflow

此模組包含 Google Dataflow 操作器。

CheckJobRunning

用於選擇如果作業已在執行時的操作的輔助 enum。

DataflowConfiguration

用於 BeamRunJavaPipelineOperator 和 BeamRunPythonPipelineOperator 的 Dataflow 配置。

DataflowTemplatedJobStartOperator

使用經典模板啟動 Dataflow 作業;操作的引數將傳遞給作業。

DataflowStartFlexTemplateOperator

使用 Flex 模板啟動 Dataflow 作業。

DataflowStartYamlJobOperator

啟動 Dataflow YAML 作業並返回結果。

DataflowStopJobOperator

停止具有指定名稱字首或作業 ID 的作業。

DataflowCreatePipelineOperator

建立新的 Dataflow 資料管道例項。

DataflowRunPipelineOperator

執行 Dataflow 資料管道。

DataflowDeletePipelineOperator

刪除 Dataflow 資料管道。

模組內容

class airflow.providers.google.cloud.operators.dataflow.CheckJobRunning[原始碼]

基類:enum.Enum

用於選擇如果作業已在執行時的操作的輔助 enum。

IgnoreJob - 不檢查是否正在執行 FinishIfRunning - 如果正在執行則完成當前的 dag 執行而不執行任何操作 WaitForRun - 等待作業完成,然後繼續執行新作業

IgnoreJob = 1[原始碼]
FinishIfRunning = 2[原始碼]
WaitForRun = 3[原始碼]
class airflow.providers.google.cloud.operators.dataflow.DataflowConfiguration(*, job_name=None, append_job_name=True, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, drain_pipeline=False, cancel_timeout=5 * 60, wait_until_finished=None, multiple_jobs=None, check_if_running=CheckJobRunning.WaitForRun, service_account=None)[原始碼]

用於 BeamRunJavaPipelineOperator 和 BeamRunPythonPipelineOperator 的 Dataflow 配置。

引數
  • job_name (str | None) – 執行 Dataflow 作業時使用的“jobName”(模板化)。這最終會設定在管道選項中,因此 `options` 中鍵為 'jobName''job_name' 的任何條目都將被覆蓋。

  • append_job_name (bool) – 如果需要在作業名稱後附加唯一字尾,則為 True。

  • project_id (str) – 可選,要在其中啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • location (str | None) – 作業位置。

  • gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。

  • poll_sleep (int) – 當作業處於 JOB_STATE_RUNNING 狀態時,輪詢 Google Cloud Platform 獲取 dataflow 作業狀態之間的睡眠時間(以秒為單位)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) –

    可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

    警告

    此選項需要 Apache Beam 2.39.0 或更新版本。

  • drain_pipeline (bool) – 可選,如果想在殺死任務例項期間透過耗盡管道而不是取消來停止流式作業,請設定為 True。參閱:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • cancel_timeout (int | None) – 在任務被殺死時,操作器應等待管道成功取消的時間(以秒為單位)。(可選)預設為 300 秒

  • wait_until_finished (bool | None) –

    (可選)如果為 True,則在退出前等待管道執行結束。如果為 False,則僅提交作業。如果為 None,則採用預設行為。

    預設行為取決於管道型別

    • 對於流式管道,等待作業啟動,

    • 對於批處理管道,等待作業完成。

    警告

    您不能在管道程式碼中呼叫 PipelineResult.wait_until_finish 方法以使操作器正常工作。即,您必須使用非同步執行。否則,您的管道將始終等待直到完成。有關更多資訊,請參閱:非同步執行

    在 Airflow 中啟動 Dataflow 作業的過程包括兩個步驟:* 執行子程序並讀取 stderr/stderr 日誌以獲取作業 ID。* 迴圈等待上一步中的作業 ID 結束,透過檢查其狀態。

    步驟二在步驟一完成後立即開始,因此如果您的管道程式碼中有 wait_until_finished,步驟二直到程序停止才會開始。當此程序停止時,步驟二將執行,但由於作業將處於終端狀態,它只會執行一次迭代。

    如果您在管道中不呼叫 wait_for_pipeline 方法但將 wait_until_finish=True 傳遞給操作器,則第二個迴圈將等待作業的終端狀態。

    如果您在管道中不呼叫 wait_for_pipeline 方法,並且將 wait_until_finish=False 傳遞給操作器,則第二個迴圈將檢查一次作業是否未處於終端狀態並退出迴圈。

  • multiple_jobs (bool | None) – 如果管道建立多個作業,則監控所有作業。僅由 BeamRunJavaPipelineOperator 支援。

  • check_if_running (CheckJobRunning) – 在執行作業之前,驗證之前的執行是否未在進行中。僅由以下操作器支援:BeamRunJavaPipelineOperator

  • service_account (str | None) – 以特定服務帳號執行作業,而不是預設的 GCE 機器人。

template_fields: collections.abc.Sequence[
job_name = None[原始碼]
append_job_name = True[原始碼]
project_id = None[原始碼]
location = 'us-central1'[原始碼]
gcp_conn_id = 'google_cloud_default'[原始碼]
poll_sleep = 10[原始碼]
impersonation_chain = None[原始碼]
drain_pipeline = False[原始碼]
cancel_timeout = 300[原始碼]
wait_until_finished = None[原始碼]
multiple_jobs = None[原始碼]
check_if_running[原始碼]
service_account = None[原始碼]
class airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator(*, template, project_id=PROVIDE_PROJECT_ID, job_name='{{task.task_id}}', options=None, dataflow_default_options=None, parameters=None, location=None, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, environment=None, cancel_timeout=10 * 60, wait_until_finished=None, append_job_name=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), expected_terminal_state=None, **kwargs)[原始碼]

基類:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

使用經典模板啟動 Dataflow 作業;操作的引數將傳遞給作業。

參閱

有關如何使用此操作器的更多資訊,請參閱指南:模板化作業

引數
  • template (str) – Dataflow 模板的引用。

  • job_name (str) – 執行 Dataflow 模板時使用的“jobName”(模板化)。

  • options (dict[str, Any] | None) –

    作業執行時環境選項的對映。如果傳遞,它將更新環境引數。

    參閱

    有關可能配置的更多資訊,請參閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • dataflow_default_options (dict[str, Any] | None) – 預設作業環境選項的對映。

  • parameters (dict[str, str] | None) – 模板的作業特定引數對映。

  • project_id (str) – 可選,要在其中啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • location (str | None) – 作業位置。

  • gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。

  • poll_sleep (int) – 當作業處於 JOB_STATE_RUNNING 狀態時,輪詢 Google Cloud Platform 獲取 dataflow 作業狀態之間的睡眠時間(以秒為單位)。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

  • environment (dict | None) –

    可選,作業執行時環境選項的對映。

    參閱

    有關可能配置的更多資訊,請參閱 API 文件 https://cloud.google.com/dataflow/pipelines/specifying-exec-params

  • cancel_timeout (int | None) – 在任務被殺死時,操作器應等待管道成功取消的時間(以秒為單位)。

  • append_job_name (bool) – 如果需要在作業名稱後附加唯一字尾,則為 True。

  • wait_until_finished (bool | None) –

    (可選)如果為 True,則在退出前等待管道執行結束。如果為 False,則僅提交作業。如果為 None,則採用預設行為。

    預設行為取決於管道型別

    • 對於流式管道,等待作業啟動,

    • 對於批處理管道,等待作業完成。

    警告

    您不能在管道程式碼中呼叫 PipelineResult.wait_until_finish 方法以使操作器正常工作。即,您必須使用非同步執行。否則,您的管道將始終等待直到完成。有關更多資訊,請參閱:非同步執行

    在 Airflow 中啟動 Dataflow 作業的過程包括兩個步驟

    • 執行子程序並讀取 stderr/stderr 日誌以獲取作業 ID。

    • 迴圈等待上一步中的作業 ID 結束。此迴圈檢查作業的狀態。

    步驟二在步驟一完成後立即開始,因此如果您的管道程式碼中有 wait_until_finished,步驟二直到程序停止才會開始。當此程序停止時,步驟二將執行,但由於作業將處於終端狀態,它只會執行一次迭代。

    如果您在管道中不呼叫 wait_for_pipeline 方法但將 wait_until_finish=True 傳遞給操作器,則第二個迴圈將等待作業的終端狀態。

    如果您在管道中不呼叫 wait_for_pipeline 方法,並且將 wait_until_finish=False 傳遞給操作器,則第二個迴圈將檢查一次作業是否未處於終端狀態並退出迴圈。

  • expected_terminal_state (str | None) – 操作器預期的終端狀態,Airflow 任務在該狀態下成功。如果未指定,將由 hook 確定。

建議在 dag 的 default_args 中定義 dataflow_* 引數,如專案、區域和 staging 位置。

default_args = {
    "dataflow_default_options": {
        "zone": "europe-west1-d",
        "tempLocation": "gs://my-staging-bucket/staging/",
    }
}

您需要將 Dataflow 模板的路徑作為檔案引用透過 template 引數傳遞。使用 parameters 將引數傳遞給您的作業。使用 environment 將執行時環境變數傳遞給您的作業。

t1 = DataflowTemplatedJobStartOperator(
    task_id="dataflow_example",
    template="{{var.value.gcp_dataflow_base}}",
    parameters={
        "inputFile": "gs://bucket/input/my_input.txt",
        "outputFile": "gs://bucket/output/my_output.txt",
    },
    gcp_conn_id="airflow-conn-id",
    dag=my_dag,
)

templatedataflow_default_optionsparametersjob_name 都是模板化的,因此您可以在其中使用變數。

請注意,dataflow_default_options 旨在儲存適用於 DAG 中所有 dataflow 操作器的高階選項,例如專案資訊。

引數

deferrable (bool) – 在可推遲模式下執行操作器。

template_fields: collections.abc.Sequence[
ui_color = '#0273d4'[原始碼]
template[原始碼]
job_name = '{{task.task_id}}'[source]
options[source]
dataflow_default_options[source]
parameters[source]
project_id = None[source]
location = None[source]
gcp_conn_id = 'google_cloud_default'[source]
poll_sleep = 10[source]
impersonation_chain = None[source]
environment = None[source]
cancel_timeout = 600[source]
wait_until_finished = None[source]
append_job_name = True[source]
deferrable = True[source]
expected_terminal_state = None[source]
job: dict[str, str] | None = None[source]
property hook: airflow.providers.google.cloud.hooks.dataflow.DataflowHook[source]
execute(context)[source]

在建立 Operator 時派生。

Context 與渲染 Jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

execute_complete(context, event)[source]

在觸發器完成其工作後執行。

on_kill()[source]

重寫此方法可在任務例項被殺死時清理子程序。

Operator 中對 threading、subprocess 或 multiprocessing 模組的任何使用都需要清理,否則會留下殭屍程序。

class airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator(body, location, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', drain_pipeline=False, cancel_timeout=10 * 60, wait_until_finished=None, impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), append_job_name=True, expected_terminal_state=None, poll_sleep=10, *args, **kwargs)[source]

基類:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

使用 Flex 模板啟動 Dataflow 作業。

參閱

有關如何使用此 Operator 的更多資訊,請參閱指南: Templated jobs

引數
  • body (dict) – 請求正文。請參閱: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body

  • location (str) – Dataflow 作業的位置(例如 europe-west1)

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • gcp_conn_id (str) – 用於連線到 Google Cloud Platform 的連線 ID。

  • drain_pipeline (bool) – 可選,如果想在殺死任務例項期間透過耗盡管道而不是取消來停止流式作業,請設定為 True。參閱:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • cancel_timeout (int | None) – 在任務被殺死時,操作器應等待管道成功取消的時間(以秒為單位)。

  • wait_until_finished (bool | None) –

    (可選)如果為 True,則在退出前等待管道執行結束。如果為 False,則僅提交作業。如果為 None,則採用預設行為。

    預設行為取決於管道型別

    • 對於流式管道,等待作業啟動,

    • 對於批處理管道,等待作業完成。

    警告

    您不能在管道程式碼中呼叫 PipelineResult.wait_until_finish 方法以使操作器正常工作。即,您必須使用非同步執行。否則,您的管道將始終等待直到完成。有關更多資訊,請參閱:非同步執行

    在 Airflow 中啟動 Dataflow 作業的過程包括兩個步驟

    • 執行子程序並讀取 stderr/stderr 日誌以獲取作業 ID。

    • 迴圈等待上一步中的作業 ID 結束。此迴圈檢查作業的狀態。

    步驟二在步驟一完成後立即開始,因此如果您的管道程式碼中有 wait_until_finished,步驟二直到程序停止才會開始。當此程序停止時,步驟二將執行,但由於作業將處於終端狀態,它只會執行一次迭代。

    如果您在流水線中沒有呼叫 wait_for_pipeline 方法,而是將 wait_until_finished=True 傳遞給 Operator,則第二個迴圈將等待作業的終端狀態。

    如果您在流水線中沒有呼叫 wait_for_pipeline 方法,而是將 wait_until_finished=False 傳遞給 Operator,則第二個迴圈將檢查作業是否不在終端狀態並退出迴圈。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

  • deferrable (bool) – 在可推遲模式下執行操作器。

  • expected_terminal_state (str | None) – Operator 的預期最終狀態,相應的 Airflow 任務在此狀態下視為成功。未指定時,將由 Hook 確定。

  • append_job_name (bool) – 如果需要在作業名稱後附加唯一字尾,則為 True。

  • poll_sleep (int) – 當作業處於 JOB_STATE_RUNNING 狀態時,輪詢 Google Cloud Platform 獲取 dataflow 作業狀態之間的睡眠時間(以秒為單位)。

template_fields: collections.abc.Sequence[str] = ('body', 'location', 'project_id', 'gcp_conn_id')
body[source]
location[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
drain_pipeline = False[source]
cancel_timeout = 600[source]
wait_until_finished = None[source]
job: dict[str, str] | None = None[source]
impersonation_chain = None[source]
deferrable = True[source]
expected_terminal_state = None[source]
append_job_name = True[source]
poll_sleep = 10[source]
property hook: airflow.providers.google.cloud.hooks.dataflow.DataflowHook[source]
execute(context)[source]

在建立 Operator 時派生。

Context 與渲染 Jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

execute_complete(context, event)[source]

在觸發器完成其工作後執行。

on_kill()[source]

重寫此方法可在任務例項被殺死時清理子程序。

Operator 中對 threading、subprocess 或 multiprocessing 模組的任何使用都需要清理,否則會留下殭屍程序。

class airflow.providers.google.cloud.operators.dataflow.DataflowStartYamlJobOperator(*, job_name, yaml_pipeline_file, region=DEFAULT_DATAFLOW_LOCATION, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', append_job_name=True, drain_pipeline=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_sleep=10, cancel_timeout=5 * 60, expected_terminal_state=None, jinja_variables=None, options=None, impersonation_chain=None, **kwargs)[source]

基類:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

啟動 Dataflow YAML 作業並返回結果。

參閱

有關如何使用此 Operator 的更多資訊,請參閱指南: Dataflow YAML

警告

此 Operator 需要在 Airflow worker 上安裝 gcloud 命令(Google Cloud SDK)https://cloud.google.com/sdk/docs/install``

引數
  • job_name (str) – 必需。要分配給 Cloud Dataflow 作業的唯一名稱。

  • yaml_pipeline_file (str) – 必需。定義要執行的 YAML 流水線的檔案的路徑。必須是本地檔案或以“gs://”開頭的 URL。

  • region (str) – 可選。作業區域端點的區域 ID。預設為“us-central1”。

  • project_id (str) – 必需。擁有該作業的 GCP 專案 ID。如果設定為 None 或缺失,則使用 GCP 連線中的預設 project_id。

  • gcp_conn_id (str) – 可選。用於連線到 GCP 的連線 ID。

  • append_job_name (bool) – 可選。如果必須將唯一的字尾附加到 job_name,則設定為 True。預設為 True。

  • drain_pipeline (bool) – 可選。如果要透過排出(draining)而不是取消(canceling)來停止流式流水線作業(當殺死任務例項時),則設定為 True。請注意,這不適用於批處理流水線作業或 deferrable 模式。預設為 False。更多資訊請參見:https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • deferrable (bool) – 可選。以 deferrable 模式執行 Operator。

  • expected_terminal_state (str | None) – 可選。Dataflow 作業的預期終端狀態,Operator 任務在此狀態下視為成功。對於批處理作業,預設為“JOB_STATE_DONE”,對於流式作業,預設為“JOB_STATE_RUNNING”。

  • poll_sleep (int) – 可選。輪詢 Google Cloud Platform 獲取 Dataflow 作業狀態之間睡眠的時間(秒)。同步和 deferrable 模式下均使用此值。

  • cancel_timeout (int | None) – 可選。當任務被殺死時,Operator 應等待流水線成功取消的時間(秒)。

  • jinja_variables (dict[str, str] | None) – 可選。用於具體化 yaml 流水線檔案的 Jinja2 變數字典。

  • options (dict[str, Any] | None) – 可選。額外的 gcloud 或 Beam 作業引數。它必須是一個字典,其鍵與 gcloud 中的可選標誌名稱匹配。支援的標誌列表可在以下位置找到:https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run。請注意,如果標誌不需要值,則其字典值必須為 True 或 None。例如,–log-http 標誌可以作為 {‘log-http’: True} 傳遞。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

返回值:

包含作業資料的字典。

template_fields: collections.abc.Sequence[str] = ('job_name', 'yaml_pipeline_file', 'jinja_variables', 'options', 'region', 'project_id', 'gcp_conn_id')[source]
template_fields_renderers[source]
job_name[source]
yaml_pipeline_file[source]
region = 'us-central1'[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
append_job_name = True[source]
drain_pipeline = False[source]
deferrable = True[source]
poll_sleep = 10[source]
cancel_timeout = 300[source]
expected_terminal_state = None[source]
options = None[source]
jinja_variables = None[source]
impersonation_chain = None[source]
job_id: str | None = None[source]
execute(context)[source]

在建立 Operator 時派生。

Context 與渲染 Jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

execute_complete(context, event)[source]

在觸發器返回事件後執行。

on_kill()[source]

如果任務例項被終止,則取消 Dataflow 作業。

如果任務例項在延遲狀態下被終止,則不會呼叫此方法。

property hook: airflow.providers.google.cloud.hooks.dataflow.DataflowHook[source]
class airflow.providers.google.cloud.operators.dataflow.DataflowStopJobOperator(job_name_prefix=None, job_id=None, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', poll_sleep=10, impersonation_chain=None, stop_timeout=10 * 60, drain_pipeline=True, **kwargs)[source]

基類:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

停止具有指定名稱字首或作業 ID 的作業。

所有具有指定名稱字首的作業都將被停止。流式作業預設會耗盡。

引數 job_name_prefixjob_id 互斥。

參閱

有關停止流水線的更多詳細資訊,請參閱: https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

參閱

有關如何使用此操作器的更多資訊,請參閱指南: 停止流水線

引數
  • job_name_prefix (str | None) – 指定要停止的作業名稱字首。

  • job_id (str | None) – 指定要停止的作業 ID。

  • project_id (str) – 可選,要在其中啟動作業的 Google Cloud 專案 ID。如果設定為 None 或缺失,則使用 Google Cloud 連線中的預設 project_id。

  • location (str) – 可選,作業位置。如果設定為 None 或缺失,則使用 "us-central1"。

  • gcp_conn_id (str) – 連線到 Google Cloud 時使用的連線 ID。

  • poll_sleep (int) – 在輪詢 Google Cloud Platform 以確認 Dataflow 作業狀態已停止之間等待的秒數。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

  • drain_pipeline (bool) – 可選,如果想透過取消而不是耗盡來停止流式作業,請設定為 False。請參閱: https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline

  • stop_timeout (int | None) – 成功取消/耗盡作業的等待時間(秒)

template_fields = ['job_id', 'project_id', 'impersonation_chain'][source]
poll_sleep = 10[source]
stop_timeout = 600[source]
job_name = None[source]
job_id = None[source]
project_id = None[source]
location = 'us-central1'[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
hook: airflow.providers.google.cloud.hooks.dataflow.DataflowHook | None = None[source]
drain_pipeline = True[source]
execute(context)[source]

在建立 Operator 時派生。

Context 與渲染 Jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.dataflow.DataflowCreatePipelineOperator(*, body, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基類:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

建立新的 Dataflow 資料管道例項。

參閱

有關如何使用此操作器的更多資訊,請參閱指南: JSON 格式的流水線

引數
  • body (dict) – 請求正文(包含 Pipeline 例項)。請參閱: https://cloud.google.com/dataflow/docs/reference/data-pipelines/rest/v1/projects.locations.pipelines/create#request-body

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • location (str) – 指定 Data Pipelines 例項所在的位置(例如 us-central1)。

  • gcp_conn_id (str) – 用於連線 Google Cloud Platform 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) –

    可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

    警告

    此選項需要 Apache Beam 2.39.0 或更新版本。

以 JSON 格式返回建立的 Dataflow 資料流水線例項。

body[source]
project_id = None[source]
location = 'us-central1'[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
dataflow_hook: airflow.providers.google.cloud.hooks.dataflow.DataflowHook | None = None[source]
pipeline_name[source]
execute(context)[source]

在建立 Operator 時派生。

Context 與渲染 Jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.dataflow.DataflowRunPipelineOperator(pipeline_name, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基類:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

執行 Dataflow 資料管道。

參閱

有關如何使用此操作器的更多資訊,請參閱指南: JSON 格式的流水線

引數
  • pipeline_name (str) – 流水線的顯示名稱。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • location (str) – 指定 Data Pipelines 例項所在的位置(例如 us-central1)。

  • gcp_conn_id (str) – 用於連線 Google Cloud Platform 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

以 JSON 格式返回建立的作業。

pipeline_name[source]
project_id = None[source]
location = 'us-central1'[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
dataflow_hook: airflow.providers.google.cloud.hooks.dataflow.DataflowHook | None = None[source]
execute(context)[source]

在建立 Operator 時派生。

Context 與渲染 Jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

class airflow.providers.google.cloud.operators.dataflow.DataflowDeletePipelineOperator(pipeline_name, project_id=PROVIDE_PROJECT_ID, location=DEFAULT_DATAFLOW_LOCATION, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基類:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

刪除 Dataflow 資料管道。

參閱

有關如何使用此操作器的更多資訊,請參閱指南: 刪除流水線

引數
  • pipeline_name (str) – 流水線的顯示名稱。例如 projects/PROJECT_ID/locations/LOCATION_ID/pipelines/PIPELINE_ID 中,它就是 PIPELINE_ID。

  • project_id (str) – 擁有該作業的 GCP 專案 ID。

  • location (str) – 指定 Data Pipelines 例項所在的位置(例如 us-central1)。

  • gcp_conn_id (str) – 用於連線 Google Cloud Platform 的連線 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可選的服務帳號,使用短期憑據進行模擬,或者獲取列表中最後一個帳號的 access_token 所需的鏈式帳號列表,該帳號將在請求中被模擬。如果設定為字串,則原始帳號必須向該帳號授予 Service Account Token Creator IAM 角色。如果設定為序列,則列表中身份必須向緊接之前的身份授予 Service Account Token Creator IAM 角色,列表中第一個帳號向原始帳號授予此角色(模板化)。

pipeline_name[source]
project_id = None[source]
location = 'us-central1'[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
dataflow_hook: airflow.providers.google.cloud.hooks.dataflow.DataflowHook | None = None[source]
response: dict | None = None[源]
execute(context)[源]

在建立 Operator 時派生。

Context 與渲染 Jinja 模板時使用的字典相同。

有關更多 Context,請參閱 get_template_context。

本條目有幫助嗎?