Google Cloud Dataflow 運算子

Dataflow 是一項託管服務,用於執行各種資料處理模式。這些流水線使用 Apache Beam 程式設計模型建立,該模型支援批處理和流處理。

先決條件任務

要使用這些運算子,您必須執行一些操作:

執行資料流水線的方式

根據您的環境和原始檔,有幾種執行 Dataflow 流水線的方式:

  • 非模板化流水線: 如果開發者有 Java 的 *.jar 檔案或 Python 的 *.py 檔案,他們可以在 Airflow Worker 上將其作為本地程序執行。這也意味著必須在 Worker 上安裝必要的系統依賴項。對於 Java,Worker 必須安裝 JRE 執行時。對於 Python,則必須安裝 Python 直譯器。執行時版本必須與流水線版本相容。這是啟動流水線最快的方式,但由於其系統依賴性問題頻繁,可能會導致問題。有關詳細資訊,請參閱:Java SDK 流水線, Python SDK 流水線。開發者還可以透過以 JSON 格式傳遞流水線結構來建立流水線,然後執行它來建立作業。有關詳細資訊,請參閱:JSON 格式的流水線JSON 格式的流水線

  • 模板化流水線: 程式設計師可以透過準備一個模板來使流水線獨立於環境,然後該模板將在 Google 管理的機器上執行。這樣,環境的變化就不會影響您的流水線。模板有兩種型別:

    • 經典模板。開發者執行流水線並建立一個模板。Apache Beam SDK 將檔案暫存在 Cloud Storage 中,建立一個模板檔案(類似於作業請求),並將模板檔案儲存在 Cloud Storage 中。請參閱:模板化作業

    • Flex 模板。開發者將流水線打包到 Docker 映象中,然後使用 gcloud 命令列工具構建 Flex 模板規範檔案並將其儲存在 Cloud Storage 中。請參閱:模板化作業

一個好的做法是使用非模板化流水線測試您的流水線,然後在生產環境中使用模板執行流水線。

有關流水線型別之間差異的詳細資訊,請參閱 Google Cloud 文件中的Dataflow 模板

啟動非模板化流水線

JSON 格式的流水線

可以透過以 JSON 格式傳遞流水線結構來建立新的流水線。請參閱 DataflowCreatePipelineOperator。這將建立一個新的流水線,該流水線將在 Dataflow Pipelines UI 上可見。

以下是執行 DataflowCreatePipelineOperator 建立 Dataflow 流水線的示例:

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py

create_pipeline = DataflowCreatePipelineOperator(
    task_id="create_pipeline",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body={
        "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
        "type": PIPELINE_TYPE,
        "workload": {
            "dataflowFlexTemplateRequest": {
                "launchParameter": {
                    "containerSpecGcsPath": GCS_PATH,
                    "jobName": PIPELINE_JOB_NAME,
                    "environment": {"tempLocation": TEMP_LOCATION},
                    "parameters": {
                        "inputFile": INPUT_FILE,
                        "output": OUTPUT,
                    },
                },
                "projectId": GCP_PROJECT_ID,
                "location": GCP_LOCATION,
            }
        },
    },
)

要執行新建立的流水線,您可以使用 DataflowRunPipelineOperator

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py

run_pipeline = DataflowRunPipelineOperator(
    task_id="run_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
)

呼叫後,DataflowRunPipelineOperator 將返回透過執行給定流水線建立的 Google Cloud Dataflow Job

有關 API 使用的更多資訊,請參閱 Google Cloud 文件中的資料流水線 API REST 資源

要使用原始檔(Java 中的 JAR 或 Python 檔案)建立新的流水線,請使用 create job 運算子。原始檔可以位於 GCS 或本地檔案系統上。BeamRunJavaPipelineOperatorBeamRunPythonPipelineOperator

Java SDK 流水線

對於 Java 流水線,必須為 BeamRunJavaPipelineOperator 指定 jar 引數,因為它包含要在 Dataflow 上執行的流水線。該 JAR 檔案可以位於 Airflow 可以下載的 GCS 上,也可以位於本地檔案系統上(提供其絕對路徑)。

以下是使用儲存在 GCS 上的 jar 建立和執行 Java 流水線的示例

tests/system/google/cloud/dataflow/example_dataflow_native_java.py

start_java_job_dataflow = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job_dataflow",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "job_name": f"java-pipeline-job-{ENV_ID}",
        "check_if_running": CheckJobRunning.IgnoreJob,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
)

