在另一个 RDD 的基础上修剪一个 RDD
Posted
技术标签:
【中文标题】在另一个 RDD 的基础上修剪一个 RDD【英文标题】:Prune one RDD on the basis of another RDD 【发布时间】:2016-11-26 09:04:06 【问题描述】:spark 中是否有办法根据另一个 RDD 过滤一个 RDD 中的元素,即使它们不共享相同的键?
我有两个 RDD - abc 和 xyz
abc.collect() 看起来像这样
[[a,b,c],[a,c,g,e],[a,e,b,x],[b,c]]
xyz.collect() 看起来像这样
[a,b,c]
现在我想从 RDD abc 中过滤掉 xyz 中不存在的所有元素。
经过上述操作,RDD Abc 应该是这样的:
[[a,b,c],[a,c],[a,b],[b,c]]
我写了一个看起来像这样的代码:
def prune(eachlist):
for i in eachlist:
if i in xyz:
return i
abc = abc.map(prune)
但是,这给了我这个错误:
Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation
我已经尝试过过滤器,查找而不是映射无济于事。我不断收到同样的错误。
我知道我可以在 xyz 上执行收集操作并解决此错误,但我在大型数据集上运行此操作并且执行 .collect() 会因为超出太多内存而杀死我的 AWS 服务器。因此,我需要在不使用 .collect() 或任何类似的昂贵操作的情况下执行此操作。
【问题讨论】:
【参考方案1】:你可以:
# Add index
abc.zipWithIndex() \
# Flatten values
.flatMap(lambda x: [(k, x[1]) for k in x[0]]) \
# Join with xyz (works as filter)
.join(xyz.map(lambda x: (x, None))) \
# Group back by index
.map(lambda x: (x[1][0], x[0])) \
.groupByKey() \
.map(lambda x: list(x[1]))
或者您可以在xyz
上创建布隆过滤器并使用它来映射abc
。
【讨论】:
我试过了,但结果是一个不可散列的类型列表错误 又试了一次。这次一步一步,它奏效了。不知道为什么它第一次不起作用。谢谢 LostinOverflow以上是关于在另一个 RDD 的基础上修剪一个 RDD的主要内容,如果未能解决你的问题,请参考以下文章