spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足

Posted

技术标签:

【中文标题】spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足【英文标题】:spark - java heap out of memory when doing groupby and aggregation on a large dataframe 【发布时间】:2019-10-03 15:28:39 【问题描述】:

我是 spark 新手,没有 Java 编程经验。 我正在使用 pyspark 处理一个非常大的时间序列数据集,其中包含近 4000 个数字(浮点)列和数十亿行。

我想用这个数据集实现以下目标:

时间序列数据以 10 毫秒为间隔。我想按 1s 间隔对数据进行分组,并使用均值作为聚合函数。

这是我用来读取分区拼花文件的代码。

df = (spark.read.option("mergeSchema", "true")
           .parquet("/data/"))

这是我编写的 groupby 和聚合代码:

col_list = [... list of numeric columns in the dataframe ...]

agg_funcs = [mean]   # I also want to add other aggregation functions here later.

exprs     = [f(df[c]).alias(f.__name__ + '_' + c) for f in agg_funcs for c in col_list]

result = (df.groupBy(['Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'])
            .agg(*exprs))

现在,我想将上述结果数据帧写入分区拼花:

(result.write.mode("overwrite")
       .partitionBy('Year', 'Month', 'Day', 'Hour', 'Minute', 'Second')
       .parquet('/out/'))

但是,我得到一个 java heap out of memory 错误。

我尝试增加spark.sql.shuffle.partitions 以使每个分区的大小更小,但这没有帮助。

我的 Spark 集群配置:

2 workers + 1 master
Both the worker nodes have 256 GB RAM and 32 cores each.
Master node has 8 cores and 32 GB RAM.

我为 Spark 作业指定的配置是:


    "driverMemory": "8G", 
    "driverCores": 4, 
    "executorMemory": "20G", 
    "executorCores": 4, 
    "numExecutors": 14, 
    "conf": 
        "spark.sql.shuffle.partitions": 2000000
    

以下是 Ambari 关于集群配置的一些截图:

YARN memory

YARN CPU

谁能帮我理解为什么会出现内存问题以及如何解决它?谢谢。

【问题讨论】:

【参考方案1】:

我认为这是由于数据倾斜而发生的,并且您的一个分区正在发生 OOM。

Spark 的 groupBy() 需要一次将所有键值加载到内存中才能执行 groupby。

增加分区不起作用,因为您可能拥有具有相似分组的大量数据。 按键检查您是否存在与相似组的数据偏差。

Check this article which explains this better.

【讨论】:

据我所知,没有数据倾斜,但我会在真正检查后回复。我正在处理的数据是每隔 10 毫秒收集一次的传感器数据。因此,数据几乎是均匀分布的。【参考方案2】:

为什么不在 groupBy 之前连接 'Year', 'Month', 'Day', 'Hour', 'Minute', 'Second'。在 groupBy 之后,您可以重新创建这些列。 我认为尝试不更改执行器核心,然后将其减少到 15,然后减少到 7。我认为 4 太低了

【讨论】:

从我阅读的几篇博客中,建议将执行器内核保持在 5 以下,以免影响 HDFS I/O 吞吐量。这不正确吗?例如,cloudera blog 表示每个执行程序 15 个内核可能导致 HDFS I/O 吞吐量不佳 我同意,但我认为在这里您需要每个执行程序更多的内存。这就是为什么我建议减少执行者的数量。我不确定它会起作用。您可以在更改配置之前先连接字段。 我在 'epoch' 上做了一个 groupBy 而不是单独的列,并且还减少了执行者的数量。这一次,我没有收到任何内存不足错误,但我在org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:536) 收到了一个空指针异常。我还注意到只有少数任务在完成所有读取/处理。 您可以在执行 groupBy 之前重新分区数据,以便在集群中划分数据。您可以粘贴用于读取镶木地板数据的代码吗? 我用我用来读取分区镶木地板文件的代码更新了我的问题。这次我尝试读取更大的数据集,并且更多任务正在读取数据。早些时候,我测试的数据集大小只有 30 MB 压缩(4.5 GB 未压缩)。另外,我认为空指针异常是由于 parquet 格式错误,因为我尝试了正确的 parquet 文件并且没有 NPE。奇怪的是,pandas + fastparquet 能够加载格式错误的 parquet 文件,而 pandas + pyarrow 抱怨对于特定列的预期长度大于实际长度。

以上是关于spark - 在大型数据帧上执行 groupby 和聚合时,java 堆内存不足的主要内容,如果未能解决你的问题,请参考以下文章

使用 pandas 在数据帧上执行 groupby,按计数排序并获取 python 中的前 2 个计数

如何使用 spark-scala 在 spark 数据帧上执行枢轴?

在数据帧上的 pandas groupby 之后循环遍历组

在 pandas 数据帧上同时操作 groupby 和 resample?

具有两个分类变量的数据帧上的 Groupby 和 count() [重复]

如何在 pyspark 数据帧上应用 group by 并对结果对象进行转换