当键的数量少于每个键的值的数量时,PySpark reduceByKey 会继续失败

Posted

技术标签:

【中文标题】当键的数量少于每个键的值的数量时,PySpark reduceByKey 会继续失败【英文标题】:PySpark reduceByKey keep failing when the number of keys is small compared to the number of values per key 【发布时间】:2020-01-30 23:55:10 【问题描述】:

我有以下代码。

x = sc.parallelize([
  (1, [3]),
  (1, [4])
])

y = x.reduceByKey(lambda accum, n: accum.extend(n))
print(y.collect())

我希望y

[(1, [3, 4])]

然而,y 最终变成了

[(1, None)]

问题 1:为什么我得到的是 None 而不是数组?

为了解决上述问题,我执行以下操作。

import itertools

def merge(accum, n):
  arr = [accum, n]
  return list(itertools.chain.from_iterable(arr))

x = sc.parallelize([
  (1, [3]),
  (1, [4])
])

y = x.reduceByKey(lambda accum, n: merge(accum, n))
print(y.collect())

这一次,y[(1, [3, 4])]

问题 2:如何在第二种情况下合并(或减少)itertools

当我在一个非常大的数据集(数百万条记录)上应用 itertools 方法时,我收到以下错误。

Py4JJavaError: 调用时出错 z:org.apache.spark.api.python.PythonRDD.collectAndServe。 : org.apache.spark.SparkException:作业因阶段失败而中止: 阶段 108.0 中的任务 13 失败 4 次,最近一次失败:丢失任务 13.3 在阶段 108.0 (TID 14305, 15.5.15.31, executor 8): org.apache.spark.api.python.PythonException: Traceback (最近 最后调用):文件“/databricks/spark/python/pyspark/worker.py”,行 480,主要 process() 文件“/databricks/spark/python/pyspark/worker.py”,第 472 行,正在进行中 serializer.dump_stream(out_iter, outfile) 文件“/databricks/spark/python/pyspark/serializers.py”,第 509 行,在 转储流 write_int(len(bytes), stream) 文件“/databricks/spark/python/pyspark/serializers.py”,第 833 行,在 write_int stream.write(struct.pack("!i", value)) struct.error: 'i' 格式需要 -2147483648

问题 3:在互联网上搜索,我对这个错误没有任何线索。这个错误是什么意思?

我想做的是通过键对PairRDD 中的项目进行分组,然后遍历与单个键关联的值以构建逻辑对象。如果我使用 PySpark 的 groupByKey 函数,我会遇到 Spark 的 2 GB 限制。

问题 4:关于如何按键对项目进行分组,然后将所有这些项目(每个键)转换为其他项目(不触发内存限制)的任何想法?

在一个非常大的数据集上,尝试以下操作会给我RecursionError: maximum recursion depth exceeded while pickling an object

y = x.reduceByKey(lambda a, b: [a, b])

【问题讨论】:

arr1.extend(arr2) 返回None。这就解释了第一个问题。 That won't take you anywhere 【参考方案1】:

答案 1。 extend() 函数扩展列表但返回 None。如果要添加两个数组,则应使用“+”运算符。

x = sc.parallelize([
  (1, [3]),
  (1, [4])
])

y = x.reduceByKey(lambda accum, n: accum+n)
print(y.collect())

您可以阅读有关此here 的更多信息。

【讨论】:

以上是关于当键的数量少于每个键的值的数量时,PySpark reduceByKey 会继续失败的主要内容,如果未能解决你的问题,请参考以下文章

mongodb count 每个字段/键的不同值的数量

当键的名称中有点时(在 oracle 中),我如何访问 JSON 值?

有没有办法在多图内联中查找键的数量?

kafka主题分区的数量和数据中不同键的数量

groupBy 数据集每个键的数量有限制

关于map函数的方法参考,当键的类型为String时编译错误