Spark追妻系列(Pair RDD下集)

Posted 数仓白菜白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark追妻系列(Pair RDD下集)相关的知识,希望对你有一定的参考价值。

忙忙碌碌一天,啥都没干

小谈:

        今天啥也没干,不知不觉已经是大年初五了,再过五六七八天就要开始考科目四了,希望早早拿到驾照

combineByKey

 

先解释一个每个参数的意义

  1. createCombiner:分组内的创建组合的函数,通俗点就是将都进来的数据进行初始化,把当前的值作为参数,可以进行一些转换操作.分区内每种key调用一次
  2. mergeValue:分区内的合并函数,作用在每一个分区,也就是将分组后的Value合并到之前的转换后的C之后。将createCombiner结果与相同的key对应的值最累加
  3. mergeCombiners:该函数主要将多分区合并,各个分区相同的Key对应的结果做聚合

        combineByKey()会遍历分区中的所有元素,因此每个元素要么之前遇到过,要么没有遇到过,如果是一个新的元素(指的是这个key),那么就会使用createCombiner来创建那个键对应的累加器的初始值,只会每个键第一次出现的时候发生,并不是出现一个就发生一个

        如果是之前已经出现的key,那么就会使用mergeValue来将该键对应的累加器的当前值与这个新的值进行合并

        只讲参数意义不给代码实战就是耍流氓。

        下面通过一个案例来实战一个这个函数

        有一个源源梓同学,求她的成绩平均数。三门科目分数97,96,95.求她的平均成绩,三门科目,总分288分。平均分数为96分。

        来看代码

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value1 = sparkContext.makeRDD(
List(("源源梓", 97), ("源源梓", 96), ("源源梓", 95)),2) 
val value = value1.combineByKey( 
//将成绩转换成 成绩 ==》 
(成绩,1) score => (score, 1), 
//分区内的规则 (成绩,1) =》 (成绩+成绩,1+1) 
(tuple1: (Int, Int), v) => (tuple1._1 + v, tuple1._2 + 1), 
//分区间的规则 成绩和成绩相加,次数和次数相加 
(tuple2: (Int, Int), tuple3: (Int, Int)) => 
(tuple2._1 + tuple3._1, tuple2._2 + tuple3._2) ) 
val value2 = value.map  
case (name, (score, num)) => (name, score / num)  
value2.collect().foreach(println(_))

        可能光看代码看不懂,下面来一一讲解一下。

1..首先创建RDD

       

 val value1 = sparkContext.makeRDD(List(("源源梓", 97), ("源源梓", 96), ("源源梓", 95)),2)

2.调用combineByKey算子

val value = value1.combineByKey( 
//createCombiner,通过key分组后,97,96,95 
//平均分数=总分数 / 科目门数 
//先对每门科目进行转换 
score => (score, 1) 
//分区内的规则 (成绩,1) =》 (成绩+成绩,1+1) 
//(97,1) 分区2:(96 + 95,2) 
(tuple1: (Int, Int), v) => (tuple1._1 + v, tuple1._2 + 1), 
//分区间的规则 成绩和成绩相加,次数和次数相加 
(tuple2: (Int, Int), tuple3: (Int, Int)) => 
(tuple2._1 + tuple3._1, tuple2._2 + tuple3._2) )

3.计算平均数

    

    //value的返回值类型
[String,[Int,Int]] 
//String 就是 源源梓 第一个Int:总分 第二个Int:科目数 
val value2 = value.map 
 case (name, (score, num)) => (name, score / num) 

        看一下上面案例的类似图解

 

        上面这个图解可以更明了的来看出来这个算子的作用。

        刚开始分组后,对分组的数据进行转换

        在分区内对相同组内的数据进行相加,总数相加的同时个数也相加

        分区间对相同key的数据进行相加,(这里的数据就是在分区内计算出来的数据)

sortByKey

        在一个(k,v)的RDD上调用,K必须实现Ordered特质,返回一个按照key进行排序

        先看一下这个算子

def sortByKey(ascending: Boolean = true, 
numPartitions: Int = self.partitions.length) : RDD[(K, V)] 
= self.withScope  val part = new RangePartitioner(numPartitions, self, ascending) 
new ShuffledRDD[K, V, V](self, part) .setKeyOrdering(if (ascending) ordering else ordering.reverse) 

        在使用这个算子的时候一定会发生shuffle,因为底层继承了new ShuffledRDD。当然拉,可以根据第二个参数来选择是升序还是降序排列。

        通过代码来认识一下

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value1 = sparkContext.makeRDD(List((a, 97), (b, 96), (c, 95)),2) value1.sortByKey().collect().foreach(println(_))

        默认是升序的,那么结果就是

        (a,97) (b,96) (c,95)

MapValues

        有时候,我们只想访问Pair RDD的值部分,这时操作二元组很麻烦,这个时候使用Spark提供的mapValues(func)函数。功能类似于mapcase (x,y) : (x,func(y))

总结:

        今天摆烂了,感觉写的好少好少。明天一定好好学习,好好输出文章,积攒这么多的文笔,得要好好发挥发挥。

        明天将会输出 TopN案例和动作算子。如果可以还会讲RangePartitioner。

        还差四五百阅读量就可以破两万了,虽然这篇博客挺差劲的,希望明天醒来可以看到阅读量破两万。

        如果破两万,一定好好写博客。摆烂太难受了

以上是关于Spark追妻系列(Pair RDD下集)的主要内容,如果未能解决你的问题,请参考以下文章

Spark追妻系列(Spark初了解)

spark的Pair RDD的转化操作

spark中的pair rdd,看这一篇就够了

6.Pair RDD操作

spark pairrdd怎么根据value的值排序

spark系列-2Spark 核心数据结构:弹性分布式数据集 RDD