解释 Spark 中的聚合功能(使用 Python 和 Scala)

Posted

技术标签:

【中文标题】解释 Spark 中的聚合功能(使用 Python 和 Scala)【英文标题】:Explain the aggregate functionality in Spark (with Python and Scala) 【发布时间】:2015-03-30 05:06:48 【问题描述】:

我正在寻找对通过 python 中的 spark 可用的聚合功能的更好解释。

我的例子如下(使用Spark 1.2.0版本的pyspark)

sc.parallelize([1,2,3,4]).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

输出:

(10, 4)

我得到了(10,4) 的预期结果,它是1+2+3+4 和4 个元素的总和。如果我将传递给聚合函数的初始值从(0,0) 更改为(1,0),我会得到以下结果

sc.parallelize([1,2,3,4]).aggregate(
    (1, 0),
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),
    (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

输出:

(19, 4)

值增加 9。如果我将其更改为 (2,0),则值将变为 (28,4),依此类推。

有人可以向我解释这个值是如何计算的吗?我预计值会增加 1 而不是 9,预计会看到 (11,4) 而我看到的是 (19,4)

【问题讨论】:

【参考方案1】:

我并没有完全相信 接受的答案,而 JohnKnight 的回答有所帮助,所以这是我的观点:

首先,用我自己的话解释一下aggregate():

原型

聚合(zeroValue,seqOp,combOp)

说明

aggregate() 允许您获取一个 RDD 并生成一个与原始 RDD 中存储的类型不同的单个值。

参数

    zeroValue: 初始化值,你的结果,在你想要的 格式。 seqOp:要对 RDD 记录应用的操作。运行一次 分区中的每条记录。 combOp:定义结果对象如何(每个分区一个), 合并。

示例

计算一个列表的总和和该列表的长度。以一对(sum, length) 的形式返回结果。

在 Spark shell 中,我首先创建了一个包含 4 个元素和 2 个分区的列表:

listRDD = sc.parallelize([1,2,3,4], 2)

然后我定义了我的 seqOp

seqOp = (lambda local_result, list_element: (local_result[0] + list_element, local_result[1] + 1) )

还有我的combOp

combOp = (lambda some_local_result, another_local_result: (some_local_result[0] + another_local_result[0], some_local_result[1] + another_local_result[1]) )

然后我汇总:

listRDD.aggregate( (0, 0), seqOp, combOp)
Out[8]: (10, 4)

如您所见,我给变量取了描述性名称,但让我进一步解释一下:

第一个分区有子列表 [1, 2]。我们将 seqOp 应用于该列表的每个元素,这将产生一个本地结果,一对(sum, length),它将在本地反映结果,仅在第一个分区中。

所以,让我们开始吧:local_result 被初始化为我们提供给aggregate()zeroValue 参数,即(0, 0),list_element 是列表的第一个元素,即1。作为结果是这样的:

0 + 1 = 1
0 + 1 = 1

现在,本地结果是 (1, 1),这意味着,到目前为止,对于第一个分区,在仅处理第一个元素之后,总和为 1,长度为 1。注意,local_result 得到从 (0, 0) 更新到 (1, 1)。

1 + 2 = 3
1 + 1 = 2

现在本地结果是 (3, 2),这将是第一个分区的最终结果,因为它们不是第一个分区的子列表中的其他元素。

对第二个分区做同样的事情,我们得到 (7, 2)。

现在我们将 combOp 应用于每个局部结果,这样我们就可以形成最终的全局结果,如下所示:(3,2) + (7,2) = (10, 4)


“图”中描述的示例:

            (0, 0) <-- zeroValue

[1, 2]                  [3, 4]

0 + 1 = 1               0 + 3 = 3
0 + 1 = 1               0 + 1 = 1

1 + 2 = 3               3 + 4 = 7
1 + 1 = 2               1 + 1 = 2       
    |                       |
    v                       v
  (3, 2)                  (7, 2)
      \                    / 
       \                  /
        \                /
         \              /
          \            /
           \          / 
           ------------
           |  combOp  |
           ------------
                |
                v
             (10, 4)

受到这个伟大的example 的启发。


所以现在如果zeroValue 不是 (0, 0) 而是 (1, 0),那么人们会期望得到 (8 + 4, 2 + 2) = (12, 4),这不会解释你的经历。即使我们改变了我的例子的分区数,我也无法再得到它。

这里的关键是 JohnKnight 的回答,其中指出 zeroValue 不仅类似于分区数,而且可能应用的次数比您预期的要多。

【讨论】:

真的很高兴它帮助了@Neethu! @ab_tech_sp 这真的应该是公认的答案。特别是因为这个 Q 中最受好评的答案是 Scala(??)!【参考方案2】:

使用 Scala 进行解释

Aggregate 让您可以随意转换和组合 RDD 的值。

它使用两个功能:

第一个将原始集合 [T] 的元素转换并添加到本地聚合 [U] 中,并采用以下形式:(U,T) => U。您可以将其视为折叠,因此它也该操作需要零。此操作在本地并行应用于每个分区。

这里是问题的关键所在:这里应该使用的唯一值是归约操作的零值。 此操作在每个分区上本地执行,因此,将任何内容添加到该零值将添加到结果乘以 RDD 的分区数。

第二个操作取前一个操作[U]的结果类型的2个值,并将其组合成一个值。此操作将减少每个分区的部分结果并产生实际总数。

例如: 给定一个字符串的 RDD:

val rdd:RDD[String] = ???

假设您想要汇总该 RDD 中字符串的长度,您可以这样做:

    第一个操作会将字符串转换为大小(int)并累积大小的值。

    val stringSizeCummulator: (Int, String) => Int = (total, string) => total + string.lenght`

    为加法运算提供零(0)

    val 零 = 0

    将两个整数相加的操作:

    val add: (Int, Int) => Int = _ + _

把它们放在一起:

rdd.aggregate(ZERO, stringSizeCummulator, add)

使用 Spark 2.4 及更高版本

rdd.aggregate(ZERO)(stringAccumulator,add)

那么,为什么需要零? 当 cummulator 函数应用于分区的第一个元素时,没有运行总计。此处使用零。

例如。我的 RDD 是:

分区 1:[“跳转”、“结束”] 分区 2:["the", "wall"]

这将导致:

P1:

    stringSizeCummulator(ZERO, "Jump") = 4 stringSizeCummulator(4, "over") = 8

P2:

    stringSizeCummulator(ZERO, "the") = 3 stringSizeCummulator(3, "wall") = 7

减少:add(P1, P2) = 15

【讨论】:

你是对的。当我通过指定不同的值开始使用 spark.default.parallelism 设置时,当我将 (1,0) 作为聚合函数的初始值传递时,每次运行返回的值都会发生变化。你的解释更有意义。谢谢。 关于Python的问题,使用scala的anwser? pyspark中是否存在这种东西? @pltrdy 希望这是这里唯一的问题!这个答案没有解释为什么 OP 会出现这种行为。看起来很有吸引力,我也投了赞成票,但我不认为它回答了这个问题......:/【参考方案3】:

我没有足够的声望点来评论 Maasg 之前的回答。 实际上,零值对 seqop 应该是“中性的”,这意味着它不会干扰 seqop 结果,例如 0 对 add 或 1 对 *;

你不应该尝试使用非中性值,因为它可能会被任意应用。 此行为不仅与分区数有关。

我尝试了与问题中所述相同的实验。 对于 1 个分区,零值应用了 3 次。 有 2 个分区,6 次。 有 3 个分区,9 次,这样会继续下去。

【讨论】:

【参考方案4】:

您可以使用以下代码(在 scala 中)准确查看 aggregate 正在做什么。它构建了所有添加和合并操作的树:

sealed trait Tree[+A]
case class Leaf[A](value: A) extends Tree[A]
case class Branch[A](left: Tree[A], right: Tree[A]) extends Tree[A]

val zero : Tree[Int] = Leaf(0)
val rdd = sc.parallelize(1 to 4).repartition(3)

然后,在 shell 中:

scala> rdd.glom().collect()
res5: Array[Array[Int]] = Array(Array(4), Array(1, 2), Array(3))

所以,我们有这 3 个分区:[4]、[1,2] 和 [3]。

scala> rdd.aggregate(zero)((l,r)=>Branch(l, Leaf(r)), (l,r)=>Branch(l,r))
res11: Tree[Int] = Branch(Branch(Branch(Leaf(0),Branch(Leaf(0),Leaf(4))),Branch(Leaf(0),Leaf(3))),Branch(Branch(Leaf(0),Leaf(1)),Leaf(2)))

您可以将结果表示为一棵树:

+
| \__________________
+                    +
| \________          | \
+          +         +   2
| \        | \       | \         
0  +       0  3      0  1
   | \
   0  4

您可以看到在驱动节点(树的左侧)上创建了第一个零元素,然后将所有分区的结果一个一个合并。您还看到,如果您像在问题中那样将 0 替换为 1,它将为每个分区上的每个结果添加 1,并且还会为驱动程序的初始值添加 1。所以,你给出的 zero 值被使用的总次数是:

number of partitions + 1.

所以,在你的情况下,

aggregate(
  (X, Y),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))

将是:

(sum(elements) + (num_partitions + 1)*X, count(elements) + (num_partitions + 1)*Y)

aggregate 的实现非常简单。定义在RDD.scala, line 1107:

  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope 
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult

【讨论】:

【参考方案5】:

很好的解释,它真的帮助我理解了聚合函数的底层工作。我玩了一段时间,发现如下。

如果你使用 acc 作为 (0,0) 那么它不会改变函数的输出结果。

如果初始累加器发生变化,那么它将处理结果如下

[ RDD 元素总和 + acc 初始值 * RDD 分区数 + acc初始值]

对于这里的问题,我建议检查分区,因为根据我的理解,分区数应该是 8,因为每次我们在 RDD 的分区上处理 seq op 时,它都会从 acc 结果的初始总和开始而且当它要进行comb Op时,它会再次使用acc初始值一次。

例如 列表 (1,2,3,4) & acc (1,0)

通过RDD.partitions.size在scala中获取分区

如果分区为 2 且元素数为 4 则 => [ 10 + 1 * 2 + 1 ] => (13,4)

如果分区为 4 且元素数为 4 则 => [ 10 + 1 * 4 + 1 ] => (15,4)

希望对您有所帮助,您可以查看here 以获得解释。谢谢。

【讨论】:

【参考方案6】:

感谢 gsamaras。

我的视图如下,

【讨论】:

【参考方案7】:

对于寻找上述示例的 Scala 等效代码的人来说,就是这里。相同的逻辑,相同的输入/结果。

scala> val listRDD = sc.parallelize(List(1,2,3,4), 2)
listRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:21

scala> listRDD.collect()
res7: Array[Int] = Array(1, 2, 3, 4)

scala> listRDD.aggregate((0,0))((acc, value) => (acc._1+value,acc._2+1),(acc1,acc2) => (acc1._1+acc2._1,acc1._2+acc2._2))
res10: (Int, Int) = (10,4)

【讨论】:

【参考方案8】:

我尝试了很多关于这个问题的实验。最好为聚合设置分区数。 seqOp 将处理每个分区并应用初始值,此外,combOp 在组合所有分区时也会应用初始值。 所以,我提出这个问题的格式:

final result = sum(list) + num_Of_Partitions * initial_Value + 1

【讨论】:

这个公式显然不能成立,当初始值为0时,结果应该是列表的总和。【参考方案9】:

我将解释Spark中Aggregate操作的概念如下:

聚合函数的定义

**def aggregate** (initial value)(an intra-partition sequence operation)(an inter-partition combination operation)

val flowers = sc.parallelize(List(11, 12, 13, 24, 25, 26, 35, 36, 37, 24, 25, 16), 4) --> 4 表示我们的 Spark 集群中可用的分区数。

因此,rdd 被分为 4 个分区:

11, 12, 13
24, 25, 26
35, 36, 37
24, 25, 16

我们将问题陈述分为两部分: 问题的第一部分是汇总每个象限中采摘的花朵总数;这就是分区内序列聚合

11+12+13 = 36
24+25+26 = 75
35+36+37 = 108
24+25 +16 = 65

问题的第二部分是将这些单独的聚合跨分区求和;这就是分区间聚合。

36 + 75 + 108 + 65 = 284

存储在 RDD 中的总和可以进一步用于任何类型的转换或其他操作

所以代码变成这样:

val sum = flowers.aggregate(0)((acc, value) =&gt; (acc + value), (x,y) =&gt; (x+y))val sum = flowers.aggregate(0)(_+_, _+_)Answer: 284

解释: (0) - 是累加器 第一个+是分区内总和,加上花园每个象限中每个采摘者采摘的花朵总数。 第二个+是分区间总和,它聚合了每个象限的总和。

案例 1:

假设,如果我们需要在初始值之后减少函数。如果初始值不为零会怎样?如果是 4,例如:

该数字将添加到每个分区内聚合以及分区间聚合:

所以第一个计算是:

11+12+13 = 36 + 5 = 41
24+25+26 = 75 + 5 = 80
35+36+37 = 108 + 5 = 113
24+25 +16 = 65 + 5 = 70

这里是初始值为5的分区间聚合计算:

partition1 + partition2 + partition3+ partition4 + 5 = 41 + 80 + 113 + 70 = 309

因此,进入您的查询:总和可以根据 rdd 数据分布的分区数来计算。我认为您的数据分布如下,这就是为什么您的结果为 (19, 4)。因此,在进行聚合操作时,请指定分区值的数量:

val list = sc.parallelize(List(1,2,3,4))
val list2 = list.glom().collect
val res12 = list.aggregate((1,0))(
      (acc, value) => (acc._1 + value, acc._2 + 1),
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

结果:

list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at command-472682101230301:1
list2: Array[Array[Int]] = Array(Array(), Array(1), Array(), Array(2), Array(), Array(3), Array(), Array(4))
res12: (Int, Int) = (19,4)

解释:因为你的数据分布在8个分区,结果是这样的(使用上面解释的逻辑)

分区内加法:

0+1=1
1+1=2
0+1=1
2+1=3
0+1=1
3+1=4
0+1=1
4+1=5

total=18

分区间计算:

18+1 (1+2+1+3+1+4+1+5+1) = 19

谢谢

【讨论】:

以上是关于解释 Spark 中的聚合功能(使用 Python 和 Scala)的主要内容,如果未能解决你的问题,请参考以下文章

Spark中的日志聚合的配置

何时合并发生在Spark中的用户定义聚合函数UDAF中

如何使用 python 在 Spark 中转置 DataFrame 而不进行聚合

HyperLogLog函数在Spark中的高级应用

在 python (pyspark) 中使用 combinebykey spark rdd 计算组上的聚合

Mongodb解释聚合框架