以下是使用儲存在 GCS 上的 jar 在可延遲模式下建立和執行 Java 流水線的示例

tests/system/google/cloud/dataflow/example_dataflow_native_java.py

start_java_job_dataflow_deferrable = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_job_dataflow_deferrable",
    jar=GCS_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "job_name": f"deferrable-java-pipeline-job-{ENV_ID}",
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
        "append_job_name": False,
    },
    deferrable=True,
)

以下是使用儲存在本地檔案系統上的 jar 建立和執行 Java 流水線的示例

tests/system/google/cloud/dataflow/example_dataflow_native_java.py

start_java_job_direct = BeamRunJavaPipelineOperator(
    task_id="start_java_job_direct",
    jar=LOCAL_JAR,
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={
        "check_if_running": CheckJobRunning.WaitForRun,
        "location": LOCATION,
        "poll_sleep": 10,
    },
)

以下是使用儲存在 GCS 上的 jar 建立和執行 Java 流式處理流水線的示例

tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py


start_java_streaming_job_dataflow = BeamRunJavaPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_java_streaming_dataflow_job",
    jar=LOCAL_JAR,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "input_topic": INPUT_TOPIC,
        "output_topic": OUTPUT_TOPIC,
        "streaming": True,
    },
    dataflow_config={
        "job_name": f"java-streaming-job-{ENV_ID}",
        "location": LOCATION,
    },
)

Python SDK 流水線

必須為 BeamRunPythonPipelineOperator 指定 py_file 引數,因為它包含要在 Dataflow 上執行的流水線。Python 檔案可以位於 Airflow 可以下載的 GCS 上,也可以位於本地檔案系統上(提供其絕對路徑)。

py_interpreter 引數指定執行流水線時使用的 Python 版本,預設值為 python3。如果您的 Airflow 例項執行在 Python 2 上 - 指定 python2 並確保您的 py_file 是 Python 2。為了獲得最佳結果,請使用 Python 3。

如果指定了 py_requirements 引數,將建立一個帶有指定需求的臨時 Python 虛擬環境,並在其中執行流水線。

py_system_site_packages 引數指定是否所有來自 Airflow 例項的 Python 包都可以在虛擬環境中訪問(如果指定了 py_requirements 引數),建議避免使用,除非 Dataflow 作業需要。

tests/system/google/cloud/dataflow/example_dataflow_native_python.py

start_python_job_dataflow = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_python_job_dataflow",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)

執行模型

Dataflow 有多種執行流水線的方式。可以透過以下模式進行:非同步批處理(觸發即忘)、阻塞批處理(等待完成)或流式處理(無限期執行)。在 Airflow 中,最佳實踐是使用非同步批處理流水線或流式處理,並使用感測器來監聽預期的作業狀態。

預設情況下,BeamRunJavaPipelineOperator, BeamRunPythonPipelineOperator, DataflowTemplatedJobStartOperatorDataflowStartFlexTemplateOperator 的引數 wait_until_finished 被設定為 None,這會導致取決於流水線型別的不同行為:

  • 對於流式處理流水線,等待作業啟動,

  • 對於批處理流水線,等待作業完成。

如果 wait_until_finished 設定為 True,運算子將始終等待流水線執行結束。如果設定為 False,則僅提交作業。

請參閱:為在 Cloud Dataflow 服務上執行配置 PipelineOptions

非同步執行

Dataflow 批處理作業預設是非同步的;但是,這取決於應用程式程式碼(包含在 JAR 或 Python 檔案中)及其編寫方式。為了使 Dataflow 作業非同步執行,請確保不等待流水線物件(即在應用程式程式碼中不對 PipelineResult 呼叫 waitUntilFinishwait_until_finish)。

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py

start_python_job_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_async",
    runner=BeamRunnerType.DataflowRunner,
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "output": GCS_OUTPUT,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={
        "job_name": "start_python_job_async",
        "location": LOCATION,
        "wait_until_finished": False,
    },
)

阻塞執行

為了使 Dataflow 作業執行並等待完成,請確保在應用程式程式碼中等待流水線物件。這可以透過對 Java SDK 呼叫 pipeline.run() 返回的 PipelineResult 上的 waitUntilFinish 來實現,或者對 Python SDK 呼叫 pipeline.run() 返回的 PipelineResult 上的 wait_until_finish 來實現。

