如何在pyspark中广播一个巨大的rdd?

Posted

技术标签:

【中文标题】如何在pyspark中广播一个巨大的rdd?【英文标题】:How broadcast a huge rdd in pyspark? 【发布时间】:2019-02-25 03:55:43 【问题描述】:

当我打印出我的 rdd 的第一个元素时:

print("input = ".format(input.take(1)[0]))

我得到的结果是:(u'motor', [0.001,..., 0.9])

[0.001,..., 0.9] 的类型是一个列表。

输入rdd中元素个数等于53304100

当我想按以下方式广播输入 RDD 时,我的问题就来了:

brod = sc.broadcast(input.collect())

生成的异常如下(我只展示了异常的第一部分):

    WARN TaskSetManager: Lost task 56.0 in stage 1.0 (TID 176, 172.16.140.144, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: <lambda>() missing 1 required positional argument: 'document'

【问题讨论】:

【参考方案1】:

如果你的 RDD 太大,应用程序可能会遇到 OutOfMemory 错误,这是由于 collect 方法将所有数据拉取到驱动程序的内存通常不够大。

所以你可以尝试通过

来增加你的驱动程序的内存
pyspark --driver-memory 4g

【讨论】:

你的意思是 spark-submit --driver-memory 4g? 我增加了驱动程序内存,我得到了一个新的异常。请看我修改后的帖子 我认为你应该尝试从 spark execor UI 分析,例如 (jaceklaskowski.gitbooks.io/mastering-apache-spark/…)。它不仅包含错误日志,还包含更多信息。可以贴吗?我猜你的 rdd 还是太大了,也许你可以尝试将分区数增加到 100 [rdd.repartition(100)] 以将数据处理分布在节点之间,并将 shuffle 数据保持在 2 GB 以下。此外,广播仅适用于小数据集。您也可以尝试删除 boradcast。 我再次运行代码(severak 次)。当我将分区数增加到 128(2*32*2)时,没有出现旧的异常。我更新帖子以显示新帖子。顺便说一句,如果您有想法,请在***.com/questions/54540970/… 上提供帮助

以上是关于如何在pyspark中广播一个巨大的rdd?的主要内容,如果未能解决你的问题,请参考以下文章

如何知道 pyspark 中广播变量的可用内存量?

PySpark:如何从一个巨大的 RDD 中获取样本 RDD?

在pyspark如何广播和巨大的rdd?

Spark篇---Spark中广播变量和累加器

如何在自组织网络中广播?

如何在 em-websocket 中广播或建立连接?