Spark算子: combineByKey 简单解析及案列
Posted 大数据小知识
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark算子: combineByKey 简单解析及案列相关的知识,希望对你有一定的参考价值。
简述
我们都晓得,gruopByKey、aggregateByKey、foldByKey..等算子 是用于处理Key,Value的Pair数据,底层都是调用 combineByKeyWithClassTag函数,在日常业务数据处理过程中,如果简单一点的逻辑计算 仅需 使用 Spark 封装好的算子就好了,,当涉及到复杂一点的统计规则时, 我们就不得不使用 combineByKey 算子来实现.
-
combineByKey 源码
def combineByKey[C](
//传入 value值 V ,通过逻辑处理返回 C 值或新的类型
createCombiner: V => C,
// 传入新的 V value,与 第一次 createCombiner计算后的值C进行 进一步逻辑操作 比如 排序 去重等
//注意: 这两个值 V 与C 的操作逻辑是用同一个分区中 计算的
mergeValue: (C, V) => C,
//这个参数 用于 不同不分区中,C的合并
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
-
从源码中 combineByKey 可以按照我们的思路来进行 逻辑运算处理复杂的逻辑统计,同时可以优化我们的算子,
案例:
我们现在有一个需求 统计 每一个月最高温度的两天;
-
数据 :
原始数据:
2019-6-1 39
2019-5-21 33
2019-6-1 38
2019-6-2 31
2018-3-11 18
2018-4-23 22
1970-8-23 23
1970-8-8 32
结果数据: 去重+排序
((2018,3),List((11,18)))
((2019,5),List((21,33)))
((2018,4),List((23,22)))
((1970,8),List((8,32), (23,23)))
((2019,6),List((1,39), (2,31)))
1.解决思路 :
按照我们一般的处理逻辑,应该是先去重 获取一天中最高温度的数据,然后 按月分组,组内数据以温度 从高->低 排序 取topN;
//取年月日为key , 使用 reduceByKey 去重
val reduced: RDD[((Int, Int, Int), Int)] = data2.map(x=>((x._1,x._2,x._3),x._4))
.reduceByKey((x:Int,y:Int)=>if(y>x) y else x )
// 取年月为KEY 分组,并对value 排序 取topN
val maped: RDD[((Int, Int), (Int, Int))] = reduced.map(t2=>((t2._1._1,t2._1._2),(t2._1._3,t2._2)))
reduced.map(t2=>((t2._1._1,t2._1._2),(t2._1._3,t2._2)))
.groupByKey()
.mapValues(arr=>arr.toList.sorted.take(2)).foreach(println)
-
缺点: 用了groupByKey 容易OOM,且 发生了两次shuffle -
DAG图
2.解决思路 :
-
使用combineByKey 来实现需求;
implicit val topNOrdering = new Ordering[(Int, Int)] { //降序
override def compare(x: (Int, Int), y: (Int, Int)) = {
y._2.compareTo(x._2)
}
}
val kv: RDD[((Int, Int), (Int, Int))] = data2.map(t4 => ((t4._1, t4._2), (t4._3, t4._4)))
val res: RDD[((Int, Int), Array[(Int, Int)])] = kv.combineByKey(
// createCombiner: V => C,
// mergeValue: (C, V) => C,
// mergeCombiners: (C, C) => C
//第一条记录怎么放:使用一个数组 用于 放置 value值 数组大小与TopN一致
(v1: (Int, Int)) => {
//数组长度 最好动态获取
val intToInt: Array[(Int, Int)] = new Array[(Int, Int)](10)
intToInt(0) = v1
intToInt
},
//第二条 newv 值, 用于与 oldv 比较 用于去重+排序:
(oldv: Array[(Int, Int)], newv: (Int, Int)) => {
//去重,排序
var flg = 0 // 0,1,2 新进来的元素特征: 日 a)相同 1)温度大 2)温度小 日 b)不同
for (i <- 0 until oldv.length) {
if (oldv(i) == null) oldv(i) = (0, 0)
if (oldv(i)._1 == newv._1) {
if (oldv(i)._2 < newv._2) {
flg = 1
oldv(i) = newv //新值 温度大于旧值 则替换
} else {
flg = 2
}
}
}
if (flg == 0) { //不是同一天的数据,则 数组最后一位索引赋值 新值,再 排序 (则最后一位值肯定是当前中最小的值)
oldv(oldv.length - 1) = newv
}
scala.util.Sorting.quickSort(oldv)
oldv
},
(v1: Array[(Int, Int)], v2: Array[(Int, Int)]) => {
// 合并不同分区中的 数据,并 去重 排序
val union: Array[(Int, Int)] = v1.union(v2).distinct
scala.util.Sorting.quickSort(union)
union
}
)
res.map(x => (x._1, x._2.toList)).foreach(println)
-
DAG图
总结
从以上两个对比,可以看出对于复杂的逻辑计算, combineByKey 算子的优势,
-
shuffle 数的减少; -
避免了可能的OOM情况发生;
以上是关于Spark算子: combineByKey 简单解析及案列的主要内容,如果未能解决你的问题,请参考以下文章
spark通过combineByKey算子实现条件性聚合的方法