如何在本地映射 RDD?
Posted
技术标签:
【中文标题】如何在本地映射 RDD?【英文标题】:How to I map over an RDD locally? 【发布时间】:2015-05-27 16:28:29 【问题描述】:作为my previous question 的后续,我如何在本地映射 RDD,即在不实际使用collect
的情况下将数据收集到本地流中(因为数据太大了)。
具体来说,我想写一些类似的东西
from subprocess import Popen, PIPE
with open('out','w') as out:
with open('err','w') as err:
myproc = Popen([.....],stdin=PIPE,stdout=out,stderr=err)
myrdd.iterate_locally(lambda x: myproc.stdin.write(x+'\n'))
如何实现这个iterate_locally
?
不有效吗:collect
返回值太大了:
myrdd.collect().foreach(lambda x: myproc.stdin.write(x+'\n'))
不工作:foreach
以分布式模式执行其参数,不在本地
myrdd.foreach(lambda x: myproc.stdin.write(x+'\n'))
相关:
Spark: Best practice for retrieving big data from RDD to local machine【问题讨论】:
【参考方案1】:RDD.foreachPartition
呢?您可以批量处理数据,如下所示:
myRdd.foreachPartition(it => it.collect.foreach(...))
如果您查看feature request history,RDD.foreachPartition
就是为了跨越这个中间地带而创建的。
【讨论】:
是否保证排序 RDD 按顺序处理? 我相信——相信这是RDD.zipWithIndex
的基础,它可以让您在RDD 上创建相当于Long 类型的AUTO INCREMENT 索引。我知道RDD.zipWithIndex
保留了 RDD 的原始顺序。
我得到pickle.PicklingError: Cannot pickle files that are not opened for reading
:显然,foreachPartition
仍然在分布式模式下执行,所以myproc.stdin
必须被挑选和运送(这显然是不可能的)。
嗯。我想这是有道理的。对此感到抱歉。【参考方案2】:
您最好的选择可能是将数据保存到本地计算机可以访问的源中,然后对其进行迭代。
如果这不是一个选项,并且假设您的本地计算机一次可以处理一个分区的数据,您是否可以有选择地一次带回一个分区(我会先缓存数据)然后执行类似于:
rdd.cache()
for partition in range(0, rdd.numPartitions):
data = rdd.mapPartitionsWithIndex(lambda index, itr: [(index, list(itr))]
localData = data.filter(lambda x: x[0] == partition).collect
# Do worker here
【讨论】:
对不起,我不明白你所说的“来源”是什么意思。 如果您不想这样做,我还添加了一个快速的 sn-p。以上是关于如何在本地映射 RDD?的主要内容,如果未能解决你的问题,请参考以下文章