spark性能调优04-算子调优

Posted YL10000

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark性能调优04-算子调优相关的知识,希望对你有一定的参考价值。

1、使用MapPartitions代替map

  1.1 为什么要死使用MapPartitions代替map

    普通的map,每条数据都会传入function中进行计算一次;而是用MapPartitions时,function会一次接受所有partition的数据出入到function中计算一次,性能较高。

    但是如果内存不足时,使用MapPartitions,一次将所有的partition数据传入,可能会发生OOM异常

  1.2 如何使用

    有map的操作的地方,都可以使用MapPartitions进行替换

        /**
         * 使用mapPartitionsToPair代替mapToPair
         */
        JavaPairRDD<String, Row> sessionRowPairRdd =dateRangeRdd.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Row>, String, Row>() {

            private static final long serialVersionUID = 1L;

            public Iterable<Tuple2<String, Row>> call(Iterator<Row> rows) throws Exception {
                List<Tuple2<String, Row>> list=new ArrayList<Tuple2<String, Row>>();
                while (rows.hasNext()) {
                    Row row=rows.next();
                    list.add(new Tuple2<String, Row>(row.getString(2), row));
                }
                return list;
            }
        });
        
        /*JavaPairRDD<String, Row> sessionRowPairRdd = dateRangeRdd
                .mapToPair(new PairFunction<Row, String, Row>() {

                    private static final long serialVersionUID = 1L;
                    // 先将数据映射为<sessionId,row>
                    public Tuple2<String, Row> call(Row row) throws Exception {
                        return new Tuple2<String, Row>(row.getString(2), row);
                    }
                });*/    

2、使用coalesce对过滤后的Rdd进行重新分区和压缩

  2.1 为什么使用coalesce

    默认情况下,经过过滤后的数据的分区数和原分区数是一样的,这就导致过滤后各个分区中的数据可能差距很大,在之后的操作中造成数据倾斜

    使用coalesce可以使过滤后的Rdd的分区数减少,并让每个分区中的数据趋于平等

  2.2 如何使用   

       //过滤符合要求的ClickCategoryIdRow    
    filteredSessionRdd
.filter(new Function<Tuple2<String,Row>, Boolean>() { private static final long serialVersionUID = 1L; public Boolean call(Tuple2<String, Row> tuple2) throws Exception { return (Long.valueOf(tuple2._2.getLong(6))!=null)?true:false; } }) //使用coalesce将过滤后的数据重新分区和压缩,时新的分区中的数据大致相等 .coalesce(100)

3、使用foreachPartition替代foreach

  3.1 为什么使用foreachPartition

    默认使用的foreach,每条数据都会传入function进行计算;如果操作数据库,每条数据都会获取一个数据库连接并发送sql进行保存,消耗资源比较大,性能低。

    使用foreachPartition,会把所用partition的数据一次出入function,只需要获取一次数据库连接,性能高。

  3.2 如何使用

        /**
         * 使用foreachPartition替代foreach
         */
        sessionRdd.join(sessionRowPairRdd).foreachPartition(new VoidFunction<Iterator<Tuple2<String,Tuple2<String,Row>>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> iterator)
                    throws Exception {
                List<SessionDetail> sessionDetails=new ArrayList<SessionDetail>();
                if (iterator.hasNext()) {
                    Tuple2<String, Tuple2<String, Row>> tuple2=iterator.next();
                    String sessionId=tuple2._1;
                    Row row=tuple2._2._2;
                    SessionDetail sessionDetail=new SessionDetail();
                    sessionDetail.setSessionId(sessionId);
                    sessionDetail.setTaskId((int)taskId);
                    sessionDetail.setUserId((int)row.getLong(1));
                    sessionDetails.add(sessionDetail);
                }
                DaoFactory.getSessionDetailDao().batchInsertSessionDao(sessionDetails);
            }
        });
        
       /* sessionRdd.join(sessionRowPairRdd).foreach(new VoidFunction<Tuple2<String,Tuple2<String,Row>>>() {
            private static final long serialVersionUID = 1L;
            public void call(Tuple2<String, Tuple2<String, Row>> tuple2) throws Exception {
                String sessionId=tuple2._1;
                Row row=tuple2._2._2;
                SessionDetail sessionDetail=new SessionDetail();
                sessionDetail.setSessionId(sessionId);
                sessionDetail.setTaskId((int)taskId);
                sessionDetail.setUserId((int)row.getLong(1));
              DaoFactory.getSessionDetailDao().insertSessionDao(sessionDetail);
            }
        });*/   

 4、使用repartition进行调整并行度

  4.1 为什么要使用repartition

    spark.default.parallelism设置的并行度只能对没有Spark SQL(DataFrame)的阶段有用,对Spark SQL的并行度是无法设置的,该并行度是通过hdfs文件所在的block块决定的。

    可以通过repartition调整之后的并行度

  4.2 如何使用 

sqlContext.sql("select * from user_visit_action where date >= ‘" + startDate + "‘ and date <= ‘" + endDate + "").javaRDD()
    //使用repartition调整并行度
    .repartition(100)

 5、使用reduceByKey进行本地聚合

  5.1 reduceByKey有哪些优点

    reduceByKey相对于普通的shuffle操作(如groupByKey)的一个最大的优点,会进行map端的本地聚合,从而减少文件的输出,减少磁盘IO,网络传输,内存占比以及reduce端的聚合操作数据。

  5.2 使用场景

    只有是针对每个不同的key进行相应的操作都可以使用reduceByKey进行处理

以上是关于spark性能调优04-算子调优的主要内容,如果未能解决你的问题,请参考以下文章

Spark性能调优-RDD算子调优篇

Spark性能优化指南--基础篇

大数据之Spark:简述你知道的spark调优

转载 Spark性能优化指南——基础篇

[大数据性能调优] 第一章:性能调优的本质Spark资源使用原理和调优要点分析

大数据之Spark:Spark调优之RDD算子调优