Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Posted YaoYong_BigData

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark中groupByKey() 和 reduceByKey() 和combineByKey()相关的知识,希望对你有一定的参考价值。

 一、groupByKey:

        在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD,也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。 

作用:

  • GroupByKey 算子的主要作用是按照 Key 分组, 和 ReduceByKey 有点类似, 但是 GroupByKey 并不求聚合, 只是列举 Key 对应的所有 Value。

注意点:

  • GroupByKey 是一个 Shuffled。

  • GroupByKey 和 ReduceByKey 不同, 因为需要列举 Key 对应的所有数据, 所以无法在 Map 端做 Combine, 所以 GroupByKey 的性能并没有 ReduceByKey 好。

二、reduceByKey:

        是对key的value进行merge操作,在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。

作用:

  • 首先按照 Key 分组生成一个 Tuple, 然后针对每个组执行 reduce 算子。

调用:

def reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

参数:

  • func → 执行数据处理的函数, 传入两个参数, 一个是当前值, 一个是局部汇总, 这个函数需要有一个输出, 输出就是这个 Key 的汇总结果。

注意点:

  • ReduceByKey 只能作用于 Key-Value 型数据, Key-Value 型数据在当前语境中特指 Tuple2。

  • ReduceByKey 是一个需要 Shuffled 的操作。

  • 和其它的 Shuffled 相比, ReduceByKey是高效的, 因为类似 MapReduce 的, 在 Map 端有一个 Cominer, 这样 I/O 的数据便会减少。

三、combineByKey()

        spark根据key做聚合的aggregateByKey,groupByKey,reduceByKey等常用算子其底层本质都是调用combineByKey实现的,即都是combineByKey的一种特殊情况,下面介绍下combineByKey算子:

源码有两种方式:

 /**
    * 
    * @param createCombiner
    * @param mergeValue
    * @param mergeCombiners
    * @tparam C
    * @return
    */
  def combineByKey[C](
                       createCombiner: V => C,
                       mergeValue: (C, V) => C,
                       mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope 
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
/**
    * 
    * @param createCombiner
    * @param mergeValue
    * @param mergeCombiners
    * @param partitioner
    * @param mapSideCombine
    * @param serializer
    * @tparam C
    * @return
    */
  def combineByKey[C](
                       createCombiner: V => C,
                       mergeValue: (C, V) => C,
                       mergeCombiners: (C, C) => C,
                       partitioner: Partitioner,
                       mapSideCombine: Boolean = true,
                       serializer: Serializer = null): RDD[(K, C)] = self.withScope 
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
      partitioner, mapSideCombine, serializer)(null)
  
 
  

 参数说明:

1、createCombiner:V => C ,分区内创建组合函数。这个函数把当前的值作为参数,此时我们可以对其做些附加操作(类型转换)并把它返回 (这一步类似于初始化操作)。

         说明:同一分区内,同一个key,只有第一次出现的value才会运行此函数。

2、mergeValue: (C, V) => C,分区内合并值函数。该函数把元素V合并到之前的元素C(createCombiner)上 (这个操作在每个分区内进行)。

        说明:同一分区内,同一个key,非第一次出现value运行此函数,其中C为上次的结果,V为本次值。

3、mergeCombiners: (C, C) => C,多分区合并组合器函数。该函数把2个元素C合并。

        说明:这个操作在不同分区间进行。

4、partitioner:自定义分区数,默认为HashPartitioner。

5、mapSideCombine:是否在map端进行Combine操作,默认为true。

过一下图,逻辑应该就清晰了:

补充一个更详细的图:

 

四、实例

  /**
   * ------输出结果------
   * (tim,1)
   * (lucy,1)
   * (Hello,3)
   * (lily,1)
   */
  @Test
  def reduceByKeyTest():Unit = 
    // 1. 创建 RDD
    val rdd1 = sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
    // 2. 处理数据
    val rdd2 = rdd1.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    // 3. 得到结果
    val result = rdd2.collect()
    result.foreach(println(_))
    // 4. 关闭sc
    sc.stop()
  


  /**
   * ------输出结果------
   * (a,CompactBuffer(1, 1))
   * (b,CompactBuffer(1))
   */
  @Test
  def groupByKey(): Unit = 
    val rdd: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
    val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()
    val rdd2: Array[(String, Iterable[Int])] = rdd1.collect()
    rdd2.foreach(println(_))
  


  /**
   * ------输出结果------
   * (zhangsan,97.33333333333333)
   * (lisi,97.5)
   * ---求每个学生的平均成绩
   */
  @Test
  def combineByKey(): Unit = 
    // 1. 准备集合
    val rdd: RDD[(String, Double)] = sc.parallelize(Seq(
      ("zhangsan", 99.0),
      ("zhangsan", 96.0),
      ("lisi", 97.0),
      ("lisi", 98.0),
      ("zhangsan", 97.0))
    )

    // 2. 算子操作
    //   2.1. createCombiner 转换数据
    //   2.2. mergeValue 分区上的聚合
    //   2.3. mergeCombiners 把所有分区上的结果再次聚合, 生成最终结果
    val combineResult = rdd.combineByKey(
      createCombiner = (curr: Double) => (curr, 1),
      mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),
      mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2)
    )

    /**
     * ------输出结果------
     * (zhangsan,(292.0,3))
     * (lisi,(195.0,2))
     */
    //val resultRDD = combineResult
    val resultRDD = combineResult.map( item => (item._1, item._2._1 / item._2._2) )

    // 3. 获取结果, 打印结果
    resultRDD.collect().foreach(println(_))
  

以上是关于Spark中groupByKey() 和 reduceByKey() 和combineByKey()的主要内容,如果未能解决你的问题,请参考以下文章

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark中groupByKey() 和 reduceByKey() 和combineByKey()

Spark笔记004-reduceByKey和groupBykey

Spark中groupBy groupByKey reduceByKey的区别

Spark 中 RDD 算子 ReduceByKey 和 GroupByKey 使用方法和区别