解释 Spark 中的聚合功能(使用 Python 和 Scala)
Posted
技术标签:
【中文标题】解释 Spark 中的聚合功能(使用 Python 和 Scala)【英文标题】:Explain the aggregate functionality in Spark (with Python and Scala) 【发布时间】:2015-03-30 05:06:48 【问题描述】:我正在寻找对通过 python 中的 spark 可用的聚合功能的更好解释。
我的例子如下(使用Spark 1.2.0版本的pyspark)
sc.parallelize([1,2,3,4]).aggregate(
(0, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
输出:
(10, 4)
我得到了(10,4)
的预期结果,它是1+2+3+4
和4 个元素的总和。如果我将传递给聚合函数的初始值从(0,0)
更改为(1,0)
,我会得到以下结果
sc.parallelize([1,2,3,4]).aggregate(
(1, 0),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
输出:
(19, 4)
值增加 9。如果我将其更改为 (2,0)
,则值将变为 (28,4)
,依此类推。
有人可以向我解释这个值是如何计算的吗?我预计值会增加 1 而不是 9,预计会看到 (11,4)
而我看到的是 (19,4)
。
【问题讨论】:
【参考方案1】:我并没有完全相信 接受的答案,而 JohnKnight 的回答有所帮助,所以这是我的观点:
首先,用我自己的话解释一下aggregate():
原型:
聚合(zeroValue,seqOp,combOp)
说明:
aggregate()
允许您获取一个 RDD 并生成一个与原始 RDD 中存储的类型不同的单个值。
参数:
zeroValue
: 初始化值,你的结果,在你想要的
格式。
seqOp
:要对 RDD 记录应用的操作。运行一次
分区中的每条记录。
combOp
:定义结果对象如何(每个分区一个),
合并。
示例:
计算一个列表的总和和该列表的长度。以一对
(sum, length)
的形式返回结果。
在 Spark shell 中,我首先创建了一个包含 4 个元素和 2 个分区的列表:
listRDD = sc.parallelize([1,2,3,4], 2)
然后我定义了我的 seqOp:
seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )
还有我的combOp:
combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )
然后我汇总:
listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)
如您所见,我给变量取了描述性名称,但让我进一步解释一下:
第一个分区有子列表 [1, 2]。我们将 seqOp 应用于该列表的每个元素,这将产生一个本地结果,一对(sum, length)
,它将在本地反映结果,仅在第一个分区中。
所以,让我们开始吧:local_result
被初始化为我们提供给aggregate()
的zeroValue
参数,即(0, 0),list_element
是列表的第一个元素,即1。作为结果是这样的:
0 + 1 = 1
0 + 1 = 1
现在,本地结果是 (1, 1),这意味着,到目前为止,对于第一个分区,在仅处理第一个元素之后,总和为 1,长度为 1。注意,local_result
得到从 (0, 0) 更新到 (1, 1)。
1 + 2 = 3
1 + 1 = 2
现在本地结果是 (3, 2),这将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。
对第二个分区做同样的事情,我们得到 (7, 2)。
现在我们将 combOp 应用于每个局部结果,这样我们就可以形成最终的全局结果,如下所示:(3,2) + (7,2) = (10, 4)
“图”中描述的示例:
(0, 0) <-- zeroValue
[1, 2] [3, 4]
0 + 1 = 1 0 + 3 = 3
0 + 1 = 1 0 + 1 = 1
1 + 2 = 3 3 + 4 = 7
1 + 1 = 2 1 + 1 = 2
| |
v v
(3, 2) (7, 2)
\ /
\ /
\ /
\ /
\ /
\ /
------------
| combOp |
------------
|
v
(10, 4)
受到这个伟大的example 的启发。
所以现在如果zeroValue
不是 (0, 0) 而是 (1, 0),那么人们会期望得到 (8 + 4, 2 + 2) = (12, 4),这不会解释你的经历。即使我们改变了我的例子的分区数,我也无法再得到它。
这里的关键是 JohnKnight 的回答,其中指出 zeroValue
不仅类似于分区数,而且可能应用的次数比您预期的要多。
【讨论】:
真的很高兴它帮助了@Neethu! @ab_tech_sp 这真的应该是公认的答案。特别是因为这个 Q 中最受好评的答案是 Scala(??)!【参考方案2】:使用 Scala 进行解释
Aggregate 让您可以随意转换和组合 RDD 的值。
它使用两个功能:
第一个将原始集合 [T] 的元素转换并添加到本地聚合 [U] 中,并采用以下形式:(U,T) => U。您可以将其视为折叠,因此它也该操作需要零。此操作在本地并行应用于每个分区。
这里是问题的关键所在:这里应该使用的唯一值是归约操作的零值。 此操作在每个分区上本地执行,因此,将任何内容添加到该零值将添加到结果乘以 RDD 的分区数。
第二个操作取前一个操作[U]的结果类型的2个值,并将其组合成一个值。此操作将减少每个分区的部分结果并产生实际总数。
例如: 给定一个字符串的 RDD:
val rdd:RDD[String] = ???
假设您想要汇总该 RDD 中字符串的长度,您可以这样做:
第一个操作会将字符串转换为大小(int)并累积大小的值。
val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`
为加法运算提供零(0)
val 零 = 0
将两个整数相加的操作:
val add: (Int, Int) => Int = _ + _
把它们放在一起:
rdd.aggregate(ZERO, stringSizeCummulator, add)
使用 Spark 2.4 及更高版本
rdd.aggregate(ZERO)(stringAccumulator,add)
那么,为什么需要零? 当 cummulator 函数应用于分区的第一个元素时,没有运行总计。此处使用零。
例如。我的 RDD 是:
分区 1:[“跳转”、“结束”] 分区 2:["the", "wall"]这将导致:
P1:
-
stringSizeCummulator(ZERO, "Jump") = 4
stringSizeCummulator(4, "over") = 8
P2:
-
stringSizeCummulator(ZERO, "the") = 3
stringSizeCummulator(3, "wall") = 7
减少:add(P1, P2) = 15
【讨论】:
你是对的。当我通过指定不同的值开始使用 spark.default.parallelism 设置时,当我将 (1,0) 作为聚合函数的初始值传递时,每次运行返回的值都会发生变化。你的解释更有意义。谢谢。 关于Python的问题,使用scala的anwser? pyspark中是否存在这种东西? @pltrdy 希望这是这里唯一的问题!这个答案没有解释为什么 OP 会出现这种行为。看起来很有吸引力,我也投了赞成票,但我不认为它回答了这个问题......:/【参考方案3】:我没有足够的声望点来评论 Maasg 之前的回答。 实际上,零值对 seqop 应该是“中性的”,这意味着它不会干扰 seqop 结果,例如 0 对 add 或 1 对 *;
你不应该尝试使用非中性值,因为它可能会被任意应用。 此行为不仅与分区数有关。
我尝试了与问题中所述相同的实验。 对于 1 个分区,零值应用了 3 次。 有 2 个分区,6 次。 有 3 个分区,9 次,这样会继续下去。
【讨论】:
【参考方案4】:您可以使用以下代码(在 scala 中)准确查看 aggregate
正在做什么。它构建了所有添加和合并操作的树:
sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]
val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)
然后,在 shell 中:
scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))
所以,我们有这 3 个分区:[4]、[1,2] 和 [3]。
scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))
您可以将结果表示为一棵树:
+
| \__________________
+ +
| \________ | \
+ + + 2
| \ | \ | \
0 + 0 3 0 1
| \
0 4
您可以看到在驱动节点(树的左侧)上创建了第一个零元素,然后将所有分区的结果一个一个合并。您还看到,如果您像在问题中那样将 0 替换为 1,它将为每个分区上的每个结果添加 1,并且还会为驱动程序的初始值添加 1。所以,你给出的 zero 值被使用的总次数是:
number of partitions + 1
.
所以,在你的情况下,
aggregate(
(X, Y),
(lambda acc, value: (acc[0] + value, acc[1] + 1)),
(lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
将是:
(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)
aggregate
的实现非常简单。定义在RDD.scala, line 1107:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
【讨论】:
【参考方案5】:很好的解释,它真的帮助我理解了聚合函数的底层工作。我玩了一段时间,发现如下。
如果你使用 acc 作为 (0,0) 那么它不会改变函数的输出结果。
如果初始累加器发生变化,那么它将处理结果如下
[ RDD 元素总和 + acc 初始值 * RDD 分区数 + acc初始值]
对于这里的问题,我建议检查分区,因为根据我的理解,分区数应该是 8,因为每次我们在 RDD 的分区上处理 seq op 时,它都会从 acc 结果的初始总和开始而且当它要进行comb Op时,它会再次使用acc初始值一次。
例如 列表 (1,2,3,4) & acc (1,0)
通过RDD.partitions.size在scala中获取分区
如果分区为 2 且元素数为 4 则 => [ 10 + 1 * 2 + 1 ] => (13,4)
如果分区为 4 且元素数为 4 则 => [ 10 + 1 * 4 + 1 ] => (15,4)
希望对您有所帮助,您可以查看here 以获得解释。谢谢。
【讨论】:
【参考方案6】:感谢 gsamaras。
我的视图如下,
【讨论】:
【参考方案7】:对于寻找上述示例的 Scala 等效代码的人来说,就是这里。相同的逻辑,相同的输入/结果。
scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21
scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)
scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)
【讨论】:
【参考方案8】:我尝试了很多关于这个问题的实验。最好为聚合设置分区数。 seqOp 将处理每个分区并应用初始值,此外,combOp 在组合所有分区时也会应用初始值。 所以,我提出这个问题的格式:
final result = sum(list) + num_Of_Partitions * initial_Value + 1
【讨论】:
这个公式显然不能成立,当初始值为0时,结果应该是列表的总和。【参考方案9】:我将解释Spark中Aggregate操作的概念如下:
聚合函数的定义
**def aggregate** (initial value)(an intra-partition sequence operation)(an inter-partition combination operation)
val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4)
--> 4 表示我们的 Spark 集群中可用的分区数。
因此,rdd 被分为 4 个分区:
11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16
我们将问题陈述分为两部分: 问题的第一部分是汇总每个象限中采摘的花朵总数;这就是分区内序列聚合
11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25 +16 = 65
问题的第二部分是将这些单独的聚合跨分区求和;这就是分区间聚合。
36 + 75 + 108 + 65 = 284
存储在 RDD 中的总和可以进一步用于任何类型的转换或其他操作
所以代码变成这样:
val sum = flowers.aggregate(0)((acc, value) => (acc + value), (x,y) => (x+y))
或
val sum = flowers.aggregate(0)(_+_, _+_)
Answer: 284
解释: (0) - 是累加器 第一个+是分区内总和,加上花园每个象限中每个采摘者采摘的花朵总数。 第二个+是分区间总和,它聚合了每个象限的总和。
案例 1:
假设,如果我们需要在初始值之后减少函数。如果初始值不为零会怎样?如果是 4,例如:
该数字将添加到每个分区内聚合以及分区间聚合:
所以第一个计算是:
11+12+13 = 36 + 5 = 41
24+25+26 = 75 + 5 = 80
35+36+37 = 108 + 5 = 113
24+25 +16 = 65 + 5 = 70
这里是初始值为5的分区间聚合计算:
partition1 + partition2 + partition3+ partition4 + 5 = 41 + 80 + 113 + 70 = 309
因此,进入您的查询:总和可以根据 rdd 数据分布的分区数来计算。我认为您的数据分布如下,这就是为什么您的结果为 (19, 4)。因此,在进行聚合操作时,请指定分区值的数量:
val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
(acc, value) => (acc._1 + value, acc._2 + 1),
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
结果:
list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)
解释:因为你的数据分布在8个分区,结果是这样的(使用上面解释的逻辑)
分区内加法:
0+1=1
1+1=2
0+1=1
2+1=3
0+1=1
3+1=4
0+1=1
4+1=5
total=18
分区间计算:
18+1 (1+2+1+3+1+4+1+5+1) = 19
谢谢
【讨论】:
以上是关于解释 Spark 中的聚合功能(使用 Python 和 Scala)的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 python 在 Spark 中转置 DataFrame 而不进行聚合