如何在pyspark中广播一个巨大的rdd?
Posted
技术标签:
【中文标题】如何在pyspark中广播一个巨大的rdd?【英文标题】:How broadcast a huge rdd in pyspark? 【发布时间】:2019-02-25 03:55:43 【问题描述】:当我打印出我的 rdd 的第一个元素时:
print("input = ".format(input.take(1)[0]))
我得到的结果是:(u'motor', [0.001,..., 0.9])
[0.001,..., 0.9]
的类型是一个列表。
输入rdd中元素个数等于53304100
当我想按以下方式广播输入 RDD 时,我的问题就来了:
brod = sc.broadcast(input.collect())
生成的异常如下(我只展示了异常的第一部分):
WARN TaskSetManager: Lost task 56.0 in stage 1.0 (TID 176, 172.16.140.144, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
process()
File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
vs = list(itertools.islice(iterator, batch))
TypeError: <lambda>() missing 1 required positional argument: 'document'
【问题讨论】:
【参考方案1】:如果你的 RDD 太大,应用程序可能会遇到 OutOfMemory 错误,这是由于 collect 方法将所有数据拉取到驱动程序的内存通常不够大。
所以你可以尝试通过
来增加你的驱动程序的内存pyspark --driver-memory 4g
【讨论】:
你的意思是 spark-submit --driver-memory 4g? 我增加了驱动程序内存,我得到了一个新的异常。请看我修改后的帖子 我认为你应该尝试从 spark execor UI 分析,例如 (jaceklaskowski.gitbooks.io/mastering-apache-spark/…)。它不仅包含错误日志,还包含更多信息。可以贴吗?我猜你的 rdd 还是太大了,也许你可以尝试将分区数增加到 100 [rdd.repartition(100)] 以将数据处理分布在节点之间,并将 shuffle 数据保持在 2 GB 以下。此外,广播仅适用于小数据集。您也可以尝试删除 boradcast。 我再次运行代码(severak 次)。当我将分区数增加到 128(2*32*2)时,没有出现旧的异常。我更新帖子以显示新帖子。顺便说一句,如果您有想法,请在***.com/questions/54540970/… 上提供帮助以上是关于如何在pyspark中广播一个巨大的rdd?的主要内容,如果未能解决你的问题,请参考以下文章