为啥由于 java.lang.OutOfMemoryError,Spark Streaming 在字符串解码时失败?
Posted
技术标签:
【中文标题】为啥由于 java.lang.OutOfMemoryError,Spark Streaming 在字符串解码时失败?【英文标题】:Why does Spark Streaming fail at String decoding due to java.lang.OutOfMemoryError?为什么由于 java.lang.OutOfMemoryError,Spark Streaming 在字符串解码时失败? 【发布时间】:2016-11-23 12:19:44 【问题描述】:我在 3 个节点的 YARN 集群上运行 Spark Streaming (createStream
API) 应用程序,每个节点有 128G RAM (!) 该应用程序从 Kafka 主题读取记录并写入 HDFS。
无论我为执行程序/驱动程序配置多少内存,大多数情况下应用程序都会因 Java 堆错误而失败/被终止(主要是接收器失败)。
16/11/23 13:00:20 WARN ReceiverTracker: Error reported by receiver for stream 0: Error handling message; exiting - java.lang.OutOfMemoryError: Java heap space
at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
at java.lang.StringCoding.decode(StringCoding.java:193)
at java.lang.String.<init>(String.java:426)
at java.lang.String.<init>(String.java:491)
at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:50)
at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:42)
at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:137)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
【问题讨论】:
【参考方案1】:如果您使用 KafkaUtil.createStream(....) 单个接收器将在 spark 执行器中运行,并且如果主题是分区的,则每个分区运行多个接收器线程。因此,如果您的流具有较大的字符串对象并且频率很高并且所有线程共享单个执行程序内存,您可能会遇到 OOM 问题。
以下是可能的解决方案。
由于作业在接收器内存不足时失败,首先检查批处理和块间隔属性。如果批处理间隔更大(例如 5 分钟),请尝试使用较小的值,例如(100 毫秒)。
将每秒接收记录的速率限制为“spark.streaming.receiver.maxRate”,同时确保 “spark.streaming.unpersist”值为“true”。
您可以使用 KafkaUtil.KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, 话题)。在这种情况下,而不是单个接收器火花执行器 直接连接kafka分区leads并接收数据 并行(每个 kfka 分区是一个 KafkaRDD 分区)。不像 单个接收器执行器中的多个线程这里是多个 执行器将并行运行并分配负载。【讨论】:
你怎么知道我们得到了 OOM I 驱动程序或执行程序 来自你提到的日志 16/11/23 13:00:20 WARN ReceiverTracker:接收器为流 0 报告的错误:错误处理消息;退出 - java.lang.OutOfMemoryError: Java 堆空间 感谢我的应用程序通过设置'spark.流媒体。接收者。 maxRate',首先我尝试了“spark.streaming.backpressure.enabled”=“true”,因为此设置动态处理此类情况,但不知道为什么它不起作用。 @Prudvi Sagar 我对此有几个问题,我们有一些系统可以 24*7 向 Kafka 发送大量数据,在这种情况下,如果我的 spark 流应用程序宕机了几个小时或一天一旦流应用程序出现,火花将如何表现?据我所知,在检查点createStreams/DirectAPI
的帮助下,它将从卡夫卡的偏移量中读取它离开的位置。在这里我有疑问,火花将如何处理旧消息以及以什么标准spark 将为旧消息的配置批处理间隔选择事件/消息的数量,因为消息数量会很大。以上是关于为啥由于 java.lang.OutOfMemoryError,Spark Streaming 在字符串解码时失败?的主要内容,如果未能解决你的问题,请参考以下文章
为啥 Python pyusb usb.core 访问由于权限而被拒绝,为啥 rules.d 不能修复它?