pyspark折叠方法输出
Posted
技术标签:
【中文标题】pyspark折叠方法输出【英文标题】:pyspark fold method output 【发布时间】:2015-03-19 16:38:14 【问题描述】:我对 fold
的输出感到惊讶,我无法想象它在做什么。
我希望something.fold(0, lambda a,b: a+1)
会返回something
中的元素数量,因为折叠从0
开始并为每个元素添加1
。
sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
8
我来自 Scala,其中 fold 就像我所描述的那样工作。那么 fold 应该如何在 pyspark 中工作?谢谢你的想法。
【问题讨论】:
【参考方案1】:要了解这里发生了什么,让我们看一下 Spark 的 fold
操作的定义。由于您使用的是 PySpark,我将展示 Python 版本的代码,但 Scala 版本表现出完全相同的行为(您也可以browse the source on GitHub):
def fold(self, zeroValue, op):
"""
Aggregate the elements of each partition, and then the results for all
the partitions, using a given associative function and a neutral "zero
value."
The function Cop(t1, t2) is allowed to modify Ct1 and return it
as its result value to avoid object allocation; however, it should not
modify Ct2.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
"""
def func(iterator):
acc = zeroValue
for obj in iterator:
acc = op(obj, acc)
yield acc
vals = self.mapPartitions(func).collect()
return reduce(op, vals, zeroValue)
(对比见Scala implementation of RDD.fold
)。
Spark 的fold
首先折叠每个分区,然后折叠结果。问题是一个空分区被折叠到零元素,所以最终的驱动程序端折叠最终会为 每个 分区折叠一个值,而不是为每个 非空 分区。这意味着fold
的结果对分区数很敏感:
>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1
在最后一种情况下,发生的情况是单个分区被折叠到正确的值,然后该值在驱动程序处与零值折叠以产生 1。
看来,Spark 的fold()
操作实际上要求 fold 函数除了具有关联性之外还具有可交换性。实际上,Spark 中的其他地方也强加了此要求,例如,混洗分区中元素的顺序在运行时可能是不确定的(请参阅SPARK-5750)。
我已打开 Spark JIRA 票证来调查此问题:https://issues.apache.org/jira/browse/SPARK-6416。
【讨论】:
感谢您的详尽回答,我忽略了考虑分区如何影响折叠。因此,为了澄清 Scala 的情况,RDD 分区与 Scala 的典型折叠行为有重要区别。也就是说,List(1,2,3,4).fold(0)((a,b)=> a+1)
将与 Spark 的 sc.parallelize(List(1,2,3,4)).fold(0)((a,b)=> a+1)
完全不同
@keegan,是的,没错:分区行为,加上某些排序不确定性的来源,意味着当您使用非可交换运算符时,Spark 折叠的行为当前不会像您期望的那样表现.为了进一步说明这一点,请尝试以下操作:在 Scala 中,Seq(2.0, 3.0).fold(1.0)((a, b) => pow(b, a))
将始终返回 9.0。在 Spark 中尝试相同的操作:sc.parallelize(Seq(2.0, 3.0), 2).fold(1.0)((a, b) => pow(b, a))
。在我的多核笔记本电脑上,Spark 版本将不确定地返回 8.0 还是 9.0,具体取决于哪个分区先完成。【参考方案2】:
让我试着举一些简单的例子来解释 spark 的 fold 方法。我将在这里使用 pyspark。
rdd1 = sc.parallelize(list([]),1)
上面一行将创建一个带有一个分区的空 rdd
rdd1.fold(10, lambda x,y:x+y)
这个产量输出为 20
rdd2 = sc.parallelize(list([1,2,3,4,5]),2)
上面的行将创建值为 1 到 5 的 rdd,总共将有 2 个分区
rdd2.fold(10, lambda x,y:x+y)
这将产生输出为 45
因此,在上述情况下,为了简单起见,这里发生的情况是您将第 0 个元素设为 10。因此,您将在 RDD 中获得的所有数字的总和现在加上 10(即第 0 个元素+所有其他元素 => 10+1+2+3+4+5 = 25)。现在我们也有两个分区(即分区数*零元素=> 2*10 = 20) fold 发出的最终输出是 25+20 = 45
使用类似的过程很清楚为什么 rdd1 上的折叠操作会产生 20 作为输出。
当我们有类似 rdd1.reduce(lambda x,y:x+y)
的空列表时,Reduce 失败
ValueError: Can not reduce() empty RDD
如果我们认为我们可以在 rdd 中有空列表,可以使用折叠
rdd1.fold(0, lambda x,y:x+y)
正如预期的那样,这将产生输出为 0。
【讨论】:
以上是关于pyspark折叠方法输出的主要内容,如果未能解决你的问题,请参考以下文章