spark使用性能优化记录——二
Posted lsbigdata
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark使用性能优化记录——二相关的知识,希望对你有一定的参考价值。
之前做了记录了spark的一些配置调优,接下来记录一下本人在开发中用到的一些调优手段。
算子调优
MapPartitons提升Map类操作性能:
spark中每个task处理一个RDD的partition,一条一条数据--> task function
MapPartitons后所有的数据(一个分区的所有数据)--> task function
优点: 不用一条一条的去处理数据 ,而是直接都处理
缺点:内存不够大的时候一下子处理会出现OOM
实现:预估RDD数量 partition数量,分配给executor足够的内存资源使用
filter之后减少分区数量:
1、减少partition filter 之后的partition,数据不均匀 ,数据倾斜。
2、减少开始的partition数量
3、减少filter之后的partition数量——>filter().coalesce(100);
4、使用repartition解决Spark SQL低并行度的性能问题:
并行度设置提高性能:conf.set("spark.default.parallelism","500") 指定为core数量的2-3倍。
但是有spark sql的地方是不能设置并行度是,sql会根据hive表对应的hdfs文件block自动设置并行度,可能这个并行度严重低于你设置的,这样stage0会远远慢与stage1 阶段
解决方案:将sql查询出来的RDD使用repartition将并行度提高,即stage0的并行。
reduceByKey本地聚合:
reducebykey相对比普通的groupbykey 会进行map端(每个partition)的combiner聚合,每一个key对应一个values。
减少map端的数据数量,磁盘占用量磁盘IO stage1阶段拉取也减少了reduce端的内存占用,聚合数量也减少了。
使用场景:word count、一些复杂的对每个key进行一些字符串的拼接(实现有点难)
减少了shuffle阶段的资源消耗 。
foreachPartition优化写数据库性能
实际生产环境都使用这个
好处:只需要调用一次function函数,只要创建或获取一个数据库连接就可以,只要向数据库发送一次SQL语句和多组参数即可
坏处:也可能一次数据量特别大的时候,会发生OOM
数据倾斜
原理:ruduce阶段 task任务分布不均 shuffle过程产生 OOM 内存溢出
位置:在代码里找(groupbykey、countBykey、reduceBykey、join),或者查看log发现 定位第几个stage,是哪个stage task慢,定位代码。
解决方案:
聚合源数据:
90%的数据来源是hive(HSQL做离线 ETL 数据采集、清洗、导入)的数据仓库,所以聚合shuffle去hive离线晚上凌晨直接聚合,然后spark就不做这个操作了,对每个key聚合 value聚合,后期用spark sql 直接查询就行,减少数据量。
过滤导致倾斜的key:
放粗粒度聚合,不要聚合那么细。摒弃掉某些倾斜量特别大的key 直接查询获取数据的时候,在hive表中,用where过滤掉,直接弃掉某些数据
提高reduce并行度(shuffle阶段)
例如将原本一个task中的任务分配给5个task,配置的参数增加reduce的task数,调入shuffle操作的时候传入参数数字及 reduce的并行度
上两个方案直接避免数据倾斜,这个是减轻了shuffle reduce task的数据压力,倾斜问题,使用后依旧很慢,只是没有OOM了
使用随机key实现双重聚合
打散key(mapTopair 加随机数),让其跑去不同的task上--局部聚合--反向映射--还原key去新的task上计算
maptopair 打散,reducebykey局部聚合,maptopair还原reducebykey全局聚合
适用:reduceBykey 、groupBykey 场景适用
reduce join 转化为 map join
RDD join RDD
RDD->map-join-broadcast(广播变量)->RDD,避免shuffle,避免了数据倾斜
.collect() list --> sc.broadcast(list) --> maptopair 遍历tuple放到map里,拼接返回需要的值
适用场景:RDD的join,一个RDD比较小,一个较大
Spark SQL 数据倾斜
解决方案和spark core的一样
1.聚合数据 同core
2.过滤导致倾斜的key 在sql用where
3.提高shuffle并行度,groupByKey(1000) .set("spark.sql.shuffle.partitions","2000") 默认200。
4.双重group by
5.reduce join 转换为 map join .set("spark.sql/autoBroadcastJoinThreshold","")默认是10485760
6.采样倾斜key单独join,纯spark core 的方式 sample filter 等算子
7.随机key与扩容表:spark sql + spark core
这是我在实际开发中遇到问题使用的一些调优方法,当然spark的调优肯定不止这些,希望能在以后的学习中更进一步,学习更多的知识。
以上是关于spark使用性能优化记录——二的主要内容,如果未能解决你的问题,请参考以下文章