應避免阻塞作業,因為在 Airflow 上執行時會有一個後臺程序。此程序會持續執行以等待 Dataflow 作業完成,從而增加 Airflow 的資源消耗。

流式執行

要執行流式 Dataflow 作業,請確保設定了流式選項(對於 Python),或者在流水線中從一個無邊界資料來源(如 Pub/Sub)讀取資料(對於 Java)。

tests/system/google/cloud/dataflow/example_dataflow_streaming_python.py

start_streaming_python_job = BeamRunPythonPipelineOperator(
    runner=BeamRunnerType.DataflowRunner,
    task_id="start_streaming_python_job",
    py_file=GCS_PYTHON_SCRIPT,
    py_options=[],
    pipeline_options={
        "temp_location": GCS_TMP,
        "input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
        "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
        "streaming": True,
    },
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)

將引數 drain_pipeline 設定為 True 允許在終止任務例項期間透過排空(draining)而不是取消(canceling)來停止流式作業。

請參閱停止正在執行的流水線

模板化作業

模板提供了將流水線暫存在 Cloud Storage 並從那裡執行的能力。這在開發工作流程中提供了靈活性,因為它將流水線的開發與暫存和執行步驟分離開來。Dataflow 有兩種型別的模板:經典模板和 Flex 模板。有關更多資訊,請參閱Dataflow 模板的官方文件

以下是使用 DataflowTemplatedJobStartOperator 執行經典模板的 Dataflow 作業示例

tests/system/google/cloud/dataflow/example_dataflow_template.py

start_template_job = DataflowTemplatedJobStartOperator(
    task_id="start_template_job",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    wait_until_finished=True,
)

同樣,此操作也可以在可延遲模式下使用運算子。

tests/system/google/cloud/dataflow/example_dataflow_template.py

start_template_job_deferrable = DataflowTemplatedJobStartOperator(
    task_id="start_template_job_deferrable",
    project_id=PROJECT_ID,
    template="gs://dataflow-templates/latest/Word_Count",
    parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
    location=LOCATION,
    deferrable=True,
)

請參閱可與此運算子一起使用的 Google 提供的模板列表

以下是使用 DataflowStartFlexTemplateOperator 執行 Flex 模板的 Dataflow 作業示例

tests/system/google/cloud/dataflow/example_dataflow_template.py

start_flex_template_job = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    wait_until_finished=True,
)

同樣,此操作也可以在可延遲模式下使用運算子。

tests/system/google/cloud/dataflow/example_dataflow_template.py

start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
    task_id="start_flex_template_job_deferrable",
    project_id=PROJECT_ID,
    body=BODY,
    location=LOCATION,
    append_job_name=False,
    deferrable=True,
)

Dataflow YAML

Beam YAML 是一種無程式碼 SDK,用於使用 YAML 檔案配置 Apache Beam 流水線。您可以使用 Beam YAML 編寫和執行 Beam 流水線,而無需編寫任何程式碼。此 API 可用於定義流式和批處理流水線。

以下是使用 DataflowStartYamlJobOperator 執行 Dataflow YAML 作業的示例

tests/system/google/cloud/dataflow/example_dataflow_yaml.py

start_dataflow_yaml_job = DataflowStartYamlJobOperator(
    task_id="start_dataflow_yaml_job",
    job_name=DATAFLOW_YAML_JOB_NAME,
    yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
    append_job_name=True,
    deferrable=False,
    region=REGION,
    project_id=PROJECT_ID,
    jinja_variables=BQ_VARIABLES,
)

可以透過傳遞引數 deferrable=True 在可延遲模式下執行此運算子。

tests/system/google/cloud/dataflow/example_dataflow_yaml.py

start_dataflow_yaml_job_def = DataflowStartYamlJobOperator(
    task_id="start_dataflow_yaml_job_def",
    job_name=DATAFLOW_YAML_JOB_NAME,
    yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
    append_job_name=True,
    deferrable=True,
    region=REGION,
    project_id=PROJECT_ID,
    jinja_variables=BQ_VARIABLES_DEF,
    expected_terminal_state=DataflowJobStatus.JOB_STATE_DONE,
)

警告

