Spark 开发调优

Posted 张章章Sam

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 开发调优相关的知识,希望对你有一定的参考价值。

Spark性能优化 - 开发调优

优化一 避免创建重复的RDD

通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。

我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。

一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。

优化二:尽可能复用同一个RDD

1、我们除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。

2、比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

优化三:对多次使用的RDD进行持久化

1、当我们在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。

2、Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。

3、因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。

代码案例和使用场景分析

技术分享

如何选择一种最合适的持久化策略

1、默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。

2、如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

3、如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

4、通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。

优化四:尽量避免使用shuffle类算子

1、如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。

2、shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。

3、因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。

优化五:使用map-side预聚合的shuffle操作

如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。

1、所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。

2、比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。

Java版本

public class AggregateOps {
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName(AggregateOps.class.getSimpleName()).setMaster("local[10]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    List<String> list = Arrays.asList("hello you", "hello me", "you love me", "me love","hello you");
    JavaRDD<String> linesRDD = sc.parallelize(list,10);
    JavaRDD<String> wordsRDD = linesRDD.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) throws Exception {
            return Arrays.asList(s.split(" "));
        }
    });
    JavaPairRDD<String, Integer> wordRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            return new Tuple2<String, Integer>(s, 1);
        }
    });
    JavaPairRDD<String, Integer> aggregateByKeyRDD = wordRDD.aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            System.out.println("seq : " + integer + "---" + integer2);
            return Math.max(integer, integer2);
        }
    }, new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) throws Exception {
            System.out.println("comb : " + integer + "---" + integer2);
            return integer + integer2;
        }
    });
    aggregateByKeyRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
        @Override
        public void call(Tuple2<String, Integer> tuple2) throws Exception {
            System.out.println(tuple2._1()+" ---- "+tuple2._2());
        }
    });
    sc.close();

}
}

Scala版本

    object MapSideShuffle {
    def main(args: Array[String]): Unit = {
      sideMap()
    }
    def nosideMap(): Unit ={
    val conf = new SparkConf().setAppName("MapSideShuffle").setMaster("local[2]")
    val sc = new SparkContext(conf)
     val linesRDD: RDD[String] = sc.textFile("E:/test/word.txt")
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
    val wordRDD: RDD[(String, Int)] = wordsRDD.map((_,1))
    val noSideMap: RDD[(String, Int)] = wordRDD.reduceByKey(_+_)
    noSideMap.foreach(println(_))
    }
    def sideMap(): Unit ={
    val conf = new SparkConf().setAppName("MapSideShuffle").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val linesRDD: RDD[String] = sc.textFile("E:/test/word.txt",4)
    val wordsRDD: RDD[String] = linesRDD.flatMap(_.split(" "))
    val wordRDD: RDD[(String, Int)] = wordsRDD.map((_,1))
    val ll=wordRDD.aggregateByKey(0)(seq,comd).collect()
    ll.foreach(println(_))
    }
    //a 是zerovalue aggregateByKey 的初始化传入值,seq函数返回最大值,这个可以自己按需求设计
    //因为这个返回的是这个K的默认值了,comd阶段,进行的操作是 根据key值相同的,进行相应的逻辑运算
    def seq (a : Int,b :Int): Int ={
    println("seq :" + a +"   "+b)
    Math.max(a,b)
     }
     def comd(a :Int,b :Int): Int ={
    println("comie :" + a +"   "+b)
    a + b
      }
    }

优化六:使用高性能的算子操作

除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。
1、使用reduceByKey/aggregateByKey替代groupByKey
2、使用mapPartitions替代普通map
mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!
3、使用foreachPartitions替代foreach
原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写mysql,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。

