再识spark

Posted ruanjianwei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了再识spark相关的知识,希望对你有一定的参考价值。

一.示例

1.统计PV和UV

1.1统计PV

 val conf = new SparkConf()
   conf.setMaster("local").setAppName("pvuv")
   val sc = new SparkContext(conf)
   val lineRDD = sc.textFile("./pvuv.txt")
?
   lineRDD.map(x=>{
     val sp=x.split("\s")
    (sp(5),1)
  }).reduceByKey(_+_).foreach(println)

1.2统计UV

lineRDD.map(x=>{
    val sp=x.split("\s")
    (sp(5),sp(0))
  }).distinct().countByKey().foreach(println)

2.二次排序

?
SparkConf sparkConf = new SparkConf()
.setMaster("local")
.setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
?
JavaRDD<String> secondRDD = sc.textFile("secondSort.txt");
?
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(new PairFunction<String, SecondSortKey, String>() {
?
/**
*
*/
private static final long serialVersionUID = 1L;
?
@Override
public Tuple2<SecondSortKey, String> call(String line) throws Exception {
          String[] splited = line.split(" ");
          int first = Integer.valueOf(splited[0]);
          int second = Integer.valueOf(splited[1]);
          SecondSortKey secondSortKey = new SecondSortKey(first,second);
          return new Tuple2<SecondSortKey, String>(secondSortKey,line);
}
});
?
pairSecondRDD.sortByKey(false).foreach(new  
              VoidFunction<Tuple2<SecondSortKey,String>>() {

/**
*
*/
private static final long serialVersionUID = 1L;
?
@Override
public void call(Tuple2<SecondSortKey, String> tuple) throws Exception {
            System.out.println(tuple._2);
}
});
?
public class SecondSortKey  implements Serializable,Comparable<SecondSortKey>{
/**
*
*/
private static final long serialVersionUID = 1L;
private int first;
private int second;
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public SecondSortKey(int first, int second) {
super();
this.first = first;
this.second = second;
}
@Override
public int compareTo(SecondSortKey o1) {
if(getFirst() - o1.getFirst() ==0 ){
return getSecond() - o1.getSecond();
}else{
return getFirst() - o1.getFirst();
}
}
}
?

3.分组取topN

SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> linesRDD = sc.textFile("scores.txt");
?
JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(new PairFunction<String, String, Integer>() {
?
/**
*
*/
private static final long serialVersionUID = 1L;
?
@Override
public Tuple2<String, Integer> call(String str) throws Exception {
String[] splited = str.split(" ");
String clazzName = splited[0];
Integer score = Integer.valueOf(splited[1]);
return new Tuple2<String, Integer> (clazzName,score);
      }
});
?
pairRDD.groupByKey().foreach(new
           VoidFunction<Tuple2<String,Iterable<Integer>>>() {
?
   /**
    *
    */
   private static final long serialVersionUID = 1L;
?
   @Override
   public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception {
String clazzName = tuple._1;
Iterator<Integer> iterator = tuple._2.iterator();

Integer[] top3 = new Integer[3];

while (iterator.hasNext()) {
        Integer score = iterator.next();
?
          for (int i = 0; i < top3.length; i++) {
    if(top3[i] == null){
               top3[i] = score;
               break;
    }else if(score > top3[i]){
                for (int j = 2; j > i; j--) {
           top3[j] = top3[j-1];
                }
               top3[i] = score;
               break;
    }
      }
}
System.out.println("class Name:"+clazzName);
for(Integer sscore : top3){
     System.out.println(sscore);
}
}
});

一.广播变量和累加器

1.广播变量

  • 广播变量理解图

    • 技术图片

  • 源码

    • val conf = new SparkConf()
      conf.setMaster("local").setAppName("brocast")
      val sc = new SparkContext(conf)
      val list = List("hello xasxt")
      val broadCast = sc.broadcast(list)
      val lineRDD = sc.textFile("./words.txt")
      lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
      sc.stop()
  • 注意

    • 广播变量只能在driver端定义,不能Executor端定义

    • 只能在driver端修改变量的值

2.累加器

  • 累加器理解图

    • 技术图片

  • 源码

    • val conf = new SparkConf()
      conf.setMaster("local").setAppName("accumulator")
      val sc = new SparkContext(conf)
      val accumulator = sc.longAccumulator
      sc.textFile("./words.txt").foreach { x =>{accumulator.add(1)}}
      println(accumulator.value)
      sc.stop()

 

二.调度源码分析

 

三.SparkShuffle

1.shuffle概念

  • reduceByKey会将上一个RDD中的每一个key对应的所有value聚合成一个value,然后生成一个新的RDD,元素类型是<key,value>,每一个可以对应一个聚合起来的value

  • 由于聚合之key对应的value有可能在不同的partition上,name该如何聚合??

    • shuffle write:上一个stage的每个map task就必须保证将自己处理的当前分区的数据相同的key写入一个分区文件中,可能会写入多个不同的分区文件中。

    • shuffle read:reduce task就会从上一个stage的所有task所在的机器上寻找属于己的那些分区文件,这样就可以保证每一个key所对应的value都会汇聚到同一个节点上去处理和聚合

  • 常见的shuffle有两种类型:HaskShuffle和SortShuffle

2.HashShuffle

1.普通机制

示意图

技术图片

执行流程

每一个map task将不同结果写到不同的buffer中,每个buffer的大小为32K,buffer起到数据缓存的作用

每一个buffer文件最后对应一个磁盘小文件

reduce task来拉取对应的磁盘小文件

总结

map task的计算结果会根据分区器(默认是hashPartitioner)来决定写入到哪一个磁盘小文件中去,ReduceTask会 去map端拉取响应的磁盘小文件

