Spark算子: combineByKey 简单解析及案列

Posted 大数据小知识

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark算子: combineByKey 简单解析及案列相关的知识,希望对你有一定的参考价值。

简述

   我们都晓得,gruopByKey、aggregateByKey、foldByKey..等算子 是用于处理Key,Value的Pair数据,底层都是调用 combineByKeyWithClassTag函数,在日常业务数据处理过程中,如果简单一点的逻辑计算 仅需 使用 Spark 封装好的算子就好了,,当涉及到复杂一点的统计规则时, 我们就不得不使用 combineByKey 算子来实现.

  • combineByKey 源码
def combineByKey[C](
  //传入 value值 V ,通过逻辑处理返回 C 值或新的类型
      createCombiner: V => C, 
      // 传入新的 V value,与 第一次 createCombiner计算后的值C进行 进一步逻辑操作 比如 排序 去重等
      //注意: 这两个值 V 与C 的操作逻辑是用同一个分区中 计算的
      mergeValue: (C, V) => C, 
      //这个参数 用于 不同不分区中,C的合并
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }
  • 从源码中 combineByKey 可以按照我们的思路来进行 逻辑运算处理复杂的逻辑统计,同时可以优化我们的算子,

案例:

   我们现在有一个需求 统计 每一个月最高温度的两天;

  • 数据 :
原始数据:
2019-6-1 39
2019-5-21 33
2019-6-1 38
2019-6-2 31
2018-3-11 18
2018-4-23 22
1970-8-23 23
1970-8-8 32

结果数据: 去重+排序
  ((2018,3),List((11,18)))
  ((2019,5),List((21,33)))
  ((2018,4),List((23,22)))
  ((1970,8),List((8,32), (23,23)))
  ((2019,6),List((1,39), (2,31)))

1.解决思路 :

按照我们一般的处理逻辑,应该是先去重 获取一天中最高温度的数据,然后 按月分组,组内数据以温度 从高->低 排序 取topN;

//取年月日为key , 使用 reduceByKey 去重 
val reduced: RDD[((Int, Int, Int), Int)] = data2.map(x=>((x._1,x._2,x._3),x._4))
           .reduceByKey((x:Int,y:Int)=>if(y>x) y else x )
// 取年月为KEY 分组,并对value 排序 取topN
val maped: RDD[((Int, Int), (Int, Int))] = reduced.map(t2=>((t2._1._1,t2._1._2),(t2._1._3,t2._2)))
reduced.map(t2=>((t2._1._1,t2._1._2),(t2._1._3,t2._2)))
       .groupByKey()
       .mapValues(arr=>arr.toList.sorted.take(2)).foreach(println)
  • 缺点: 用了groupByKey  容易OOM,且 发生了两次shuffle
  • DAG图

2.解决思路 :

  • 使用combineByKey 来实现需求;

 implicit val topNOrdering = new Ordering[(Int, Int)] { //降序
      override def compare(x: (Int, Int), y: (Int, Int)= {
        y._2.compareTo(x._2)
      }
    }
   
   val kv: RDD[((Int, Int), (Int, Int))] = data2.map(t4 => ((t4._1, t4._2), (t4._3, t4._4)))

    val res: RDD[((Int, Int), Array[(Int, Int)])] = kv.combineByKey(
      //      createCombiner: V => C,
      //      mergeValue: (C, V) => C,
      //      mergeCombiners: (C, C) => C

      //第一条记录怎么放:使用一个数组 用于 放置 value值 数组大小与TopN一致
      (v1: (Int, Int)) => {
         //数组长度 最好动态获取
        val intToInt: Array[(Int, Int)] = new Array[(Int, Int)](10)
        intToInt(0) = v1
        intToInt
      },
      //第二条 newv 值, 用于与 oldv 比较 用于去重+排序:
      (oldv: Array[(Int, Int)], newv: (Int, Int)) => {
        //去重,排序
        var flg = 0 //  0,1,2 新进来的元素特征:  日 a)相同  1)温度大 2)温度小   日 b)不同

        for (i <- 0 until oldv.length) {

          if (oldv(i) == null) oldv(i) = (00)

          if (oldv(i)._1 == newv._1) {
            if (oldv(i)._2 < newv._2) {
              flg = 1
              oldv(i) = newv //新值 温度大于旧值 则替换
            } else {
              flg = 2
            }
          }
        }
        if (flg == 0) { //不是同一天的数据,则 数组最后一位索引赋值 新值,再 排序 (则最后一位值肯定是当前中最小的值)
          oldv(oldv.length - 1) = newv
        }
        
        scala.util.Sorting.quickSort(oldv)
        oldv

      },
      (v1: Array[(Int, Int)], v2: Array[(Int, Int)]) => {
        //  合并不同分区中的 数据,并 去重 排序
        val union: Array[(Int, Int)] = v1.union(v2).distinct
        
        scala.util.Sorting.quickSort(union)
        union
      }

    )
    res.map(x => (x._1, x._2.toList)).foreach(println)
  • DAG图

总结

从以上两个对比,可以看出对于复杂的逻辑计算, combineByKey 算子的优势,

  1. shuffle 数的减少;
  2. 避免了可能的OOM情况发生;


以上是关于Spark算子: combineByKey 简单解析及案列的主要内容,如果未能解决你的问题,请参考以下文章

SparkCore算子之CombineByKey使用

spark算子:combineByKey

spark通过combineByKey算子实现条件性聚合的方法

reduceByKeygroupByKey和combineByKey

spark算子

Spark算子执行流程详解之五