如何在本地映射 RDD?

Posted

技术标签:

【中文标题】如何在本地映射 RDD?【英文标题】:How to I map over an RDD locally? 【发布时间】:2015-05-27 16:28:29 【问题描述】:

作为my previous question 的后续,我如何在本地映射 RDD,即在不实际使用collect 的情况下将数据收集到本地流中(因为数据太大了)。

具体来说,我想写一些类似的东西

from subprocess import Popen, PIPE
with open('out','w') as out:
    with open('err','w') as err:
        myproc = Popen([.....],stdin=PIPE,stdout=out,stderr=err)
myrdd.iterate_locally(lambda x: myproc.stdin.write(x+'\n'))

如何实现这个iterate_locally

有效吗:collect 返回值太大了:

myrdd.collect().foreach(lambda x: myproc.stdin.write(x+'\n'))

工作:foreach 以分布式模式执行其参数,在本地

myrdd.foreach(lambda x: myproc.stdin.write(x+'\n'))

相关:

Spark: Best practice for retrieving big data from RDD to local machine

【问题讨论】:

【参考方案1】:

RDD.foreachPartition 呢?您可以批量处理数据,如下所示:

myRdd.foreachPartition(it => it.collect.foreach(...))

如果您查看feature request history,RDD.foreachPartition 就是为了跨越这个中间地带而创建的。

【讨论】:

是否保证排序 RDD 按顺序处理 我相信——相信这是RDD.zipWithIndex 的基础,它可以让您在RDD 上创建相当于Long 类型的AUTO INCREMENT 索引。我知道RDD.zipWithIndex 保留了 RDD 的原始顺序。 我得到pickle.PicklingError: Cannot pickle files that are not opened for reading:显然,foreachPartition 仍然在分布式模式下执行,所以myproc.stdin 必须被挑选和运送(这显然是不可能的)。 嗯。我想这是有道理的。对此感到抱歉。【参考方案2】:

您最好的选择可能是将数据保存到本地计算机可以访问的源中,然后对其进行迭代。

如果这不是一个选项,并且假设您的本地计算机一次可以处理一个分区的数据,您是否可以有选择地一次带回一个分区(我会先缓存数据)然后执行类似于:

rdd.cache()
for partition in range(0, rdd.numPartitions):
  data = rdd.mapPartitionsWithIndex(lambda index, itr: [(index, list(itr))]
  localData = data.filter(lambda x: x[0] == partition).collect
  # Do worker here

【讨论】:

对不起,我不明白你所说的“来源”是什么意思。 如果您不想这样做,我还添加了一个快速的 sn-p。

以上是关于如何在本地映射 RDD?的主要内容,如果未能解决你的问题,请参考以下文章

rdd 上的映射如何在 pyspark 中工作?

如何通过 RDD Scala 与 join 进行映射

Spark - 如何使用有状态映射器对已排序的 RDD 进行平面映射?

如何解决嵌套地图函数中的 SPARK-5063

Scala RDD 映射

如何将多个文本文件读入单个 RDD?