Scala

    object MapPartitionsToPair {
    def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MapPartitionsToPair").setMaster("local")
    val sc = new SparkContext(conf)
    val file: RDD[String] = sc.textFile("E:/test/word.txt",3)
    val linesRDD = file.flatMap(_.split(" "))
    linesRDD.mapPartitions(x => {
      var list: List[Tuple2[String, Int]] =  List[Tuple2[String, Int]]()
      for(word <- x){
        list.+:=((word,1))
      }
      var tuples: Iterator[Tuple2[String, Int]] = list.toIterator
      tuples
    }).reduceByKey(_+_).foreachPartition(line =>{
      while (line.hasNext){
        var next: (String, Int) = line.next()
        println(next._1+"-----"+next._2)
      }
    })
 }
}

Java

public class MapPartitionsToPair {
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName(MapPartitionsToPair.class.getSimpleName()).setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> linesRDD = sc.textFile("E:/test/word.txt", 4);
    JavaRDD<String> wordRDD = linesRDD.flatMap(line -> Arrays.asList(line.split(" ")));
    wordRDD.mapPartitionsToPair(word -> {
        Set<Tuple2<String,Integer>> set = new HashSet<>();
        while (word.hasNext()){
            String next = word.next();
            set.add(new Tuple2<String,Integer>(next,1));
        }
        return set;
    }).reduceByKey((v1,v2)-> (v1+v2))
            .foreachPartition(x -> {
                System.out.println("---------------------------------");
                while (x.hasNext()){
                    Tuple2<String, Integer> tuple2 = x.next();
                    System.out.println(tuple2._1()+"    "+tuple2._2());
                }
            });
    }
}

4、使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。

Java

public class PartitionsOps {
public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName(PartitionsOps.class.getSimpleName()).setMaster("local[2]");
    JavaSparkContext sc = new JavaSparkContext(conf);
    JavaRDD<String> linesRDD = sc.textFile("E:/test/word.txt",2);
    JavaRDD<String> wordsRDD = linesRDD.flatMap(x -> Arrays.asList(x.split(" ")));
    JavaPairRDD<String, Integer> wordRDD = wordsRDD.mapPartitionsToPair(line -> {
        Set<Tuple2<String, Integer>> set = new HashSet<Tuple2<String, Integer>>();
        while (line.hasNext()) {
            String s = line.next();
            set.add(new Tuple2<String, Integer>(s.trim(), 1));
        }
        return set;
    });
    JavaPairRDD<String, Integer> hadoopRDD = wordRDD.filter(new Function<Tuple2<String, Integer>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, Integer> tuple2) throws Exception {
            return tuple2._1().equalsIgnoreCase("shi");
        }
    });
    System.out.println(hadoopRDD.getNumPartitions());
    hadoopRDD = hadoopRDD.coalesce(1);
    System.out.println(hadoopRDD.getNumPartitions());
    hadoopRDD.reduceByKey((v1,v2) -> v1+v2).foreach(t -> System.out.println(t._1() +"  "+t._2()));
    sc.close();

}
}

Scala

object PartitionsOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("PartitionsOps").setMaster("local[2]")
val sc = new SparkContext(conf)
var linesRDD: RDD[String] = sc.textFile("E:/test/word.txt",2)
var hadoop = linesRDD.flatMap(_.split(" ")).mapPartitions(word =>{
  var list:List[Tuple2[String,Int]] = List[Tuple2[String,Int]]()
  for (x <- word){
    list.+:=(new Tuple2[String,Int](x,1))
  }
  var tuples: Iterator[(String, Int)] = list.toIterator
  tuples
}).filter(_._1.equalsIgnoreCase("shi"))
println(hadoop.getNumPartitions)
hadoop = hadoop.coalesce(1)
println(hadoop.getNumPartitions)
hadoop.reduceByKey(_+_).foreach(x =>println(x._1+"  "+x._2))
 }
}

5、使用repartitionAndSortWithinPartitions替代repartition与sort类操作

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。






以上是关于Spark 开发调优的主要内容,如果未能解决你的问题,请参考以下文章

Spark任务性能调优总结

Spark学习之路 SparkCore的调优之Shuffle调优

spark性能优化:shuffle调优

Spark学习之路 SparkCore的调优之开发调优[转]

Spark 开发调优

spark记录SparkCore的调优之开发调优