过滤掉pyspark RDD中的非数字值

Posted

技术标签:

【中文标题】过滤掉pyspark RDD中的非数字值【英文标题】:Filter out non digit values in pyspark RDD 【发布时间】:2020-08-23 09:01:50 【问题描述】:

我有一个如下所示的 RDD:

[["3331/587","Metro","1235","1000"],
["1234/232","City","8479","2000"],
["5987/215","Metro","1111","Unkown"],
["8794/215","Metro","1112","1000"],
["1254/951","City","6598","XXXX"],
["1584/951","City","1548","Unkown"],
["1833/331","Metro","1009","2000"],
["2213/987","City","1197", ]]

我想分别计算第二个条目(城市/地铁)中每个不同值的每行最后一个值(1000、2000 等)的平均值和最大值。我正在使用以下代码来收集“城市”值:

rdd.filter(lambda row: row[1] == 'City').map(lambda x: float(x[3])).collect()

但是,我得到了错误,可能是因为系列中的字符串值(例如“Unkown”)。

如何过滤掉包含字符串和空值的行(=只保留可转换为数字的行),然后计算最大值和平均值?

【问题讨论】:

【参考方案1】:

试试这个。

rdd = rdd.map(lambda l: [l[i].replace('"', '') for i in range(0, len(l))])
rdd = rdd.filter(lambda l: len(l) > 3) \
   .filter(lambda l: l[1] in ['City', 'Metro']) \
   .filter(lambda l: l[3].isdigit()) \
   .map(lambda l: (l[1], int(l[3]))) \

rdd_avg = rdd.aggregateByKey((0, 0), lambda a, b: (a[0] + b, a[1] + 1), lambda a, b: a + b).mapValues(lambda x: x[0] / x[1])
rdd_max = rdd.reduceByKey(lambda a, b: a if a > b else b)

print(rdd_avg.collect())
print(rdd_max.collect())

[('Metro', 1333.3333333333333), ('City', 2000.0)]
[('Metro', 2000), ('City', 2000)]

【讨论】:

谢谢。那很棒!可能还有其他一些字符串值,例如“未知”。我怎样才能排除所有这些并获得所有可以转换为数字的那些?另外,您计算了总和;如何获得平均值和最大值? 再检查一遍。 谢谢。 rdd.reduceByKey(lambda a, b: a + b) 给我求和,而不是平均。 使用此处的选项之一进行平均:***.com/questions/29930110/… 哦,我错过了。对不起。

以上是关于过滤掉pyspark RDD中的非数字值的主要内容,如果未能解决你的问题,请参考以下文章

如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?

PySpark:在“NoneType”对象上过滤掉 RDD 元素失败是不可迭代的

如何在过滤器pyspark RDD中过滤掉某种模式[重复]

如何替换/删除 PySpark RDD 中的正则表达式?

Pyspark - 使用广播字典中的日期过滤 RDD

如何从 Pyspark 中的 RDD 中过滤