EMR 上 Spark 批处理作业的优化

Posted

技术标签:

【中文标题】EMR 上 Spark 批处理作业的优化【英文标题】:Optimization of Spark batch job on EMR 【发布时间】:2020-02-13 20:46:01 【问题描述】:

我们正在 EMR 集群上运行 spark-job,集群配置如下所示。

Resources:
Node Type:CORE - 2 INSTANCES OF
r4.8xlarge
32 vCore, 244 GiB memory, EBS only storage
EBS Storage:32 GiB

Node Type: MASTER
1 Instance of r4.4xlarge
16 vCore, 122 GiB memory, EBS only storage
EBS Storage:32 GiB

Node Type: TASK- 
2 INSTANCES Of 
r4.4xlarge
16 vCore, 122 GiB memory, EBS only storage
EBS Storage:32 GiB

我们在 EMR 控制台上使用以下参数进行 spark-submit:

/usr/bin/spark-submit --deploy-mode cluster --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true --conf spark.sql.files.ignoreCorruptFiles=true --driver-memory 5g --master yarn --class class_name s3://location_of_jar -c s3://location of input to jar -w xyz.json

我们认为这些论点没有充分利用可用的全部可用资源。任何人都可以通过更改任何 spark-defaults.conf 文件或传递更多参数来建议是否有任何其他优化方法可以在 EMR 上执行 spark-submit,以便优化利用所有可用资源?我们一次运行一项工作。集群上没有运行并行作业

【问题讨论】:

【参考方案1】:

如果不知道每个执行程序分配的资源、工作的性质、您正在处理的数据量等,很难给出适当的建议。我认为您现在能做的最好的事情就是在创建 EMR 集群的同时安装 ganglia。 ganglia web ui 可通过http://master-public-dns-name/ganglia/ 获得

先看看cpu和内存使用情况。如果您为 Spark 作业优化分配资源,然后相应地调整每个执行程序的资源,这将为您提供一个足够好的想法。

可以使用以下方式在您的 spark-submit 命令中设置执行器、执行器内存和内核的数量(这些是示例值):

--num-executors 10
--executor-cores 1
--executor-memory 5g

查看神经节图表后,您会了解哪些资源被利用不足/过度利用。相应地改变这些。

如果您不想玩弄这些数字并让 spark 决定什么是最佳组合,则可能值得使用以下行将动态资源分配设置为 true:

--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.enabled=true

这里要注意的一点是,yarn 将获得分配给核心 + 任务节点的总内存的 75%。此外,驱动程序和每个执行程序都有与之相关的内存开销。查找 spark 文档。在手动为驱动程序和执行程序分配资源时请记住这一点。

【讨论】:

【参考方案2】:

分析spark job的第一步是spark-ui。所以使用tracking url 并查看logs, jobs, executors, streaming

http://cluster_manager_host:8088/

更详细的内存和cpu利用率分析,你也可以使用Gangalia工具。

http://cluster_manager_host/Gangalia

在这之后你可以做的是:

您必须进行自定义配置,例如

(i) 执行者数量--conf num-executors x

(ii) 执行者内存--conf executor-memory y

(iii) 核心数量-- conf executor-cores z

(iv) 启用动态资源分配--conf spark.dynamicAllocation.enabled=true

(v) 启用最大资源分配--conf maximizeResourceAllocation=true

(vi) 将serialisation从默认更改为Kryo--conf org.apache.spark.serializer.KryoSerializer

(vii) 根据您的配置将分区数量从默认更改为自定义rdd=rdd.repatition(sparkConf.defaultParalallism*2)

如果在上面的正确配置之后你的工作仍然很慢,请更改你的代码并使用正确的函数和对象。喜欢

(i) 如果您要将数据发送到任何外部目的地,例如 Kinesis、DB 或 Kafka,请使用 mapPartitions 或 foreachPatitions 并减少对象创建次数。

(ii) 如果你调用的是外部 API,那么也遵循上述策略。

(iii) 使用适当的数据结构。

更多信息,您可以参考:here

希望对你有帮助。

【讨论】:

以上是关于EMR 上 Spark 批处理作业的优化的主要内容,如果未能解决你的问题,请参考以下文章