pyspark折叠方法输出

Posted

技术标签:

【中文标题】pyspark折叠方法输出【英文标题】:pyspark fold method output 【发布时间】:2015-03-19 16:38:14 【问题描述】:

我对 fold 的输出感到惊讶,我无法想象它在做什么。

我希望something.fold(0, lambda a,b: a+1) 会返回something 中的元素数量,因为折叠从0 开始并为每个元素添加1

sc.parallelize([1,25,8,4,2]).fold(0,lambda a,b:a+1 )
8

我来自 Scala,其中 fold 就像我所描述的那样工作。那么 fold 应该如何在 pyspark 中工作?谢谢你的想法。

【问题讨论】:

【参考方案1】:

要了解这里发生了什么,让我们看一下 Spark 的 fold 操作的定义。由于您使用的是 PySpark,我将展示 Python 版本的代码,但 Scala 版本表现出完全相同的行为(您也可以browse the source on GitHub):

def fold(self, zeroValue, op):
    """
    Aggregate the elements of each partition, and then the results for all
    the partitions, using a given associative function and a neutral "zero
    value."
    The function Cop(t1, t2) is allowed to modify Ct1 and return it
    as its result value to avoid object allocation; however, it should not
    modify Ct2.
    >>> from operator import add
    >>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
    15
    """
    def func(iterator):
        acc = zeroValue
        for obj in iterator:
            acc = op(obj, acc)
        yield acc
    vals = self.mapPartitions(func).collect()
    return reduce(op, vals, zeroValue)

(对比见Scala implementation of RDD.fold)。

Spark 的fold 首先折叠每个分区,然后折叠结果。问题是一个空分区被折叠到零元素,所以最终的驱动程序端折叠最终会为 每个 分区折叠一个值,而不是为每个 非空 分区。这意味着fold的结果对分区数很敏感:

>>> sc.parallelize([1,25,8,4,2], 100).fold(0,lambda a,b:a+1 )
100
>>> sc.parallelize([1,25,8,4,2], 50).fold(0,lambda a,b:a+1 )
50
>>> sc.parallelize([1,25,8,4,2], 1).fold(0,lambda a,b:a+1 )
1

在最后一种情况下,发生的情况是单个分区被折叠到正确的值,然后该值在驱动程序处与零值折叠以产生 1。

看来,Spark 的fold() 操作实际上要求 fold 函数除了具有关联性之外还具有可交换性。实际上,Spark 中的其他地方也强加了此要求,例如,混洗分区中元素的顺序在运行时可能是不确定的(请参阅SPARK-5750)。

我已打开 Spark JIRA 票证来调查此问题:https://issues.apache.org/jira/browse/SPARK-6416。

【讨论】:

感谢您的详尽回答,我忽略了考虑分区如何影响折叠。因此,为了澄清 Scala 的情况,RDD 分区与 Scala 的典型折叠行为有重要区别。也就是说,List(1,2,3,4).fold(0)((a,b)=> a+1) 将与 Spark 的 sc.parallelize(List(1,2,3,4)).fold(0)((a,b)=> a+1) 完全不同 @keegan,是的,没错:分区行为,加上某些排序不确定性的来源,意味着当您使用非可交换运算符时,Spark 折叠的行为当前不会像您期望的那样表现.为了进一步说明这一点,请尝试以下操作:在 Scala 中,Seq(2.0, 3.0).fold(1.0)((a, b) => pow(b, a)) 将始终返回 9.0。在 Spark 中尝试相同的操作:sc.parallelize(Seq(2.0, 3.0), 2).fold(1.0)((a, b) => pow(b, a))。在我的多核笔记本电脑上,Spark 版本将不确定地返回 8.0 还是 9.0,具体取决于哪个分区先完成。【参考方案2】:

让我试着举一些简单的例子来解释 spark 的 fold 方法。我将在这里使用 pyspark。

rdd1 = sc.parallelize(list([]),1)

上面一行将创建一个带有一个分区的空 rdd

rdd1.fold(10, lambda x,y:x+y)

这个产量输出为 20

rdd2 = sc.parallelize(list([1,2,3,4,5]),2)

上面的行将创建值为 1 到 5 的 rdd,总共将有 2 个分区

rdd2.fold(10, lambda x,y:x+y)

这将产生输出为 45

因此,在上述情况下,为了简单起见,这里发生的情况是您将第 0 个元素设为 10。因此,您将在 RDD 中获得的所有数字的总和现在加上 10(即第 0 个元素+所有其他元素 => 10+1+2+3+4+5 = 25)。现在我们也有两个分区(即分区数*零元素=> 2*10 = 20) fold 发出的最终输出是 25+20 = 45

使用类似的过程很清楚为什么 rdd1 上的折叠操作会产生 20 作为输出。

当我们有类似 rdd1.reduce(lambda x,y:x+y) 的空列表时,Reduce 失败

ValueError: Can not reduce() empty RDD

如果我们认为我们可以在 rdd 中有空列表,可以使用折叠 rdd1.fold(0, lambda x,y:x+y)

正如预期的那样,这将产生输出为 0。

【讨论】:

以上是关于pyspark折叠方法输出的主要内容,如果未能解决你的问题,请参考以下文章

有啥方法可以提高 PySpark 输出的效率吗?

如何捕获 pyspark foreachPartition 的日志输出?

在 pyspark 中转换或处理日期数据类型的最佳方法是啥

在 PySpark 中加入 270 列

PySpark 函数基于多列数据框创建自定义输出

运行 pyspark 时获取 Java 输出