使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值

Posted

技术标签:

【中文标题】使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值【英文标题】:Calculating the averages for each KEY in a Pairwise (K,V) RDD in Spark with Python 【发布时间】:2015-07-07 22:26:09 【问题描述】:

我想与 Python 解决方案分享这个特定的 Apache Spark,因为它的文档很差。

我想通过 KEY 计算 K/V 对的平均值(存储在 Pairwise RDD 中)。示例数据如下所示:

>>> rdd1.take(10) # Show a small sample.
[(u'2013-10-09', 7.60117302052786),
(u'2013-10-10', 9.322709163346612),
(u'2013-10-10', 28.264462809917358),
(u'2013-10-07', 9.664429530201343),
(u'2013-10-07', 12.461538461538463),
(u'2013-10-09', 20.76923076923077),
(u'2013-10-08', 11.842105263157894),
(u'2013-10-13', 32.32514177693762),
(u'2013-10-13', 26.249999999999996),
(u'2013-10-13', 10.693069306930692)]

现在下面的代码序列是一种不是最佳的方法,但它确实有效。这是我在想出更好的解决方案之前正在做的事情。这并不可怕,但是 - 正如您将在答案部分看到的那样 - 有一种更简洁、更有效的方法。

>>> import operator
>>> countsByKey = sc.broadcast(rdd1.countByKey()) # SAMPLE OUTPUT of countsByKey.value: u'2013-09-09': 215, u'2013-09-08': 69, ... snip ...
>>> rdd1 = rdd1.reduceByKey(operator.add) # Calculate the numerators (i.e. the SUMs).
>>> rdd1 = rdd1.map(lambda x: (x[0], x[1]/countsByKey.value[x[0]])) # Divide each SUM by it's denominator (i.e. COUNT)
>>> print(rdd1.collect())
  [(u'2013-10-09', 11.235365503035176),
   (u'2013-10-07', 23.39500642456595),
   ... snip ...
  ]

【问题讨论】:

【参考方案1】:

现在更好的方法是使用rdd.aggregateByKey() 方法。因为这种方法在 Apache Spark 和 Python 文档中的记录很差——这也是我写这篇问答的原因——直到最近我一直在使用上面的代码序列。但同样,它的效率较低,因此除非必要,否则避免这样做。

以下是使用rdd.aggregateByKey() 方法的方法(推荐):

通过 KEY,同时计算 SUM(我们要计算的平均值的分子)和 COUNT(我们要计算的平均值的分母):

>>> aTuple = (0,0) # As of Python3, you can't pass a literal sequence to a function.
>>> rdd1 = rdd1.aggregateByKey(aTuple, lambda a,b: (a[0] + b,    a[1] + 1),
                                       lambda a,b: (a[0] + b[0], a[1] + b[1]))

关于上面每个 ab 对的含义,以下是正确的(这样您就可以看到正在发生的事情):

   First lambda expression for Within-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a SCALAR that holds the next Value

   Second lambda expression for Cross-Partition Reduction Step::
   a: is a TUPLE that holds: (runningSum, runningCount).
   b: is a TUPLE that holds: (nextPartitionsSum, nextPartitionsCount).

最后,计算每个 KEY 的平均值,并收集结果。

>>> finalResult = rdd1.mapValues(lambda v: v[0]/v[1]).collect()
>>> print(finalResult)
      [(u'2013-09-09', 11.235365503035176),
       (u'2013-09-01', 23.39500642456595),
       (u'2013-09-03', 13.53240060820617),
       (u'2013-09-05', 13.141148418977687),
   ... snip ...
  ]

我希望aggregateByKey() 的这个问题和答案会有所帮助。

【讨论】:

这真是一个很好的答案。但是,我会注意到,由于PEP 3113,这仅与 python 2.x 兼容,因为 python 3.x 不再支持 lambda 表达式中的元组解包 @Tgsmith61591 谢谢。我添加了中间“aTuple”变量来解决这个问题。 (叹气,我想不出更好的标识符名称,大声笑)。 PEP 3113 不错! "key1", (1, 1) "key1", (2, 1) => "key1", (3, 2) 基于对a、b的相同解释:.aggregateByKey( aTuple, lambda a, b: (a[0] + b[0], a[1] + 1), lambda a, b: (a[0] + b[0], a[1] + b[1] ])) 这对我有用【参考方案2】:

在我看来,一个更易读的等价于带有两个 lambda 的 aggregateByKey 是:

rdd1 = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1]))

这样,整个平均计算将是:

avg_by_key = rdd1 \
    .mapValues(lambda v: (v, 1)) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda v: v[0]/v[1]) \
    .collectAsMap()

【讨论】:

有人能解释一下 lambda 函数中 a 和 b 的意义吗? @pat【参考方案3】:

只是添加一个关于此问题的直观且更短(但不好)的解决方案的注释。 Sam's Teach Yourself Apache Spark in 24 Hours这本书在上一章已经很好地解释了这个问题。

使用groupByKey 可以像这样轻松解决问题:

rdd = sc.parallelize([
        (u'2013-10-09', 10),
        (u'2013-10-09', 10),
        (u'2013-10-09', 13),
        (u'2013-10-10', 40),
        (u'2013-10-10', 45),
        (u'2013-10-10', 50)
    ])

rdd \
.groupByKey() \
.mapValues(lambda x: sum(x) / len(x)) \
.collect()

输出:

[('2013-10-10', 45.0), ('2013-10-09', 11.0)]

这是直观且吸引人的,但不要使用它groupByKey 不对映射器进行任何组合,而是将所有单独的键值对带到减速器。

尽可能避免groupByKey。使用@pat 之类的reduceByKey 解决方案。

【讨论】:

感谢您的建议和参考【参考方案4】:

对 prismalytics.io 的回答略有增强。

在某些情况下,计算总和可能会溢出数字,因为我们要对大量值求和。相反,我们可以保留平均值并继续根据平均值计算平均值,并且减少两个部分的计数。

如果您有两个部分的平均值,分别为 (a1, c1) 和 (a2, c2),则总体平均值为: 总计/计数 = (total1 + total2)/ (count1 + counts2) = (a1*c1 + a2*c2)/(c1+c2)

如果我们标记R = c2/c1,它可以进一步重写为a1/(1+R) + a2*R/(1+R) 如果我们进一步将Ri标记为1/(1+R),我们可以写成a1*Ri + a2*R*Ri

myrdd = sc.parallelize([1.1, 2.4, 5, 6.0, 2, 3, 7, 9, 11, 13, 10])
sumcount_rdd = myrdd.map(lambda n : (n, 1))
def avg(A, B):
    R = 1.0*B[1]/A[1]
    Ri = 1.0/(1+R);
    av = A[0]*Ri + B[0]*R*Ri
    return (av, B[1] + A[1]);

(av, counts) = sumcount_rdd.reduce(avg)
print(av)

这种方法可以通过简单地使用 mapValues 而不是 map 和 reduceByKey 而不是 reduce 来转换为 key-value。

来自:https://www.knowbigdata.com/blog/interview-questions-apache-spark-part-2

【讨论】:

以上是关于使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值的主要内容,如果未能解决你的问题,请参考以下文章

在 Apache Spark Python 中自定义 K-means 的距离公式

在 Scala 中使用 K-means 对 Spark 进行图像分割

从点列表中成对欧几里德距离

使用 Spark MLlib 做 K-means 聚类分析[转]

在 Spark 中使用剪影聚类

学术成果第2期 | 一种基于Apache Spark的时空Ripley’s K函数优化与加速方法