使用 Kafka 直接流在 Yarn 上引发堆内存泄漏

Posted

技术标签:

【中文标题】使用 Kafka 直接流在 Yarn 上引发堆内存泄漏【英文标题】:Spark off heap memory leak on Yarn with Kafka direct stream 【发布时间】:2015-10-02 03:02:48 【问题描述】:

我正在使用 java 1.8.0_45 和 Kafka 直接流在 Yarn(Apache 发行版 2.6.0)上运行 spark 流式传输 1.4.0。我也在使用支持 scala 2.11 的 spark。

我看到的问题是驱动程序和执行程序容器都在逐渐增加物理内存使用量,直到纱线容器杀死它。我在我的驱动程序中配置了多达 192M 堆和 384 个堆外空间,但它最终用完了

堆内存在常规的 GC 周期中似乎没有问题。在任何此类运行中都没有遇到过 OutOffMemory

事实上,我并没有在 kafka 队列上产生任何流量,但仍然会发生这种情况。这是我正在使用的代码

object SimpleSparkStreaming extends App 

val conf = new SparkConf()
val ssc = new StreamingContext(conf,Seconds(conf.getLong("spark.batch.window.size",1L)));
ssc.checkpoint("checkpoint")
val topics = Set(conf.get("spark.kafka.topic.name")); 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> conf.get("spark.kafka.broker.list"))
            val kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)
            kafkaStream.foreachRDD(rdd => 
                rdd.foreach(x => 
                    println(x._2)
                )

            )
    kafkaStream.print()
            ssc.start() 

            ssc.awaitTermination()


我在 CentOS 7 上运行这个。用于 spark 提交的命令如下

./bin/spark-submit --class com.rasa.cloud.prototype.spark.SimpleSparkStreaming \
--conf spark.yarn.executor.memoryOverhead=256 \
--conf spark.yarn.driver.memoryOverhead=384 \
--conf spark.kafka.topic.name=test \
--conf spark.kafka.broker.list=172.31.45.218:9092 \
--conf spark.batch.window.size=1 \
--conf spark.app.name="Simple Spark Kafka application" \
--master yarn-cluster \
--num-executors 1 \
--driver-memory 192m \
--executor-memory 128m \
--executor-cores 1 \
/home/centos/spark-poc/target/lib/spark-streaming-prototype-0.0.1-SNAPSHOT.jar 

非常感谢任何帮助

问候,

阿普尔瓦

【问题讨论】:

我遇到了同样的问题你找到解决办法了吗? 我也有类似的问题,但还没有达到饱和点:***.com/questions/35693211/… 如果您找到解决方案,请告诉我 我发现自己处于同样的情况,您是否找到了原因或解决方法? 【参考方案1】:

尝试增加执行器核心。在您的示例中,唯一的核心专用于使用流数据,没有核心在传入数据中进行处理。

【讨论】:

这是DirectStream,一个执行器核心就可以了spark.apache.org/docs/latest/…【参考方案2】:

这可能是内存泄漏...您是否尝试过 conf.set("spark.executor.extraJavaOptions","-XX:+UseG1GC") ?

【讨论】:

【参考方案3】:

这不是 Kafka 的答案,这将与 Spark 隔离,并且它的编目系统在一致性持久性和大型操作方面很差。如果您一直在写入一个 perisitence 层(即在一个循环中,在一个大型操作之后重新持久化一个 DF 然后再次运行)或运行一个大型查询(即 inputDF.distinct.count); Spark 作业将开始将一些数据放入内存中,并且无法有效地删除过时的对象。

这意味着超时后能够快速运行一次的对象将逐渐减速,直到没有可用内存为止。对于家里的每个人来说,启动一个在环境中加载了大型 DataFrame 的 AWS EMR,运行以下查询:

var iterator = 1
val endState = 15
var currentCount = 0
while (iterator <= endState) 
  currentCount = inputDF.distinct.count
  print("The number of unique records are : " + currentCount)
  iterator = iterator + 1

在作业运行时观察 Spark UI 的内存管理,如果 DF 对会话来说足够大,您将开始注意到每次后续运行的运行时间都会下降,主要是块变得陈旧,但 Spark 是无法确定何时清理这些块。

我找到解决此问题的最佳方法是在本地编写我的 DF,清除持久层并重新加载数据。这是解决问题的“大锤”方法,但对于我的业务案例这是一个易于实现的解决方案,它使我们的大型表的运行时间增加了 90%(需要 540 分钟到大约 40 分钟,内存更少)。

我目前使用的代码是:

val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
spark.catalog.clearCache
val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count

如果您不在子子流程中取消持久化 DF,这里有一个派生词:

val interimDF = inputDF.action
val tempDF = interimDF.write.format(...).option("...","...").save("...")
for ((k,v) <- sc.getPersistentRDDs) 
  v.unpersist()

val interimDF = spark.read..format(...).option("...","...").save("...").persist
interimDF.count

【讨论】:

以上是关于使用 Kafka 直接流在 Yarn 上引发堆内存泄漏的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 kafka 流在 kafka 中进行请求响应?

Log4J调用kafka时JVM堆 内存溢出问题定位

C语言堆内存管理上出现的问题,内存泄露,野指针使用,非法释放指针

如何限制kafka-streams中的rocksdb内存使用量

血泪教训,线程池引发的内存泄露

堆栈内存映射