RDD操作对pyspark中的值进行排序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RDD操作对pyspark中的值进行排序相关的知识,希望对你有一定的参考价值。

我具有如下文件格式,

0, Alpha,-3.9, 4, 2001-02-01, 5, 20
0, Beta,-3.8, 3, 2002-02-01, 6, 21
1, Gamma,-3.7, 8, 2003-02-01, 7, 22
0, Alpha,-3.5, 4, 2004-02-01, 8, 23
0, Alpha,-3.9, 4, 2005-02-01, 8, 27

我想使用rdd操作按1st elements在每行中对不同的3rd elements进行排序。我更喜欢得到以下输出,

(Beta, 3)
(Alpha, 4)
(Gamma, 8)

这是我目前所做的,

rdd = sc.textFile(myDataset)
list_ = rdd.map(lambda line: line.split(",")).map(lambda e : e[1]).distinct().collect() 
new_ = list_.sortBy(lambda e : e[2])

但是我无法根据需要进行排序。谁能说出仅基于rdd的操作方法吗?

答案

[rdd = sc.textFile(myDataset)是正确的。

list_ = rdd.map(lambda line: line.split(",")).map(lambda e : e[1]).distinct().collect() 
new_ = list_.sortBy(lambda e : e[2]) # e[2] does not exist.

您已经在list_上调用了collect,因此它不再是RDD。然后,您继续在其上调用sortBy,因此它将不起作用。也许您在发布时犯了这个错误。主要问题是地图操作。您需要创建一个pairWiseRdd,但尚未创建一个。因此,没有e[2]可供选择。见下文。

>>> rdd.map(lambda line: line.split(",")).map(lambda e : e[1]).collect()
[' Alpha', ' Beta', ' Gamma', ' Alpha', ' Alpha']

以上没有您使用distinct()所需的值相反,您需要执行此操作

>>> list_ = rdd.map(lambda line: line.split(",")).map(lambda e : (e[1],e[3]))
>>> list_.collect()
[(' Alpha', ' 4'),
 (' Beta', ' 3'),
 (' Gamma', ' 8'),
 (' Alpha', ' 4'),
 (' Alpha', ' 4')]
>>> distinct_rdd = list_.distinct() #making stuff distinct
>>> distinct_rdd.collect()
[(' Alpha', ' 4'), (' Beta', ' 3'), (' Gamma', ' 8')]

现在我们已经制作了pairWiseRdd,我们可以使用每对的第二个值对其进行排序。

>>> sorted_rdd = distinct_rdd.sortBy( lambda x:x[1] )
>>> sorted_rdd.collect()
[(' Beta', ' 3'), (' Alpha', ' 4'), (' Gamma', ' 8')]

以上是关于RDD操作对pyspark中的值进行排序的主要内容,如果未能解决你的问题,请参考以下文章

如何仅使用堆栈操作对堆栈进行排序?

我应该在 PySpark 中选择 RDD 还是 DataFrame 之一?

PySpark - 按第二列对 RDD 进行排序

如何使用 Redux connect 中的操作对测试组件进行快照?

Pyspark - 尝试迭代 numpy 数组时出错

使用 pyspark 过滤数组中基于 RDD 的值