在pyspark如何广播和巨大的rdd?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在pyspark如何广播和巨大的rdd?相关的知识,希望对你有一定的参考价值。

当我打印出我的rdd的第一个元素如下:

print("input = {}".format(input.take(1)[0]))

我得到一个结果:(u'motor', [0.001,..., 0.9])

[0.001,..., 0.9]的类型是一个列表。

输入rdd中的元素数等于53304100

当我想要将输入RDD广播如下时,我的问题出现了:

brod = sc.broadcast(input.collect())

生成的异常如下(我只显示了异常的第一部分):

    WARN TaskSetManager: Lost task 56.0 in stage 1.0 (TID 176, 172.16.140.144, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: <lambda>() missing 1 required positional argument: 'document'
答案

如果您的RDD太大,应用程序可能会遇到OutOfMemory错误,这会导致collect方法将所有数据拉入驱动程序的内存,这通常不够大。

因此,您可以尝试增加驾驶员的记忆力

pyspark --driver-memory 4g

以上是关于在pyspark如何广播和巨大的rdd?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:如何从一个巨大的 RDD 中获取样本 RDD?

Pyspark - 使用广播字典中的日期过滤 RDD

如何在 PySpark 中广播 RDD?

pyspark - 使用 RDD 进行聚合比 DataFrame 快得多

如何在pyspark中标准化RDD?

如何在 PySpark 中的 RDD 中的列中查找标准差