Amazon EMR on Amazon EKS¶
Amazon EMR on EKS 為 Amazon EMR 提供了一種部署選項,允許您在 Amazon EKS 上執行開源大資料框架。
先決條件任務¶
要使用這些 Operator,您需要做幾件事
透過 pip 安裝 API 庫。
pip install 'apache-airflow[amazon]'詳細資訊請參閱 Airflow® 安裝
設定連線.
Operator¶
建立 Amazon EMR EKS 虛擬叢集¶
EmrEksCreateClusterOperator 將建立 Amazon EMR on EKS 虛擬叢集。下面的示例 DAG 展示瞭如何建立 EMR on EKS 虛擬叢集。
要在 Amazon EKS 上建立 Amazon EMR 叢集,您需要指定一個虛擬叢集名稱、您希望使用的 eks 叢集以及一個 eks 名稱空間。
有關更多詳細資訊,請參閱 EMR on EKS 開發指南。
tests/system/amazon/aws/example_emr_eks.py
create_emr_eks_cluster = EmrEksCreateClusterOperator(
task_id="create_emr_eks_cluster",
virtual_cluster_name=virtual_cluster_name,
eks_cluster_name=eks_cluster_name,
eks_namespace=eks_namespace,
)
向 Amazon EMR 虛擬叢集提交作業¶
注意
本示例假設您已經配置了 EMR on EKS 虛擬叢集。有關更多資訊,請參閱 EMR on EKS 入門指南。
EmrContainerOperator 將向 Amazon EMR on Amazon EKS 虛擬叢集提交新作業。下面的示例作業計算數學常數 Pi。在生產作業中,您通常會引用 Amazon Simple Storage Service (S3) 上的 Spark 指令碼。
要為 Amazon EMR on Amazon EKS 建立作業,您需要指定您的虛擬叢集 ID、您要使用的 Amazon EMR 版本、您的 IAM 執行角色以及 Spark 提交引數。
您還可以選擇提供配置覆蓋,例如 Spark、Hive 或 Log4j 屬性,以及將 Spark 日誌傳送到 Amazon S3 或 Amazon Cloudwatch 的監控配置。
在此示例中,我們展示瞭如何新增 applicationConfiguration 以使用 AWS Glue Data Catalog,以及 monitoringConfiguration 以將日誌傳送到 Amazon CloudWatch 中的 /aws/emr-eks-spark 日誌組。有關作業配置的更多詳細資訊,請參閱 EMR on EKS 指南。
tests/system/amazon/aws/example_emr_eks.py
job_driver_arg = {
"sparkSubmitJobDriver": {
"entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
"sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
"--conf spark.executor.cores=2 --conf spark.driver.cores=1",
}
}
configuration_overrides_arg = {
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "/emr-eks-jobs",
"logStreamNamePrefix": "airflow",
}
},
}
我們將 virtual_cluster_id 和 execution_role_arn 值作為 operator 引數傳遞,但您可以將它們儲存在連線中或在 DAG 中提供。您的 AWS 區域應在 aws_default 連線中定義為 {"region_name": "us-east-1"},或者是一個透過 aws_conn_id 引數傳遞給 operator 的自定義連線名稱。operator 返回作業執行的作業 ID。
tests/system/amazon/aws/example_emr_eks.py
job_starter = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=str(create_emr_eks_cluster.output),
execution_role_arn=job_role_arn,
release_label="emr-7.0.0-latest",
job_driver=job_driver_arg,
configuration_overrides=configuration_overrides_arg,
name="pi.py",
)
Sensor¶
等待 Amazon EMR 虛擬叢集作業¶
要等待 Amazon EMR 虛擬叢集作業的狀態達到終端狀態,您可以使用 EmrContainerSensor
tests/system/amazon/aws/example_emr_eks.py
job_waiter = EmrContainerSensor(
task_id="job_waiter",
virtual_cluster_id=str(create_emr_eks_cluster.output),
job_id=str(job_starter.output),
)