Amazon EMR

Amazon EMR(之前稱為 Amazon Elastic MapReduce)是一個託管叢集平臺,可簡化在 AWS 上執行大資料框架(如 Apache Hadoop 和 Apache Spark)來處理和分析海量資料。使用這些框架和相關的開源專案,您可以處理資料以用於分析目的和商業智慧工作負載。Amazon EMR 還允許您將大量資料轉換並移動到其他 AWS 資料儲存和資料庫中,例如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB,以及從中移出。

先決任務

要使用這些運算子,您必須執行以下幾項操作

運算子

注意

為了成功執行示例,您需要為 Amazon EMR 建立 IAM 服務角色(EMR_EC2_DefaultRoleEMR_DefaultRole)。您可以使用 AWS CLI 建立這些角色:aws emr create-default-roles

建立一個 EMR 作業流

您可以使用 EmrCreateJobFlowOperator 建立新的 EMR 作業流。叢集將在完成步驟後自動終止。

預設行為是在叢集啟動後立即將 DAG 任務節點標記為成功(wait_policy=None)。可以透過使用不同的 wait_policy 修改此行為。可用選項包括

  • WaitPolicy.WAIT_FOR_COMPLETION - DAG 任務節點等待叢集執行

  • WaitPolicy.WAIT_FOR_STEPS_COMPLETION - DAG 任務節點等待叢集終止

透過將 deferrable=True 作為引數傳遞,此運算子可以在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,從而提高 Airflow 叢集中資源的利用效率。但是,此模式需要 Airflow triggerer 在您的部署中可用。

作業流配置

要在 EMR 上建立作業流,您需要指定 EMR 叢集的配置

tests/system/amazon/aws/example_emr.py

SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

JOB_FLOW_OVERRIDES: dict[str, Any] = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-7.1.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        # If the EMR steps complete too quickly the cluster will be torn down before the other system test
        # tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
        # Set KeepJobFlowAliveWhenNoSteps to False to avoid the cluster from being torn down prematurely.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "Steps": SPARK_STEPS,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

在這裡,我們建立一個 EMR 單節點叢集 PiCalc。它只有一個步驟 calculate_pi,使用 Spark 計算 Pi 的值。配置 'KeepJobFlowAliveWhenNoSteps': False 告訴叢集在步驟完成後關閉。或者,可以使用沒有 Steps 值的配置,並在以後使用 EmrAddStepsOperator 新增步驟。詳情如下。

注意

透過 EMR API 啟動的 EMR 叢集(例如此處的叢集)預設情況下並非對所有使用者可見,因此您可能在 EMR 管理控制檯中看不到該叢集 - 您可以透過在 JOB_FLOW_OVERRIDES 字典末尾新增 'VisibleToAllUsers': True 來更改此設定。

有關更多配置資訊,請參閱 Boto3 EMR 客戶端

建立作業流

在以下程式碼中,我們使用上述配置建立一個新的作業流。

tests/system/amazon/aws/example_emr.py

create_job_flow = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

向 EMR 作業流新增步驟

要向現有的 EMR 作業流新增步驟,您可以使用 EmrAddStepsOperator。透過將 deferrable=True 作為引數傳遞,此運算子可以在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,從而提高 Airflow 叢集中資源的利用效率。但是,此模式需要 Airflow triggerer 在您的部署中可用。

tests/system/amazon/aws/example_emr.py

add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id=create_job_flow.output,
    steps=SPARK_STEPS,
    execution_role_arn=execution_role_arn,
)

終止 EMR 作業流

要終止 EMR 作業流,您可以使用 EmrTerminateJobFlowOperator。透過將 deferrable=True 作為引數傳遞,此運算子可以在可延遲模式下執行。使用 deferrable 模式將釋放 worker 插槽,從而提高 Airflow 叢集中資源的利用效率。但是,此模式需要 Airflow triggerer 在您的部署中可用。

tests/system/amazon/aws/example_emr.py

remove_cluster = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id=create_job_flow.output,
)

修改 Amazon EMR 容器

要修改現有的 EMR 容器,您可以使用 EmrContainerSensor

tests/system/amazon/aws/example_emr.py

modify_cluster = EmrModifyClusterOperator(
    task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)

啟動 EMR Notebook 執行

您可以使用 EmrStartNotebookExecutionOperator 在附加到執行中叢集的現有 Notebook 上啟動 Notebook 執行。

tests/system/amazon/aws/example_emr_notebook_execution.py

start_execution = EmrStartNotebookExecutionOperator(
    task_id="start_execution",
    editor_id=editor_id,
    cluster_id=cluster_id,
    relative_path="EMR-System-Test.ipynb",
    service_role="EMR_Notebooks_DefaultRole",
)

停止 EMR Notebook 執行

您可以使用 EmrStopNotebookExecutionOperator 停止正在執行的 Notebook 執行。

tests/system/amazon/aws/example_emr_notebook_execution.py

stop_execution = EmrStopNotebookExecutionOperator(
    task_id="stop_execution",
    notebook_execution_id=notebook_execution_id_1,
)

感測器

等待 EMR Notebook 執行狀態

要監控 EMR Notebook 執行的狀態,您可以使用 EmrNotebookExecutionSensor

tests/system/amazon/aws/example_emr_notebook_execution.py

wait_for_execution_start = EmrNotebookExecutionSensor(
    task_id="wait_for_execution_start",
    notebook_execution_id=notebook_execution_id_1,
    target_states={"RUNNING"},
    poke_interval=5,
)

等待 Amazon EMR 作業流狀態

要監控 EMR 作業流的狀態,您可以使用 EmrJobFlowSensor

tests/system/amazon/aws/example_emr.py

check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)

等待 Amazon EMR 步驟狀態

要監控 EMR 作業步驟的狀態,您可以使用 EmrStepSensor

tests/system/amazon/aws/example_emr.py

wait_for_step = EmrStepSensor(
    task_id="wait_for_step",
    job_flow_id=create_job_flow.output,
    step_id=get_step_id(add_steps.output),
)

限流

Amazon EMR 的服務配額相對較低,可以在此處檢視詳細資訊。因此,在使用本頁列出的任何運算子和感測器時,您可能會遇到限流問題。要規避此限制,請考慮自定義 AWS 連線配置以修改預設的 Boto3 重試策略。請參閱 AWS 連線配置文件

參考

此條目有幫助嗎?