spark性能调优04-算子调优
Posted YL10000
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark性能调优04-算子调优相关的知识,希望对你有一定的参考价值。
1、使用MapPartitions代替map
1.1 为什么要死使用MapPartitions代替map
普通的map,每条数据都会传入function中进行计算一次;而是用MapPartitions时,function会一次接受所有partition的数据出入到function中计算一次,性能较高。
但是如果内存不足时,使用MapPartitions,一次将所有的partition数据传入,可能会发生OOM异常
1.2 如何使用
有map的操作的地方,都可以使用MapPartitions进行替换
/** * 使用mapPartitionsToPair代替mapToPair */ JavaPairRDD<String, Row> sessionRowPairRdd =dateRangeRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() { private static final long serialVersionUID = 1L; public Iterable<Tuple2<String, Row>> call(Iterator<Row> rows) throws Exception { List<Tuple2<String, Row>> list=new ArrayList<Tuple2<String, Row>>(); while (rows.hasNext()) { Row row=rows.next(); list.add(new Tuple2<String, Row>(row.getString(2), row)); } return list; } }); /*JavaPairRDD<String, Row> sessionRowPairRdd = dateRangeRdd .mapToPair(new PairFunction<Row, String, Row>() { private static final long serialVersionUID = 1L; // 先将数据映射为<sessionId,row> public Tuple2<String, Row> call(Row row) throws Exception { return new Tuple2<String, Row>(row.getString(2), row); } });*/
2、使用coalesce对过滤后的Rdd进行重新分区和压缩
2.1 为什么使用coalesce
默认情况下,经过过滤后的数据的分区数和原分区数是一样的,这就导致过滤后各个分区中的数据可能差距很大,在之后的操作中造成数据倾斜
使用coalesce可以使过滤后的Rdd的分区数减少,并让每个分区中的数据趋于平等
2.2 如何使用
//过滤符合要求的ClickCategoryIdRow
filteredSessionRdd.filter(new Function<Tuple2<String,Row>, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2<String, Row> tuple2) throws Exception { return (Long.valueOf(tuple2._2.getLong(6))!=null)?true:false; } }) //使用coalesce将过滤后的数据重新分区和压缩,时新的分区中的数据大致相等 .coalesce(100)
3、使用foreachPartition替代foreach
3.1 为什么使用foreachPartition
默认使用的foreach,每条数据都会传入function进行计算;如果操作数据库,每条数据都会获取一个数据库连接并发送sql进行保存,消耗资源比较大,性能低。
使用foreachPartition,会把所用partition的数据一次出入function,只需要获取一次数据库连接,性能高。
3.2 如何使用
/** * 使用foreachPartition替代foreach */ sessionRdd.join(sessionRowPairRdd).foreachPartition(new VoidFunction<Iterator<Tuple2<String,Tuple2<String,Row>>>>() { private static final long serialVersionUID = 1L; public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> iterator) throws Exception { List<SessionDetail> sessionDetails=new ArrayList<SessionDetail>(); if (iterator.hasNext()) { Tuple2<String, Tuple2<String, Row>> tuple2=iterator.next(); String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1)); sessionDetails.add(sessionDetail); } DaoFactory.getSessionDetailDao().batchInsertSessionDao(sessionDetails); } }); /* sessionRdd.join(sessionRowPairRdd).foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() { private static final long serialVersionUID = 1L; public void call(Tuple2<String, Tuple2<String, Row>> tuple2) throws Exception { String sessionId=tuple2._1; Row row=tuple2._2._2; SessionDetail sessionDetail=new SessionDetail(); sessionDetail.setSessionId(sessionId); sessionDetail.setTaskId((int)taskId); sessionDetail.setUserId((int)row.getLong(1)); DaoFactory.getSessionDetailDao().insertSessionDao(sessionDetail); } });*/
4、使用repartition进行调整并行度
4.1 为什么要使用repartition
spark.default.parallelism设置的并行度只能对没有Spark SQL(DataFrame)的阶段有用,对Spark SQL的并行度是无法设置的,该并行度是通过hdfs文件所在的block块决定的。
可以通过repartition调整之后的并行度
4.2 如何使用
sqlContext.sql("select * from user_visit_action where date >= ‘" + startDate + "‘ and date <= ‘" + endDate + "‘").javaRDD() //使用repartition调整并行度 .repartition(100)
5、使用reduceByKey进行本地聚合
5.1 reduceByKey有哪些优点
reduceByKey相对于普通的shuffle操作(如groupByKey)的一个最大的优点,会进行map端的本地聚合,从而减少文件的输出,减少磁盘IO,网络传输,内存占比以及reduce端的聚合操作数据。
5.2 使用场景
只有是针对每个不同的key进行相应的操作都可以使用reduceByKey进行处理
以上是关于spark性能调优04-算子调优的主要内容,如果未能解决你的问题,请参考以下文章