如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?

Posted

技术标签:

【中文标题】如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?【英文标题】:How to filter out values from pyspark.rdd.PipelinedRDD? 【发布时间】:2017-10-26 08:59:20 【问题描述】:

我有一个名为myRDDpyspark.rdd.PipelinedRDD。这是它的示例内容:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),
 ((333, u'BB', u'B'), (999, u'BB', u'A')),...]

我需要删除所有第三列值不一致的条目。预期的结果是这样的:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),...]

我该怎么做?

【问题讨论】:

也许这可以帮助***.com/questions/25914789/… 【参考方案1】:

您可以使用带有 lambda 表达式的过滤器来检查每个元组对的第三个元素是否相同,例如:

l = [((111, u'BB', u'A'), (444, u'BB', u'A')),
     ((222, u'BB', u'A'), (888, u'BB', u'A')),
     ((333, u'BB', u'B'), (999, u'BB', u'A'))]

rdd = sc.parallelize(l)
rdd = rdd.filter(lambda x: x[0][2] == x[1][2])
result = rdd.collect()
print result

>>> [((111, u'BB', u'A'), (444, u'BB', u'A')), ((222, u'BB', u'A'), (888, u'BB', u'A'))]

要回答您的后续评论,请记住,lambda 只是一个函数,如果您有更复杂的逻辑,您可以将它写成一个函数。你可以这样做:

def do_stuff(x):
    if (x[0][2] == 'C') or (x[1][2] == 'C'):
        return x     
    else:
        if x[0][2] == x[1][2]: return x
    return None

rdd = rdd.map(do_stuff).filter(lambda x: x is not None)

res = rdd.collect()

【讨论】:

效果很好。顺便说一句,是否可以添加异常,例如排除值C?如果对的第三列中的任何一个值为C,则不应进行比较。

以上是关于如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据从回收器适配器发送到片段 |如何从 recyclerview 适配器调用片段函数

如何从 Firebase 获取所有设备令牌?

如何直接从类调用从接口继承的方法?

如何从服务器获取和设置 android 中的 API(从服务器获取 int 值)?如何绑定和实现这个

如何从Mac从android studio中的fabric注销? [复制]

如何从设备中获取 PDF 文件以便能够从我的应用程序中上传?