PySpark reduceByKey 只有一个键

Posted

技术标签:

【中文标题】PySpark reduceByKey 只有一个键【英文标题】:PySpark reduceByKey by only one key 【发布时间】:2018-08-23 14:04:30 【问题描述】:

我有这样的rdd

// Structure List[Tuple(x1, x2, value), Tuple(x1, x2, value)]
data = [('23', '98', 34), ('23', '89', 39), ('23', '12', 30), ('24', '12', 34), ('24', '14', 37), ('24', '16', 30)]

我正在寻找最终结果是 x1 的得分最大值以及与之关联的 x2 值。像这样

data = [('23', '89', 39), ('24', '14', 37)]

我尝试了reduceByKey,但它给了我每个组合的最大值,这不是我想要的。

来自comment:

这是我尝试过的:

max_by_group = (
    data.map(lambda x: (x[0], x))
        .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1])) 
        .values()
)

【问题讨论】:

这就是我所做的 max_by_group = ( data.map(lambda x: (x[0], x)) .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x : x[-1])) .values() ) 【参考方案1】:

groupBy第一个元素,然后通过元组中的第三个元素找到每个组的最大值:

(rdd.groupBy(lambda x: x[0])
    .mapValues(lambda x: max(x, key=lambda y: y[2]))
    .values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]

或使用reduceByKey:

(rdd.map(lambda x: (x[0], x))
    .reduceByKey(lambda x, y: x if x[2] > y[2] else y)
    .values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]

【讨论】:

非常感谢。我尝试了这两种方法。自从有近十亿条记录以来,这需要一段时间。但令我惊讶的是,reduceByKey 比 groupBy 快,不知道为什么。您的 reduceByKey 解决方案也与我的几乎相同:) reduceByKey在适用时比groupBy效率更高,因为reduceByKey经过优化,它在洗牌之前为每个分区组合数据;按分区聚合之后的改组可以最大限度地减少跨集群的数据传输,这通常是昂贵的。 嗯,我不知道。非常感谢您分享此信息。【参考方案2】:

如果您使用rdds,@Psidom 的答案就是您要寻找的答案。另一种选择是convert your rdd to a DataFrame。

rdd = sc.parallelize(data)
df = rdd.toDF(["x1", "x2", "value"])
df.show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 98|   34|
#| 23| 89|   39|
#| 23| 12|   30|
#| 24| 12|   34|
#| 24| 14|   37|
#| 24| 16|   30|
#+---+---+-----+

现在你可以group by x1 and filter the rows with the maximum value:

import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('x1')
df.withColumn('maxValue', f.max('value').over(w))\
    .where(f.col('value') == f.col('maxValue'))\
    .drop('maxValue')\
    .show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 89|   39|
#| 24| 14|   37|
#+---+---+-----+

【讨论】:

【参考方案3】:

从 itertools 导入 groupby:

[max(list(j),key=lambda x:x[2]) for i,j in groupby(data,key = lambda x:x[0])]

Out[335]: [('23', '89', 39), ('24', '14', 37)]

【讨论】:

这适用于 python,但 OP 正在询问 spark rdds。

以上是关于PySpark reduceByKey 只有一个键的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 计数 groupby 与 None 键

如何在 PySpark 中使用自定义行分组来 reduceByKey?

Pyspark - 在作为列表的 spark 数据框列上使用 reducebykey

如何在pyspark中使用reduceByKey作为多键和单值[重复]

PySpark reduceByKey 对多个值

IndexError:在pyspark shell上使用reduceByKey操作时列出索引超出范围