PySpark reduceByKey 只有一个键
Posted
技术标签:
【中文标题】PySpark reduceByKey 只有一个键【英文标题】:PySpark reduceByKey by only one key 【发布时间】:2018-08-23 14:04:30 【问题描述】:我有这样的rdd
// Structure List[Tuple(x1, x2, value), Tuple(x1, x2, value)]
data = [('23', '98', 34), ('23', '89', 39), ('23', '12', 30), ('24', '12', 34), ('24', '14', 37), ('24', '16', 30)]
我正在寻找最终结果是 x1 的得分最大值以及与之关联的 x2 值。像这样
data = [('23', '89', 39), ('24', '14', 37)]
我尝试了reduceByKey
,但它给了我每个组合的最大值,这不是我想要的。
来自comment:
这是我尝试过的:
max_by_group = (
data.map(lambda x: (x[0], x))
.reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x: x[-1]))
.values()
)
【问题讨论】:
这就是我所做的 max_by_group = ( data.map(lambda x: (x[0], x)) .reduceByKey(lambda x1, x2: max(x1, x2, key=lambda x : x[-1])) .values() ) 【参考方案1】:groupBy
第一个元素,然后通过元组中的第三个元素找到每个组的最大值:
(rdd.groupBy(lambda x: x[0])
.mapValues(lambda x: max(x, key=lambda y: y[2]))
.values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]
或使用reduceByKey
:
(rdd.map(lambda x: (x[0], x))
.reduceByKey(lambda x, y: x if x[2] > y[2] else y)
.values()
).collect()
# [('24', '14', 37), ('23', '89', 39)]
【讨论】:
非常感谢。我尝试了这两种方法。自从有近十亿条记录以来,这需要一段时间。但令我惊讶的是,reduceByKey 比 groupBy 快,不知道为什么。您的 reduceByKey 解决方案也与我的几乎相同:) 是reduceByKey
在适用时比groupBy
效率更高,因为reduceByKey
经过优化,它在洗牌之前为每个分区组合数据;按分区聚合之后的改组可以最大限度地减少跨集群的数据传输,这通常是昂贵的。
嗯,我不知道。非常感谢您分享此信息。【参考方案2】:
如果您使用rdd
s,@Psidom 的答案就是您要寻找的答案。另一种选择是convert your rdd
to a DataFrame。
rdd = sc.parallelize(data)
df = rdd.toDF(["x1", "x2", "value"])
df.show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 98| 34|
#| 23| 89| 39|
#| 23| 12| 30|
#| 24| 12| 34|
#| 24| 14| 37|
#| 24| 16| 30|
#+---+---+-----+
现在你可以group by x1
and filter the rows with the maximum value
:
import pyspark.sql.functions as f
from pyspark.sql import Window
w = Window.partitionBy('x1')
df.withColumn('maxValue', f.max('value').over(w))\
.where(f.col('value') == f.col('maxValue'))\
.drop('maxValue')\
.show()
#+---+---+-----+
#| x1| x2|value|
#+---+---+-----+
#| 23| 89| 39|
#| 24| 14| 37|
#+---+---+-----+
【讨论】:
【参考方案3】:从 itertools 导入 groupby:
[max(list(j),key=lambda x:x[2]) for i,j in groupby(data,key = lambda x:x[0])]
Out[335]: [('23', '89', 39), ('24', '14', 37)]
【讨论】:
这适用于 python,但 OP 正在询问 spark rdds。以上是关于PySpark reduceByKey 只有一个键的主要内容,如果未能解决你的问题,请参考以下文章
如何在 PySpark 中使用自定义行分组来 reduceByKey?
Pyspark - 在作为列表的 spark 数据框列上使用 reducebykey