spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足
Posted
技术标签:
【中文标题】spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足【英文标题】:spark - java heap out of memory when doing groupby and aggregation on a large dataframe 【发布时间】:2019-10-03 15:28:39 【问题描述】:我是 spark 新手,没有 Java 编程经验。 我正在使用 pyspark 处理一个非常大的时间序列数据集,其中包含近 4000 个数字(浮点)列和数十亿行。
我想用这个数据集实现以下目标:
时间序列数据以 10 毫秒为间隔。我想按 1s 间隔对数据进行分组,并使用均值作为聚合函数。
这是我用来读取分区拼花文件的代码。
df = (spark.read.option("mergeSchema", "true")
.parquet("/data/"))
这是我编写的 groupby 和聚合代码:
col_list = [... list of numeric columns in the dataframe ...]
agg_funcs = [mean] # I also want to add other aggregation functions here later.
exprs = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]
result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
.agg(*exprs))
现在,我想将上述结果数据帧写入分区拼花:
(result.write.mode("overwrite")
.partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
.parquet('/out/'))
但是,我得到一个 java heap out of memory 错误。
我尝试增加spark.sql.shuffle.partitions
以使每个分区的大小更小,但这没有帮助。
我的 Spark 集群配置:
2 workers + 1 master
Both the worker nodes have 256 GB RAM and 32 cores each.
Master node has 8 cores and 32 GB RAM.
我为 Spark 作业指定的配置是:
"driverMemory": "8G",
"driverCores": 4,
"executorMemory": "20G",
"executorCores": 4,
"numExecutors": 14,
"conf":
"spark.sql.shuffle.partitions": 2000000
以下是 Ambari 关于集群配置的一些截图:
YARN memory
YARN CPU
谁能帮我理解为什么会出现内存问题以及如何解决它?谢谢。
【问题讨论】:
【参考方案1】:我认为这是由于数据倾斜而发生的,并且您的一个分区正在发生 OOM。
Spark 的 groupBy() 需要一次将所有键值加载到内存中才能执行 groupby。
增加分区不起作用,因为您可能拥有具有相似分组的大量数据。 按键检查您是否存在与相似组的数据偏差。
Check this article which explains this better.
【讨论】:
据我所知,没有数据倾斜,但我会在真正检查后回复。我正在处理的数据是每隔 10 毫秒收集一次的传感器数据。因此,数据几乎是均匀分布的。【参考方案2】:为什么不在 groupBy 之前连接 'Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'
。在 groupBy 之后,您可以重新创建这些列。
我认为尝试不更改执行器核心,然后将其减少到 15,然后减少到 7。我认为 4 太低了
【讨论】:
从我阅读的几篇博客中,建议将执行器内核保持在 5 以下,以免影响 HDFS I/O 吞吐量。这不正确吗?例如,cloudera blog 表示每个执行程序 15 个内核可能导致 HDFS I/O 吞吐量不佳。 我同意,但我认为在这里您需要每个执行程序更多的内存。这就是为什么我建议减少执行者的数量。我不确定它会起作用。您可以在更改配置之前先连接字段。 我在 'epoch' 上做了一个 groupBy 而不是单独的列,并且还减少了执行者的数量。这一次,我没有收到任何内存不足错误,但我在org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:536)
收到了一个空指针异常。我还注意到只有少数任务在完成所有读取/处理。
您可以在执行 groupBy 之前重新分区数据,以便在集群中划分数据。您可以粘贴用于读取镶木地板数据的代码吗?
我用我用来读取分区镶木地板文件的代码更新了我的问题。这次我尝试读取更大的数据集,并且更多任务正在读取数据。早些时候,我测试的数据集大小只有 30 MB 压缩(4.5 GB 未压缩)。另外,我认为空指针异常是由于 parquet 格式错误,因为我尝试了正确的 parquet 文件并且没有 NPE。奇怪的是,pandas + fastparquet
能够加载格式错误的 parquet 文件,而 pandas + pyarrow
抱怨对于特定列的预期长度大于实际长度。以上是关于spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足的主要内容,如果未能解决你的问题,请参考以下文章
使用 pandas 在数据帧上执行 groupby,按计数排序并获取 python 中的前 2 个计数
如何使用 spark-scala 在 spark 数据帧上执行枢轴?
在 pandas 数据帧上同时操作 groupby 和 resample?