Spark RDD:如何最有效地计算统计数据?

Posted

技术标签:

【中文标题】Spark RDD:如何最有效地计算统计数据?【英文标题】:Spark RDD: How to calculate statistics most efficiently? 【发布时间】:2016-10-11 15:43:57 【问题描述】:

假设存在类似如下的元组RDD:

(key1, 1)
(key3, 9)
(key2, 3)
(key1, 4)
(key1, 5)
(key3, 2)
(key2, 7)
...

计算每个键对应的统计信息的最有效(理想情况下是分布式)方法是什么? (目前,我特别希望计算标准偏差/方差。)据我了解,我的选择相当于:

    使用colStats function in MLLib: 如果认为需要进行其他统计计算,此方法的优点是可以轻松适应以后使用其他mllib.stat 函数。但是,它在包含每列数据的Vector 的 RDD 上运行,因此据我了解,这种方法需要在单个节点上收集每个键的完整值集,这似乎并不理想对于大型数据集。 Spark Vector 是否总是暗示Vector 中的数据驻留在本地,在单个节点上? 先执行groupByKey,然后执行stats 可能需要大量随机播放,as a result of the groupByKey operation。 执行aggregateByKey,初始化一个新的StatCounter,并使用StatCounter::merge作为序列和组合函数:这是recommended by this *** answer的方法,避免了选项2中的groupByKey。但是,我无法在 PySpark 中找到关于 StatCounter 的良好文档。

我喜欢选项 1,因为它使代码更具可扩展性,因为它可以使用具有类似合约的其他 MLLib 函数轻松适应更复杂的计算,但如果 Vector 输入固有地要求在本地收集数据集,那么它限制了代码可以有效操作的数据大小。在其他两个之间,选项 3 看起来更有效,因为它避免了 groupByKey,但我希望确认确实如此。

还有其他我没有考虑过的选择吗? (我目前使用的是 Python + PySpark,但如果存在语言差异,我也愿意接受 Java/Scala 中的解决方案。)

【问题讨论】:

finding min/max with pyspark in single pass over data的可能重复 【参考方案1】:

你可以试试reduceByKey。如果我们只想计算min(),这非常简单:

rdd.reduceByKey(lambda x,y: min(x,y)).collect()
#Out[84]: [('key3', 2.0), ('key2', 3.0), ('key1', 1.0)]

要计算mean,您首先需要创建(value, 1) 元组,我们在reduceByKey 操作中使用它来计算sumcount。最后我们将它们彼此分开以到达mean

meanRDD = (rdd
           .mapValues(lambda x: (x, 1))
           .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
           .mapValues(lambda x: x[0]/x[1]))

meanRDD.collect()
#Out[85]: [('key3', 5.5), ('key2', 5.0), ('key1', 3.3333333333333335)]

对于variance,可以使用公式(sumOfSquares/count) - (sum/count)^2, 我们用以下方式翻译:

varRDD = (rdd
          .mapValues(lambda x: (1, x, x*x))
          .reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1], x[2]+y[2]))
          .mapValues(lambda x: (x[2]/x[0] - (x[1]/x[0])**2)))

varRDD.collect()
#Out[106]: [('key3', 12.25), ('key2', 4.0), ('key1', 2.8888888888888875)]

我在虚拟数据中使用double类型的值而不是int来准确说明计算平均值和方差:

rdd = sc.parallelize([("key1", 1.0),
                      ("key3", 9.0),
                      ("key2", 3.0),
                      ("key1", 4.0),
                      ("key1", 5.0),
                      ("key3", 2.0),
                      ("key2", 7.0)])

【讨论】:

以上是关于Spark RDD:如何最有效地计算统计数据?的主要内容,如果未能解决你的问题,请参考以下文章

Spark 如何从故障节点恢复数据?

Spark之RDD

Spark学习之路 Spark之RDD[转]

Spark Core 的RDD

Spark快速入门之RDD编程模型

spark小案例——RDD,sparkSQL