当键的数量少于每个键的值的数量时,PySpark reduceByKey 会继续失败
Posted
技术标签:
【中文标题】当键的数量少于每个键的值的数量时,PySpark reduceByKey 会继续失败【英文标题】:PySpark reduceByKey keep failing when the number of keys is small compared to the number of values per key 【发布时间】:2020-01-30 23:55:10 【问题描述】:我有以下代码。
x = sc.parallelize([
(1, [3]),
(1, [4])
])
y = x.reduceByKey(lambda accum, n: accum.extend(n))
print(y.collect())
我希望y
是
[(1, [3, 4])]
然而,y
最终变成了
[(1, None)]
问题 1:为什么我得到的是 None
而不是数组?
为了解决上述问题,我执行以下操作。
import itertools
def merge(accum, n):
arr = [accum, n]
return list(itertools.chain.from_iterable(arr))
x = sc.parallelize([
(1, [3]),
(1, [4])
])
y = x.reduceByKey(lambda accum, n: merge(accum, n))
print(y.collect())
这一次,y
是 [(1, [3, 4])]
。
问题 2:如何在第二种情况下合并(或减少)itertools
?
当我在一个非常大的数据集(数百万条记录)上应用 itertools
方法时,我收到以下错误。
Py4JJavaError: 调用时出错 z:org.apache.spark.api.python.PythonRDD.collectAndServe。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 108.0 中的任务 13 失败 4 次,最近一次失败:丢失任务 13.3 在阶段 108.0 (TID 14305, 15.5.15.31, executor 8): org.apache.spark.api.python.PythonException: Traceback (最近 最后调用):文件“/databricks/spark/python/pyspark/worker.py”,行 480,主要 process() 文件“/databricks/spark/python/pyspark/worker.py”,第 472 行,正在进行中 serializer.dump_stream(out_iter, outfile) 文件“/databricks/spark/python/pyspark/serializers.py”,第 509 行,在 转储流 write_int(len(bytes), stream) 文件“/databricks/spark/python/pyspark/serializers.py”,第 833 行,在 write_int stream.write(struct.pack("!i", value)) struct.error: 'i' 格式需要 -2147483648
问题 3:在互联网上搜索,我对这个错误没有任何线索。这个错误是什么意思?
我想做的是通过键对PairRDD
中的项目进行分组,然后遍历与单个键关联的值以构建逻辑对象。如果我使用 PySpark 的 groupByKey
函数,我会遇到 Spark 的 2 GB 限制。
问题 4:关于如何按键对项目进行分组,然后将所有这些项目(每个键)转换为其他项目(不触发内存限制)的任何想法?
在一个非常大的数据集上,尝试以下操作会给我RecursionError: maximum recursion depth exceeded while pickling an object
。
y = x.reduceByKey(lambda a, b: [a, b])
【问题讨论】:
arr1.extend(arr2)
返回None
。这就解释了第一个问题。
That won't take you anywhere
【参考方案1】:
答案 1。 extend() 函数扩展列表但返回 None。如果要添加两个数组,则应使用“+”运算符。
x = sc.parallelize([
(1, [3]),
(1, [4])
])
y = x.reduceByKey(lambda accum, n: accum+n)
print(y.collect())
您可以阅读有关此here 的更多信息。
【讨论】:
以上是关于当键的数量少于每个键的值的数量时,PySpark reduceByKey 会继续失败的主要内容,如果未能解决你的问题,请参考以下文章