产生磁盘小文件的个数

M(map task的个数)*R(reduce task的个数)

存在的问题

在shuffle write过程中会产生很多写磁盘小文件的对象

在shuffle read过程中会产生很多读取磁盘小文件的对象

在JVM堆内存中对象过多就会造成频繁的GC,若GC还无法解决运行所需要的内存的话,就会产生OOM问题

在数据传输中会有频繁的网络通信,出现通信故障的可能性大大增加,通信故障a) 导致的task失败,TaskScheduler不负责重试,由DAGScheduler负责重试Stage。

2.合并机制

示意图

技术图片

总结

产生磁盘小文件的个数:C(core的个数)*R(reduce的个数)

3.SortShuffle

1.普通机制

示意图

技术图片

执行流程

map task的计算结果会写入到一个内存数据结构里面,内存数据结构默认5M

在shuffle的时候还有一个定时器,不定期的取估算这个内存结构的大小,当内存结构中的数据超过5M,会申请更多的资源给内存数据结构

如果申请成功不会进行溢写,如果申请不成功,就会发生溢写磁盘

在溢写之前内存结构中的数据会进行排序分区

开始溢写磁盘,写磁盘是以batch的形式去写,一个batch是一万条数据

map task执行完成后,会将这些磁盘小文件合并成一个大的磁盘文件,同时生成一个索引文件

reduce task去map端拉取数据时,首先解析索引文件,根据索引文件再去拉取对应的数据

产生磁盘小文件的个数: 2*M(map task的个数)

2.bypass机制

示意图

技术图片

总结

bypass运行机制的触发条件: shuffle reduce task 的数量小于spark.shuffle.sort.bypassMergeThreshold的参数值。默认值是200

不需要进行map端的预聚合

产生的磁盘小文件为: 2*M(map task的个数)

4.shuffle文件寻址

1.主要对象

mapoutputtracker spark架构中的一个模块,是一个主从架构,管理磁盘小文件的地址

mapoutputtrackermaster是主队象,存在于driver中

mapoutputtrackerworker是从队象,存在于executor中

blockmanagerpark架构中的一个模块,是一个主从架构,块管理者

BlockManagerMaster,主对象,存在于Driver中。会在及群众有用到广播变量或缓存数据或删除缓存数据的时候,通知BlockManagerSlave传输或者删除数据。

BlockManagerworker,从对象,存在于executor中。BlockManagerworker与BlockManagerworker之间通信无论在Driver端的BlockManager还是在Excutor端的BlockManager都含有四个对象:

diskstore:负责磁盘的管理

memorystore:负责内存的管理

connectionmanager:负责连接其他的blockmanagerworker

blocktransferservice:负责数据的传输

2.shuffle寻址流程

寻址图

技术图片

寻址流程

当map task执行完成后,会将task的执行情况和磁盘小文件的地址封装到MpStatus对象中通过mapoutputtrackerworker对象向mapoutputtrackermaster汇报

在所有的map task执行完毕后,driver中就掌握所有的磁盘小文件的地址

在reduce task执行之前,会通过executor中mapoutputtrackerworker向driver端的mapoutputtrackermaster获取磁盘小文件的地址

获取到磁盘小文件的地址后,会通过blockmanager中的connectionmanager连接数据所在节点上的connectionmanager,然后通过blocktransferservice进行数据的传输

blocktransferservice默认启动5个task去节点拉取数据.默认情况下,5个task拉取数据量不能超过48M

5.内存管理

Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责创建SparkContext上下文,提交任务,task的分发等。Executor负责task的计算任务,并将结果返回给Driver。同时需要为需要持久化的RDD提供储存。Driver端的内存管理比较简单,这里所说的Spark内存管理针对Executor端的内存管理。

使用静态内存可以通过参数spark.memory.useLegacyMode 设置为true(默认为false)使用静态内存管理。

1.静态内存管理

中存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置。

技术图片

整个内存被分为三部分:

task计算占20%

shuffle聚合内存占20%,其中的10%预留,防止OOM

剩下60%中的10%预留,防止OOM问题,其中这60%中的90%中的80%用于存储RDD的缓存数据和广播变量,剩下的90%中的20%用于解压序列化数据

2.统一内存管理

与静态内存管理的区别在于储存内存和执行内存共享同一块空间,可以互相借用对方的空间。

技术图片

主要分四部分:

预留总内存的重的300M,用于JVM自身运行

总内存-300M的25%用于task计算(spark2.0以后是40%)

总内存-300M的75%中的50%用于shuffle聚合,剩下的用于存储RDD缓存数据和广播变量.这里的两部分可以相互动态借用

reduce 中OOM如何处理?

减少每次拉取的数据量

提高shuffle聚合的内存比例

提高Excutor的总内存

6.shuffle调优

SparkShuffle调优配置项如何使用?

在代码中,不推荐使用,硬编码。

new SparkConf().set(“spark.shuffle.file.buffer”,”64”)

在提交spark任务的时候,推荐使用。

spark-submit --conf spark.shuffle.file.buffer=64 –conf ….

在conf下的spark-default.conf配置文件中,不推荐,因为是写死后所有应用程序都要用。

部分调优参数

spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
?
spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
?
spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
shuffle file not find   taskScheduler不负责重试task,由DAGScheduler负责重试stage
?
spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
?
spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
?
spark.shuffle.manager
默认值:sort|hash
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
?
spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
?
spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。

以上是关于再识spark的主要内容,如果未能解决你的问题,请参考以下文章

python+spark程序代码片段

课程16Python再识函数

再识RPC-thrift

spark关于join后有重复列的问题(org.apache.spark.sql.AnalysisException: Reference '*' is ambiguous)(代码片段

再识angular

再识网络编程