使用 pyspark 在单次遍历数据中查找最小值/最大值

Posted

技术标签:

【中文标题】使用 pyspark 在单次遍历数据中查找最小值/最大值【英文标题】:finding min/max with pyspark in single pass over data 【发布时间】:2016-04-11 21:43:27 【问题描述】:

我有一个包含大量数字列表的 RDD(文件中的行长度),我想知道如何在单次传递数据中获取最小值/最大值。

我知道 Min 和 Max 函数,但这需要两次传递。

【问题讨论】:

【参考方案1】:

试试这个:

>>> from pyspark.statcounter import StatCounter
>>> 
>>> rdd = sc.parallelize([9, -1, 0, 99, 0, -10])
>>> stats = rdd.aggregate(StatCounter(), StatCounter.merge, StatCounter.mergeStats)
>>> stats.minValue, stats.maxValue
(-10.0, 99.0)

【讨论】:

【参考方案2】:

这是一个使用累加器的有效但不优雅的解决方案。不优雅之处在于您必须事先定义零/初始值,以免它们干扰数据:

from pyspark.accumulators import AccumulatorParam
class MinMaxAccumulatorParam(AccumulatorParam): 
    def zero(self, value): 
        return value
    def addInPlace(self, val1, val2): 
        return(min(val1[0],val2[0]), max(val1[1],val2[1]))

minmaxAccu = sc.accumulator([500,-500], MinMaxAccumulatorParam())

def g(x):
    global minmaxAccu
    minmaxAccu += (x,x)

rdd = sc.parallelize([1, 2, 3, 4, 5])

rdd.foreach(g)

In [149]: minmaxAccu.value
Out[149]: (1, 5)

【讨论】:

以上是关于使用 pyspark 在单次遍历数据中查找最小值/最大值的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中查找列表的最大值/最小值

pyspark 将最小值添加回数据框

从 PySpark 中的 RDD 中的数据中查找最小和最大日期

我应该在单次选择后提交吗

如何在单次访问报告中将 3 个连续行放入 3 个堆叠行中

使用 pyspark 跟踪和查找数据框中的最新值