记一次spark内存泄露问题
Posted rongyongfeikai2
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了记一次spark内存泄露问题相关的知识,希望对你有一定的参考价值。
问题定位:
引擎里有一处代码detectDf.persist(detectDf为DataFrame),后续回收动作用的代码为
val rdds = sc.getPersistentRDDs
rdds.foreach(x => x._2.unpersist())
分析:
1.DataFrame跟RDD相比,就是多了schema部分; DataFrame=RDD+schema
2.根据前几天堆内存的分析,schema是会放一份在driver端的
3.代码,只unpersist了RDD,那么schema部分就一直在driver里,这部分无法回收
结论:
这部分,是比数据本身小非常多的。所以也很符合之前观察到的,非常缓慢的内存随时间增长(当时观察1个小时涨30M)。
基线不断加载到driver侧,会大量占用driver内存空间,频繁触发gc以及新生代->老年代间的数据迁移。它只是明显化了这个现象,而非问题本身。
自测结果:
在代码内对于detectDf加上unpersist()后。在单机环境,以driver 1g/1000eps情况下运行UEBA,基线总量为50万。job运行时间为20220703 21:23:37 ~ 目前,基线总量不变的情况下,driver内存占用在一定范围上下浮动,无持续上涨情况。
可以用如下代码快速复现此现象:
import java.util.ArrayList
import org.apache.spark.sql.SQLContext
import scala.collection.JavaConverters
import org.apache.spark.storage.StorageLevel
val list:ArrayList[String] = new ArrayList[String]()
for(i <- 0 to 10000)
list.add("\\"sip\\":\\"1.1.1.1\\",\\"dip\\":\\"2.2.2.2\\"")
val sqlContext = new SQLContext(sc)
for(i <- 0 to 10000)
val rdd = sc.parallelize(JavaConverters.asScalaIteratorConverter(list.iterator()).asScala.toSeq)
val df = sqlContext.read.json(rdd)
df.persist(StorageLevel.MEMORY_AND_DISK)
df.count()
df.rdd.unpersist()
以上是关于记一次spark内存泄露问题的主要内容,如果未能解决你的问题,请参考以下文章