此運算子要求在 Airflow Worker 上安裝 gcloud 命令 (Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>__

請參閱Dataflow YAML 參考

停止流水線

要停止一個或多個 Dataflow 流水線,您可以使用 DataflowStopJobOperator。流式處理流水線預設被排空(drained),將 drain_pipeline 設定為 False 將改為取消它們。提供 job_id 以停止特定作業,或提供 job_name_prefix 以停止所有具有給定名稱字首的作業。

tests/system/google/cloud/dataflow/example_dataflow_native_python.py

stop_dataflow_job = DataflowStopJobOperator(
    task_id="stop_dataflow_job",
    location=LOCATION,
    job_name_prefix="start-python-pipeline",
)

請參閱:停止正在執行的流水線

刪除流水線

要刪除 Dataflow 流水線,您可以使用 DataflowDeletePipelineOperator。以下是使用此運算子的示例:

tests/system/google/cloud/dataflow/example_dataflow_pipeline.py

delete_pipeline = DataflowDeletePipelineOperator(
    task_id="delete_pipeline",
    pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

更新流水線

流式處理流水線一旦建立並執行,其配置就不可更改,因為它是不可變的。要進行任何修改,您需要更新流水線的定義(例如,更新您的程式碼或模板),然後提交一個新的作業。實質上,您將建立一個帶有所需更新的新流水線例項。

對於批處理流水線,如果一個作業當前正在執行並且您想更新其配置,您必須取消該作業。這是因為 Dataflow 作業一旦開始,就變得不可變。儘管批處理流水線旨在處理有限量的資料並最終自行完成,但您無法更新正在進行的作業。如果在作業執行時需要更改任何引數或流水線邏輯,您將不得不取消當前的執行,然後啟動一個帶有更新配置的新作業。

如果批處理流水線已經成功完成,則沒有正在執行的作業可供更新;新配置僅適用於下一次作業提交。

感測器

當作業以非同步方式觸發時,可以使用感測器來檢查特定的作業屬性。

DataflowJobStatusSensor (Dataflow 作業狀態感測器).

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py

wait_for_python_job_async_done = DataflowJobStatusSensor(
    task_id="wait_for_python_job_async_done",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    location=LOCATION,
)

可以透過傳遞引數 deferrable=True 在可延遲模式下執行此運算子。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py

wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
    task_id="wait_for_beam_python_pipeline_job_status_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
    location=LOCATION,
    deferrable=True,
)

DataflowJobMetricsSensor (Dataflow 作業指標感測器).

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_python_job_async_metric = DataflowJobMetricsSensor(
    task_id="wait_for_python_job_async_metric",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
)

可以透過傳遞引數 deferrable=True 在可延遲模式下執行此運算子。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py

def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
    """Check is metric greater than equals to given value."""

    def callback(metrics: list[dict]) -> bool:
        dag.log.info("Looking for '%s' >= %d", metric_name, value)
        for metric in metrics:
            context = metric.get("name", {}).get("context", {})
            original_name = context.get("original_name", "")
            tentative = context.get("tentative", "")
            if original_name == "Service-cpu_num_seconds" and not tentative:
                return metric["scalar"] >= value
        raise AirflowException(f"Metric '{metric_name}' not found in metrics")

    return callback

wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
    task_id="wait_for_beam_python_pipeline_job_metric_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobMessagesSensor (Dataflow 作業訊息感測器).

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py

def check_message(messages: list[dict]) -> bool:
    """Check message"""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_python_job_async_message = DataflowJobMessagesSensor(
    task_id="wait_for_python_job_async_message",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_message,
    fail_on_terminal_state=False,
)

可以透過傳遞引數 deferrable=True 在可延遲模式下執行此運算子。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py

def check_job_message(messages: list[dict]) -> bool:
    """Check job message."""
    for message in messages:
        if "Adding workflow start and stop steps." in message.get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
    task_id="wait_for_beam_python_pipeline_job_message_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_job_message,
    fail_on_terminal_state=False,
    deferrable=True,
)

DataflowJobAutoScalingEventsSensor (Dataflow 作業自動擴縮事件感測器).

tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event"""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_python_job_async_autoscaling_event",
    job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
)

可以透過傳遞引數 deferrable=True 在可延遲模式下執行此運算子。

tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py

def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
    """Check autoscaling event."""
    for autoscaling_event in autoscaling_events:
        if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
            return True
    return False

wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
    task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
    job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
    location=LOCATION,
    callback=check_autoscaling_event,
    fail_on_terminal_state=False,
    deferrable=True,
)

參考

有關更多資訊,請檢視:

此條目是否有幫助?