Apache Beam Operator¶
Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam's supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
Note
This operator requires the gcloud command (Google Cloud SDK) <https://cloud.google.com/sdk/docs/install> to be installed on the Airflow worker when the Apache Beam pipeline runs on the Dataflow service.
Run Python Pipelines in Apache Beam¶
The py_file argument must be specified for BeamRunPythonPipelineOperator, as it contains the pipeline to be executed by Beam. The Python file can live on GCS (from which Airflow is able to download it) or on the local filesystem (provide its absolute path).
The py_interpreter argument specifies the Python version to use when executing the pipeline; the default is python3. If your Airflow instance runs on Python 2, specify python2 and ensure your py_file is a Python 2 file. For best results, use Python 3.
If the py_requirements argument is specified, a temporary Python virtual environment with the given dependencies is created, and the pipeline runs inside it.
The py_system_site_packages argument specifies whether all Python packages from your Airflow instance should be accessible within the virtual environment (when py_requirements is specified). Avoid this unless the Dataflow job requires it.
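The snippets on this page are excerpts from Airflow's system tests (see each caption), so they omit their imports and configuration. A minimal setup sketch for the Python examples, assuming placeholder project and bucket values (the GCP_PROJECT_ID and GCS_* assignments below are illustrative, not part of the excerpts):

# Imports used by the Python examples on this page.
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

# Placeholder locations -- substitute your own project and bucket.
GCP_PROJECT_ID = "my-project"
GCS_PYTHON = "gs://my-bucket/wordcount.py"
GCS_PYTHON_DATAFLOW_ASYNC = "gs://my-bucket/wordcount_dataflow_async.py"
GCS_TMP = "gs://my-bucket/temp/"
GCS_STAGING = "gs://my-bucket/staging/"
GCS_OUTPUT = "gs://my-bucket/output"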
Run Python Pipelines with DirectRunner¶
tests/system/apache/beam/example_python.py
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_local_direct_runner",
    py_file="apache_beam.examples.wordcount",
    py_options=["-m"],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
)
tests/system/apache/beam/example_python.py
start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_direct_runner",
    py_file=GCS_PYTHON,
    py_options=[],
    pipeline_options={"output": GCS_OUTPUT},
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
)
You can use deferrable mode to run this operator asynchronously. This gives the worker the ability to free up resources when it knows it has to wait, handing the work of resuming the operator off to a Trigger. As a result, the operator does not occupy a worker slot while it is suspended (deferred), and your cluster wastes far fewer resources on idle Operators and Sensors.
tests/system/apache/beam/example_python_async.py
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_local_direct_runner",
    py_file="apache_beam.examples.wordcount",
    py_options=["-m"],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    deferrable=True,
)
tests/system/apache/beam/example_python_async.py
start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_direct_runner",
    py_file=GCS_PYTHON,
    py_options=[],
    pipeline_options={"output": GCS_OUTPUT},
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    deferrable=True,
)
Run Python Pipelines with DataflowRunner¶
tests/system/apache/beam/example_python.py
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_dataflow_runner",
    runner="DataflowRunner",
    py_file=GCS_PYTHON,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
)
tests/system/apache/beam/example_python_dataflow.py
start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_dataflow_runner_async",
    runner="DataflowRunner",
    py_file=GCS_PYTHON_DATAFLOW_ASYNC,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-python-job-async-done",
    job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location="us-central1",
)

start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
You can use deferrable mode to run this operator asynchronously. This gives the worker the ability to free up resources when it knows it has to wait, handing the work of resuming the operator off to a Trigger. As a result, the operator does not occupy a worker slot while it is suspended (deferred), and your cluster wastes far fewer resources on idle Operators and Sensors.
tests/system/apache/beam/example_python_async.py
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_dataflow_runner",
    runner="DataflowRunner",
    py_file=GCS_PYTHON,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
    deferrable=True,
)
Run Java Pipelines in Apache Beam¶
For Java pipelines, the jar argument must be specified for BeamRunJavaPipelineOperator, as it contains the pipeline to be executed by Apache Beam. The JAR can live on GCS (from which Airflow is able to download it) or on the local filesystem (provide its absolute path).
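A minimal setup sketch for the Java examples, assuming placeholder bucket and object names (the GCS_JAR_* and GCS_INPUT assignments below are illustrative, not part of the excerpts):

# Imports used by the Java examples on this page.
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

# Placeholder locations -- substitute your own bucket, JAR, and input file.
GCS_JAR_DIRECT_RUNNER_BUCKET_NAME = "my-bucket"
GCS_JAR_DIRECT_RUNNER_OBJECT_NAME = "wordcount-bundled.jar"
GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME = "my-bucket"
GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME = "wordcount-bundled.jar"
GCS_INPUT = "gs://my-bucket/input/*.txt"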
Run Java Pipelines with DirectRunner¶
tests/system/apache/beam/example_beam.py
jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
    task_id="jar_to_local_direct_runner",
    bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_direct_runner",
    jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        "output": "/tmp/start_java_pipeline_direct_runner",
        "inputFile": GCS_INPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
)

jar_to_local_direct_runner >> start_java_pipeline_direct_runner
Run Java Pipelines with DataflowRunner¶
tests/system/apache/beam/example_java_dataflow.py
jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
    task_id="jar_to_local_dataflow_runner",
    bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_dataflow",
    runner="DataflowRunner",
    jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
)

jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
Run Go Pipelines in Apache Beam¶
The go_file argument must be specified for BeamRunGoPipelineOperator, as it contains the pipeline to be executed by Beam. The Go file can live on GCS (from which Airflow is able to download it) or on the local filesystem (provide its absolute path). When run from the local filesystem, the equivalent command is go run <go_file>. When pulled from a GCS bucket, the operator first initializes a module and installs dependencies with go mod init example.com/main and go mod tidy.
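A minimal setup sketch for the Go examples, assuming placeholder paths (the GCS_GO and GCS_GO_DATAFLOW_ASYNC assignments below are illustrative, not part of the excerpts):

# Import used by the Go examples on this page.
from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator

# Placeholder locations of the pipeline sources on GCS.
GCS_GO = "gs://my-bucket/wordcount.go"
GCS_GO_DATAFLOW_ASYNC = "gs://my-bucket/wordcount_dataflow_async.go"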
Run Go Pipelines with DirectRunner¶
tests/system/apache/beam/example_go.py
start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_local_direct_runner",
    go_file="files/apache_beam/examples/wordcount.go",
)
tests/system/apache/beam/example_go.py
start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_direct_runner",
    go_file=GCS_GO,
    pipeline_options={"output": GCS_OUTPUT},
)
Run Go Pipelines with DataflowRunner¶
tests/system/apache/beam/example_go.py
start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_dataflow_runner",
    runner="DataflowRunner",
    go_file=GCS_GO,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
        "WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
    },
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
)
tests/system/apache/beam/example_go_dataflow.py
start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
    task_id="start_go_job_dataflow_runner_async",
    runner="DataflowRunner",
    go_file=GCS_GO_DATAFLOW_ASYNC,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
        "WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
    },
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-go-job-async-done",
    job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location="us-central1",
)

start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
Reference¶
For further information, look at: