EMR 上 Spark 批处理作业的优化

Posted

技术标签:

【中文标题】EMR 上 Spark 批处理作业的优化【英文标题】:Optimization of Spark batch job on EMR 【发布时间】:2020-02-13 20:46:01 【问题描述】:

我们正在 EMR 集群上运行 spark-job,集群配置如下所示。

Resources:
Node Type:CORE - 2 INSTANCES OF
r4.8xlarge
32 vCore, 244 GiB memory, EBS only storage
EBS Storage:32 GiB

Node Type: MASTER
1 Instance of r4.4xlarge
16 vCore, 122 GiB memory, EBS only storage
EBS Storage:32 GiB

Node Type: TASK- 
2 INSTANCES Of 
r4.4xlarge
16 vCore, 122 GiB memory, EBS only storage
EBS Storage:32 GiB

我们在 EMR 控制台上使用以下参数进行 spark-submit:

/usr/bin/spark-submit --deploy-mode cluster --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true --conf spark.sql.files.ignoreCorruptFiles=true --driver-memory 5g --master yarn --class class_name s3://location_of_jar -c s3://location of input to jar -w xyz.json

我们认为这些论点没有充分利用可用的全部可用资源。任何人都可以通过更改任何 spark-defaults.conf 文件或传递更多参数来建议是否有任何其他优化方法可以在 EMR 上执行 spark-submit,以便优化利用所有可用资源?我们一次运行一项工作。集群上没有运行并行作业

【问题讨论】:

【参考方案1】:

如果不知道每个执行程序分配的资源、工作的性质、您正在处理的数据量等,很难给出适当的建议。我认为您现在能做的最好的事情就是在创建 EMR 集群的同时安装 ganglia。 ganglia web ui 可通过http://master-public-dns-name/ganglia/ 获得

先看看cpu和内存使用情况。如果您为 Spark 作业优化分配资源,然后相应地调整每个执行程序的资源,这将为您提供一个足够好的想法。

可以使用以下方式在您的 spark-submit 命令中设置执行器、执行器内存和内核的数量(这些是示例值):

--num-executors 10
--executor-cores 1
--executor-memory 5g

查看神经节图表后,您会了解哪些资源被利用不足/过度利用。相应地改变这些。

如果您不想玩弄这些数字并让 spark 决定什么是最佳组合,则可能值得使用以下行将动态资源分配设置为 true:

--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.enabled=true

这里要注意的一点是,yarn 将获得分配给核心 + 任务节点的总内存的 75%。此外,驱动程序和每个执行程序都有与之相关的内存开销。查找 spark 文档。在手动为驱动程序和执行程序分配资源时请记住这一点。

【讨论】:

【参考方案2】:

分析spark job的第一步是spark-ui。所以使用tracking url 并查看logs, jobs, executors, streaming

http://cluster_manager_host:8088/

更详细的内存和cpu利用率分析,你也可以使用Gangalia工具。

http://cluster_manager_host/Gangalia

在这之后你可以做的是:

您必须进行自定义配置,例如

(i) 执行者数量--conf num-executors x

(ii) 执行者内存--conf executor-memory y

(iii) 核心数量-- conf executor-cores z

(iv) 启用动态资源分配--conf spark.dynamicAllocation.enabled=true

(v) 启用最大资源分配--conf maximizeResourceAllocation=true

(vi) 将serialisation从默认更改为Kryo--conf org.apache.spark.serializer.KryoSerializer

(vii) 根据您的配置将分区数量从默认更改为自定义rdd=rdd.repatition(sparkConf.defaultParalallism*2)

如果在上面的正确配置之后你的工作仍然很慢,请更改你的代码并使用正确的函数和对象。喜欢

(i) 如果您要将数据发送到任何外部目的地,例如 Kinesis、DB 或 Kafka,请使用 mapPartitions 或 foreachPatitions 并减少对象创建次数。

(ii) 如果你调用的是外部 API,那么也遵循上述策略。

(iii) 使用适当的数据结构。

更多信息,您可以参考:here

希望对你有帮助。

【讨论】:

以上是关于EMR 上 Spark 批处理作业的优化的主要内容,如果未能解决你的问题,请参考以下文章

在 emr 集群中使用 spark 从 oracle 中获取数据并进行处理

我的 spark 作业在 aws EMR 集群上长时间处于接受模式

AWS EMR 中的 Spark 物理内存问题

在没有 spark UI 的情况下在 AWS EMR 中监控 spark 集群

如何从 Lambda 函数在亚马逊 EMR 上执行 spark 提交?

Spark Dataframe 在 EMR 上加载 500k 文件