reduceByKey 与 groupByKey 与 aggregateByKey 与 combineByKey 之间的火花差异
Posted
技术标签:
【中文标题】reduceByKey 与 groupByKey 与 aggregateByKey 与 combineByKey 之间的火花差异【英文标题】:Spark difference between reduceByKey vs. groupByKey vs. aggregateByKey vs. combineByKey 【发布时间】:2017-09-07 22:23:31 【问题描述】:谁能解释reducebykey
、groupbykey
、aggregatebykey
和combinebykey
之间的区别?我已阅读有关此的文档,但无法理解确切的区别。
有例子的解释会很棒。
【问题讨论】:
【参考方案1】:groupByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" ") )
.map(word => (word,1))
.groupByKey()
.map((x,y) => (x,sum(y)))
groupByKey
可能会导致磁盘不足问题,因为数据通过网络发送并收集到减少的工作人员身上。
reduceByKey:
语法:
sparkContext.textFile("hdfs://")
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey((x,y)=> (x+y))
数据在每个分区合并,每个分区的一个键只有一个输出通过网络发送。 reduceByKey
需要将所有值组合成另一个具有完全相同类型的值。
aggregateByKey:
同reduceByKey
,取一个初始值。
3 个参数作为输入
-
初始值
组合器逻辑
序列运算逻辑
例子:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
//Create key value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
输出: 按键汇总结果 酒吧 -> 3 富 -> 5
combineByKey:
3 个参数作为输入
-
初始值:不像
aggregateByKey
,不需要总是传递常量,我们可以传递一个返回新值的函数。
合并函数
组合功能
例子:
val result = rdd.combineByKey(
(v) => (v,1),
( (acc:(Int,Int),v) => acc._1 +v , acc._2 +1 ) ,
( acc1:(Int,Int),acc2:(Int,Int) => (acc1._1+acc2._1) , (acc1._2+acc2._2))
).map( case (k,v) => (k,v._1/v._2.toDouble) )
result.collect.foreach(println)
reduceByKey
,aggregateByKey
,combineByKey
首选 groupByKey
参考: Avoid groupByKey
【讨论】:
不应该有人在聚合函数中添加一个if clause
来检查组合器是否只是加法,如果是,使用reduceByKey 逻辑?我在这里缺少什么来理解为什么不能在编译时完成吗?仅通过对组合器进行硬编码来提高效率意味着如果没有对通用组合器进行多次此类检查,则应该进行此类检查,对吧?
这些检查甚至可以并行完成,不会妨碍计算的启动,并且可以优化..【参考方案2】:
groupByKey()
只是根据键对数据集进行分组。当 RDD 尚未分区时,这将导致数据混洗。
reduceByKey()
类似于分组+聚合。我们可以说reduceByKey()
等价于 dataset.group(...).reduce(...)。与groupByKey()
不同,它将洗牌更少的数据。
aggregateByKey()
在逻辑上与 reduceByKey()
相同,但它允许您返回不同类型的结果。换句话说,它允许您将输入作为类型 x 并将聚合结果作为类型 y。例如 (1,2),(1,4) 作为输入, (1,"six") 作为输出。它还采用 零值,将应用于每个键的开头。
注意:一个相似之处是它们都是宽操作。
【讨论】:
有人知道rdd.groupByKey
、rdd.reduceByKey
和sql.groupBy
之间是否有区别?我有一个大型数据集,想使用性能最高的方法。谢谢【参考方案3】:
虽然 reducebykey 和 groupbykey 会产生相同的答案,但 reduceByKey 示例在大型数据集上效果更好。那是 因为 Spark 知道它可以将输出与每个节点上的公共键结合起来 在洗牌数据之前进行分区。
另一方面,当调用 groupByKey - 所有的键值对 被洗牌了。这是很多不必要的数据 通过网络传输。
更多详情请查看以下链接
https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html
【讨论】:
感谢大家的回复 在什么情况下我们应该使用 groupByKey ?如果函数不是关联函数怎么办?【参考方案4】:虽然它们都将获取相同的结果,但两个函数的性能存在显着差异。与groupByKey()
相比,reduceByKey()
更适用于更大的数据集。
在reduceByKey()
中,在对数据进行混洗之前,将同一台机器上具有相同键的对组合(通过使用传递给reduceByKey()
的函数)。然后再次调用该函数以减少每个分区中的所有值以产生一个最终结果。
在groupByKey()
中,所有的键值对都被打乱了。这是通过网络传输的大量不必要的数据。
【讨论】:
【参考方案5】:ReduceByKey reduceByKey(func, [numTasks])
-
数据被组合在一起,因此在每个分区中,每个键至少应该有一个值。 然后 shuffle 发生,并通过网络发送到某个特定的 executor 以执行一些操作,例如 reduce。
GroupByKey - groupByKey([numTasks])
它不会合并键的值,而是直接发生随机播放过程 这里有很多数据被发送到每个分区,与初始数据几乎相同。
每个键的值的合并是在洗牌之后完成的。 这里大量数据存储在最终工作节点上,因此导致内存不足问题。
AggregateByKey - aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
它类似于reduceByKey,但您可以在执行聚合时提供初始值。
reduceByKey
的使用
reduceByKey
可以在我们在大型数据集上运行时使用。
reduceByKey
当输入输出值类型相同时
超过aggregateByKey
此外,建议不要使用groupByKey
,而更喜欢reduceByKey
。详情可以参考here。
您也可以参考此question 以更详细地了解reduceByKey
和aggregateByKey
。
【讨论】:
【参考方案6】:那么除了这4个,我们还有
foldByKey 与 reduceByKey 相同,但具有用户定义的零值。
AggregateByKey 将 3 个参数作为输入,并使用 2 个函数进行合并(一个用于在相同分区上合并,另一个用于跨分区合并值。第一个参数是 ZeroValue)
而
ReduceBykey 只接受 1 个参数,这是一个用于合并的函数。
CombineByKey 有 3 个参数,所有 3 个都是函数。与 aggregateBykey 类似,但它可以具有 ZeroValue 函数。
GroupByKey 不带参数并将所有内容分组。此外,它是跨分区数据传输的开销。
【讨论】:
以上是关于reduceByKey 与 groupByKey 与 aggregateByKey 与 combineByKey 之间的火花差异的主要内容,如果未能解决你的问题,请参考以下文章