Amazon EMR¶
Amazon EMR(之前稱為 Amazon Elastic MapReduce)是一個託管叢集平臺,可簡化在 AWS 上執行大資料框架(如 Apache Hadoop 和 Apache Spark)來處理和分析海量資料。使用這些框架和相關的開源專案,您可以處理資料以用於分析目的和商業智慧工作負載。Amazon EMR 還允許您將大量資料轉換並移動到其他 AWS 資料儲存和資料庫中,例如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB,以及從中移出。
先決任務¶
要使用這些運算子,您必須執行以下幾項操作
透過 pip 安裝 API 庫。
pip install 'apache-airflow[amazon]'詳細資訊請參見 Airflow® 安裝
設定連線.
運算子¶
注意
為了成功執行示例,您需要為 Amazon EMR 建立 IAM 服務角色(EMR_EC2_DefaultRole 和 EMR_DefaultRole)。您可以使用 AWS CLI 建立這些角色:aws emr create-default-roles。
建立一個 EMR 作業流¶
您可以使用 EmrCreateJobFlowOperator 建立新的 EMR 作業流。叢集將在完成步驟後自動終止。
預設行為是在叢集啟動後立即將 DAG 任務節點標記為成功(wait_policy=None)。可以透過使用不同的 wait_policy 修改此行為。可用選項包括
WaitPolicy.WAIT_FOR_COMPLETION- DAG 任務節點等待叢集執行WaitPolicy.WAIT_FOR_STEPS_COMPLETION- DAG 任務節點等待叢集終止
透過將 deferrable=True 作為引數傳遞,此運算子可以在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,從而提高 Airflow 叢集中資源的利用效率。但是,此模式需要 Airflow triggerer 在您的部署中可用。
作業流配置¶
要在 EMR 上建立作業流,您需要指定 EMR 叢集的配置
tests/system/amazon/aws/example_emr.py
SPARK_STEPS = [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
},
}
]
JOB_FLOW_OVERRIDES: dict[str, Any] = {
"Name": "PiCalc",
"ReleaseLabel": "emr-7.1.0",
"Applications": [{"Name": "Spark"}],
"Instances": {
"InstanceGroups": [
{
"Name": "Primary node",
"Market": "ON_DEMAND",
"InstanceRole": "MASTER",
"InstanceType": "m5.xlarge",
"InstanceCount": 1,
},
],
# If the EMR steps complete too quickly the cluster will be torn down before the other system test
# tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
# Set KeepJobFlowAliveWhenNoSteps to False to avoid the cluster from being torn down prematurely.
"KeepJobFlowAliveWhenNoSteps": True,
"TerminationProtected": False,
},
"Steps": SPARK_STEPS,
"JobFlowRole": "EMR_EC2_DefaultRole",
"ServiceRole": "EMR_DefaultRole",
}
在這裡,我們建立一個 EMR 單節點叢集 PiCalc。它只有一個步驟 calculate_pi,使用 Spark 計算 Pi 的值。配置 'KeepJobFlowAliveWhenNoSteps': False 告訴叢集在步驟完成後關閉。或者,可以使用沒有 Steps 值的配置,並在以後使用 EmrAddStepsOperator 新增步驟。詳情如下。
注意
透過 EMR API 啟動的 EMR 叢集(例如此處的叢集)預設情況下並非對所有使用者可見,因此您可能在 EMR 管理控制檯中看不到該叢集 - 您可以透過在 JOB_FLOW_OVERRIDES 字典末尾新增 'VisibleToAllUsers': True 來更改此設定。
有關更多配置資訊,請參閱 Boto3 EMR 客戶端。
建立作業流¶
在以下程式碼中,我們使用上述配置建立一個新的作業流。
tests/system/amazon/aws/example_emr.py
create_job_flow = EmrCreateJobFlowOperator(
task_id="create_job_flow",
job_flow_overrides=JOB_FLOW_OVERRIDES,
)
向 EMR 作業流新增步驟¶
要向現有的 EMR 作業流新增步驟,您可以使用 EmrAddStepsOperator。透過將 deferrable=True 作為引數傳遞,此運算子可以在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,從而提高 Airflow 叢集中資源的利用效率。但是,此模式需要 Airflow triggerer 在您的部署中可用。
tests/system/amazon/aws/example_emr.py
add_steps = EmrAddStepsOperator(
task_id="add_steps",
job_flow_id=create_job_flow.output,
steps=SPARK_STEPS,
execution_role_arn=execution_role_arn,
)
終止 EMR 作業流¶
要終止 EMR 作業流,您可以使用 EmrTerminateJobFlowOperator。透過將 deferrable=True 作為引數傳遞,此運算子可以在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,從而提高 Airflow 叢集中資源的利用效率。但是,此模式需要 Airflow triggerer 在您的部署中可用。
tests/system/amazon/aws/example_emr.py
remove_cluster = EmrTerminateJobFlowOperator(
task_id="remove_cluster",
job_flow_id=create_job_flow.output,
)
修改 Amazon EMR 容器¶
要修改現有的 EMR 容器,您可以使用 EmrContainerSensor。
tests/system/amazon/aws/example_emr.py
modify_cluster = EmrModifyClusterOperator(
task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)
啟動 EMR Notebook 執行¶
您可以使用 EmrStartNotebookExecutionOperator 在附加到執行中叢集的現有 Notebook 上啟動 Notebook 執行。
tests/system/amazon/aws/example_emr_notebook_execution.py
start_execution = EmrStartNotebookExecutionOperator(
task_id="start_execution",
editor_id=editor_id,
cluster_id=cluster_id,
relative_path="EMR-System-Test.ipynb",
service_role="EMR_Notebooks_DefaultRole",
)
停止 EMR Notebook 執行¶
您可以使用 EmrStopNotebookExecutionOperator 停止正在執行的 Notebook 執行。
tests/system/amazon/aws/example_emr_notebook_execution.py
stop_execution = EmrStopNotebookExecutionOperator(
task_id="stop_execution",
notebook_execution_id=notebook_execution_id_1,
)
感測器¶
等待 EMR Notebook 執行狀態¶
要監控 EMR Notebook 執行的狀態,您可以使用 EmrNotebookExecutionSensor。
tests/system/amazon/aws/example_emr_notebook_execution.py
wait_for_execution_start = EmrNotebookExecutionSensor(
task_id="wait_for_execution_start",
notebook_execution_id=notebook_execution_id_1,
target_states={"RUNNING"},
poke_interval=5,
)
等待 Amazon EMR 作業流狀態¶
要監控 EMR 作業流的狀態,您可以使用 EmrJobFlowSensor。
tests/system/amazon/aws/example_emr.py
check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)
等待 Amazon EMR 步驟狀態¶
要監控 EMR 作業步驟的狀態,您可以使用 EmrStepSensor。
tests/system/amazon/aws/example_emr.py
wait_for_step = EmrStepSensor(
task_id="wait_for_step",
job_flow_id=create_job_flow.output,
step_id=get_step_id(add_steps.output),
)
限流¶
Amazon EMR 的服務配額相對較低,可以在此處檢視詳細資訊。因此,在使用本頁列出的任何運算子和感測器時,您可能會遇到限流問題。要規避此限制,請考慮自定義 AWS 連線配置以修改預設的 Boto3 重試策略。請參閱 AWS 連線配置文件。