面对 Spark 上小数据集的大数据溢出

Posted

技术标签:

【中文标题】面对 Spark 上小数据集的大数据溢出【英文标题】:Facing large data spills for small datasets on spark 【发布时间】:2021-08-16 18:31:14 【问题描述】:

我正在尝试在此处可用的 NOA 数据集上运行一些 spark sql:

https://www.ncei.noaa.gov/data/global-summary-of-the-day/access/2021/

我正在尝试运行一些涉及分组和排序的查询。

df
      .groupBy("COUNTRY_FULL")
      .agg(max("rank"), last("consecutive").as("consecutive"))
      .withColumn("maxDays", maxDaysTornodoUdf(col("consecutive")))
      .sort(col("maxDays").desc)
      .limit(1)
      .show()

输入大小仅为 50 MB 压缩 csv,我在本地运行(4 核)。 这些是我使用的设置。

spark.driver.memory:14g spark.sql.windowExec.buffer.in.memory.threshold:20000 spark.sql.windowExec.buffer.spill.threshold : 20000 spark.sql.shuffle.partitions : 400

对于这么小的数据,我看到太多磁盘溢出

21/08/16 10:23:13 INFO UnsafeExternalSorter: Thread 54 spilling sort data of 416.0 MB to disk (371  times so far)
21/08/16 10:23:14 INFO UnsafeExternalSorter: Thread 79 spilling sort data of 416.0 MB to disk (130  times so far)
21/08/16 10:23:14 INFO UnsafeExternalSorter: Thread 53 spilling sort data of 400.0 MB to disk (240  times so far)
21/08/16 10:23:14 INFO UnsafeExternalSorter: Thread 69 spilling sort data of 400.0 MB to disk (24  times so far)
21/08/16 10:23:16 INFO UnsafeExternalSorter: Thread 54 spilling sort data of 416.0 MB to disk (372  times so far)
21/08/16 10:23:16 INFO UnsafeExternalSorter: Thread 79 spilling sort data of 416.0 MB to disk (131  times so far)

但是,当我检查 Spark UI 时,溢出似乎并不多

最终,火花作业终止并出现错误Not Enough memory 我不明白发生了什么。

【问题讨论】:

【参考方案1】:

您使用 400 作为spark.sql.shuffle.partitions,这对于您正在处理的数据大小来说太大了。

为更少的数据量使用更多的随机分区会导致更多的分区/任务,这会降低性能。阅读配置随机分区的最佳实践here。

尝试减少随机分区。您可以尝试将其设置为spark.sparkContext.defaultParallelism

【讨论】:

我该怎么做? 我尝试使用较低的值 4,但结果是一样的 spark.conf.set("spark.sql.shuffle.partitions",spark.sparkContext.defaultParallelism ) 仍然看到相同的日志。分区减少到 4 个,但溢出仍然保持不变

以上是关于面对 Spark 上小数据集的大数据溢出的主要内容,如果未能解决你的问题,请参考以下文章

Spark的五种JOIN策略解析

图解大数据 | 基于Spark RDD的大数据处理分析

Scala和Spark的大数据分析

如何加快 Spark 中的大数据框连接

大数据 | MongoDB + Spark: 完整的大数据解决方案

Spark入门,概述,部署,以及学习(Spark是一种快速通用可扩展的大数据分析引擎)