如何在 pyspark 中执行这个排序过程?

Posted

技术标签:

【中文标题】如何在 pyspark 中执行这个排序过程?【英文标题】:How to execute this sorting process in pyspark? 【发布时间】:2017-04-08 21:19:16 【问题描述】:

我尝试了 map、mapValues 和 sort,但没有任何效果。 问题描述如下: “根据相似度(值中的第二个),如果相同,则选择ID最小的用户(值中的第一个)。” 键值对的列表是:

[
    (18, [(2, 0.5)]),
    (30, [(19, 0.5), (6, 0.25)]),
    (6, [(30, 0.25), (20, 0.2), (19, 0.2)]),
    (19, [(30, 0.5), (8, 0.2), (6, 0.2)]),
    (2, [(18, 0.5)]),
    (26, [(9, 0.2)]),
    (9, [(26, 0.2)])
]

我想得到:

[
    (18, [(2, 0.5)]),
    (30, [(19, 0.5), (6, 0.25)]),
    (6, [(30, 0.25), (19, 0.2)]),
    (19, [(30, 0.5), (6, 0.2)]),
    (2, [(18, 0.5)]),
    (26, [(9, 0.2)]),
    (9, [(26, 0.2)])
]

非常感谢!

【问题讨论】:

【参考方案1】:

非常简单。只需要弄清楚必要的转换:

data = [(18, [(2, 0.5)]),
 (30, [(19, 0.5), (6, 0.25)]),
 (6, [(30, 0.25), (20, 0.2), (19, 0.2)]),
 (19, [(30, 0.5), (8, 0.2), (6, 0.2)]),
 (2, [(18, 0.5)]),
 (26, [(9, 0.2)]),
 (9, [(26, 0.2)])]

rdd1 = sc.parallelize(data)

rdd2 = rdd1.flatMapValues(lambda x:x)

rdd3 = rdd2.map(lambda x: ((x[0], x[1][1]),x[1][0]))

rdd4 = rdd3.reduceByKey(min)

rdd5 = rdd4.map(lambda x: (x[0][0], (x[1], x[0][1])))

rdd6 = rdd5.reduceByKey(lambda x,y: [x,y])
rdd6.collect()
[(9, (26, 0.2)),
 (26, (9, 0.2)),
 (18, (2, 0.5)),
 (30, [(6, 0.25), (19, 0.5)]),
 (2, (18, 0.5)),
 (6, [(30, 0.25), (19, 0.2)]),
 (19, [(30, 0.5), (6, 0.2)])]

【讨论】:

以上是关于如何在 pyspark 中执行这个排序过程?的主要内容,如果未能解决你的问题,请参考以下文章

如何避免pyspark中加入操作中的过度洗牌?

连接后如何在 Pyspark Dataframe 中选择和排序多个列

如何在 Pyspark 中对数据框进行排序 [重复]

如何在pyspark中按字母顺序对嵌套结构的列进行排序?

Pyspark 数据框 OrderBy 分区级别还是整体?

如何在 pyspark 管道中打印最佳模型参数