如何删除元素如何根据另一个rdd从一个rdd中删除元素并在pyspark中创建新的rdd?

Posted

技术标签:

【中文标题】如何删除元素如何根据另一个rdd从一个rdd中删除元素并在pyspark中创建新的rdd?【英文标题】:How to remove elements how to delete elemts from one rdd based on other rdd and create new rdd in pyspark? 【发布时间】:2018-12-04 13:50:59 【问题描述】:

我已经创建了 2 个如下所示的 Rdd

rdd=sc.parallelize([(0,'A'),(0,'B'),(1,'D'),(1,'B'),(1,'C'),(2,"A"),(2, "B"),(2, "E")])
rdd1=rdd.groupByKey().map(lambda x :list(x[1]))
rdd1.collect()
[['A', 'B'], ['D', 'B', 'C'], ['A', 'B', 'E']]
rdd2=sc.parallelize(['D','E'])
rdd2.collect()
Out[204]: ['D', 'E']

现在我想从 rdd1 中删除元素,如果它出现在 rdd2 中,即

我在 rdd2 ('D','E') 中有 2 个元素

现在我想从 rdd1 中删除这些元素。

我预期的 rdde3 是:

[['A', 'B'], ['B', 'C'], ['A', 'B']]

【问题讨论】:

【参考方案1】:

首先将第二个 rdd 的所有元素收集到一个列表中。应用过滤条件,然后进行分组。

from pyspark import SparkContext

sc = SparkContext('local')
rdd=sc.parallelize([(0,'A'),(0,'B'),(1,'D'),(1,'B'),(1,'C'),(2,"A"),(2, "B"),(2, "E")])
print(rdd.collect())
rdd1=rdd.groupByKey().map(lambda x :list(x[1]))
list1 = rdd1.collect()
print(list1)
rdd2=sc.parallelize(['D','E'])
list2 =rdd2.collect()
print(list2)

rdd2list = rdd2.collect()
filteredrdd = rdd.filter(lambda x: x[1] not in rdd2list)
finalrdd=filteredrdd.groupByKey().map(lambda x :list(x[1]))
print(finalrdd.collect())

这是 finalrdd 的输出:

[['A', 'B'], ['B', 'C'], ['A', 'B']]

根据您的评论更新:

def filter_list(x):
    return [ele for ele in x if ele not in rdd2list]


final2rdd = rdd1.map(lambda x: filter_list(x))
print(final2rdd.collect())

这是final2rdd的输出,和之前一样:

[['A', 'B'], ['B', 'C'], ['A', 'B']]

【讨论】:

您好,感谢您的回复,但根据我的问题,我想从 rdd1 中过滤。因为我想做与 rdd1 不同级别的多次相同的工作(过滤器)。你能帮我从rdd1而不是rdd过滤吗?提前致谢。 @Sai :我已根据您的评论更新了答案。

以上是关于如何删除元素如何根据另一个rdd从一个rdd中删除元素并在pyspark中创建新的rdd?的主要内容,如果未能解决你的问题,请参考以下文章

从 Spark RDD 中删除元素

如何从pyspark数据框列值中删除方括号

Spark:scala - 如何将集合从 RDD 转换为另一个 RDD

如何在 spark rdd 中获取最大值并将其删除?

pyspark 如何像在 scala .drop 中一样删除 rdd 列

从另一个访问特定的 RDD 分区