EMR 上 Spark 批处理作业的优化
Posted
技术标签:
【中文标题】EMR 上 Spark 批处理作业的优化【英文标题】:Optimization of Spark batch job on EMR 【发布时间】:2020-02-13 20:46:01 【问题描述】:我们正在 EMR 集群上运行 spark-job,集群配置如下所示。
Resources:
Node Type:CORE - 2 INSTANCES OF
r4.8xlarge
32 vCore, 244 GiB memory, EBS only storage
EBS Storage:32 GiB
Node Type: MASTER
1 Instance of r4.4xlarge
16 vCore, 122 GiB memory, EBS only storage
EBS Storage:32 GiB
Node Type: TASK-
2 INSTANCES Of
r4.4xlarge
16 vCore, 122 GiB memory, EBS only storage
EBS Storage:32 GiB
我们在 EMR 控制台上使用以下参数进行 spark-submit:
/usr/bin/spark-submit --deploy-mode cluster --conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true --conf spark.sql.files.ignoreCorruptFiles=true --driver-memory 5g --master yarn --class class_name s3://location_of_jar -c s3://location of input to jar -w xyz.json
我们认为这些论点没有充分利用可用的全部可用资源。任何人都可以通过更改任何 spark-defaults.conf 文件或传递更多参数来建议是否有任何其他优化方法可以在 EMR 上执行 spark-submit,以便优化利用所有可用资源?我们一次运行一项工作。集群上没有运行并行作业
【问题讨论】:
【参考方案1】:如果不知道每个执行程序分配的资源、工作的性质、您正在处理的数据量等,很难给出适当的建议。我认为您现在能做的最好的事情就是在创建 EMR 集群的同时安装 ganglia。 ganglia web ui 可通过http://master-public-dns-name/ganglia/
获得
先看看cpu和内存使用情况。如果您为 Spark 作业优化分配资源,然后相应地调整每个执行程序的资源,这将为您提供一个足够好的想法。
可以使用以下方式在您的 spark-submit 命令中设置执行器、执行器内存和内核的数量(这些是示例值):
--num-executors 10
--executor-cores 1
--executor-memory 5g
查看神经节图表后,您会了解哪些资源被利用不足/过度利用。相应地改变这些。
如果您不想玩弄这些数字并让 spark 决定什么是最佳组合,则可能值得使用以下行将动态资源分配设置为 true:
--conf spark.shuffle.service.enabled=true
--conf spark.dynamicAllocation.enabled=true
这里要注意的一点是,yarn 将获得分配给核心 + 任务节点的总内存的 75%。此外,驱动程序和每个执行程序都有与之相关的内存开销。查找 spark 文档。在手动为驱动程序和执行程序分配资源时请记住这一点。
【讨论】:
【参考方案2】:分析spark job
的第一步是spark-ui
。所以使用tracking url
并查看logs, jobs, executors, streaming
。
http://cluster_manager_host:8088/
更详细的内存和cpu利用率分析,你也可以使用Gangalia
工具。
http://cluster_manager_host/Gangalia
在这之后你可以做的是:
您必须进行自定义配置,例如
(i) 执行者数量--conf num-executors x
(ii) 执行者内存--conf executor-memory y
(iii) 核心数量-- conf executor-cores z
(iv) 启用动态资源分配--conf spark.dynamicAllocation.enabled=true
(v) 启用最大资源分配--conf maximizeResourceAllocation=true
(vi) 将serialisation
从默认更改为Kryo
、--conf org.apache.spark.serializer.KryoSerializer
(vii) 根据您的配置将分区数量从默认更改为自定义rdd=rdd.repatition(sparkConf.defaultParalallism*2)
如果在上面的正确配置之后你的工作仍然很慢,请更改你的代码并使用正确的函数和对象。喜欢
(i) 如果您要将数据发送到任何外部目的地,例如 Kinesis、DB 或 Kafka,请使用 mapPartitions 或 foreachPatitions 并减少对象创建次数。
(ii) 如果你调用的是外部 API,那么也遵循上述策略。
(iii) 使用适当的数据结构。
更多信息,您可以参考:here
希望对你有帮助。
【讨论】:
以上是关于EMR 上 Spark 批处理作业的优化的主要内容,如果未能解决你的问题,请参考以下文章
在 emr 集群中使用 spark 从 oracle 中获取数据并进行处理
我的 spark 作业在 aws EMR 集群上长时间处于接受模式
在没有 spark UI 的情况下在 AWS EMR 中监控 spark 集群