Spark streaming receiver Out of Memory (OOM)

Posted: 2016-07-14 07:10:05

[Problem description]:

I'm running into a problem where the receiver restarts again and again. I'm using Spark 1.6.1. I use Spark Streaming to receive data from a stream and then use map to deserialize the protobuf data.

My test covers two cases:

1. Receive the data and just print it: the application is stable.
2. Receive and deserialize the data: this is where the problem occurs, and it happens at irregular intervals. The incoming rate is 500 Mb per minute and I have set executor memory to 8 GB. It looks as if something is allocating memory very aggressively, but I don't know why.

My code:

val conf = new SparkConf().setAppName(args(8))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.speculation", "true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
val bigPipeStreams = (1 to args(3).toInt).map { i =>
    ssc.networkStream(
        new MyBigpipeLogagentReceiver(args(0), args(1), args(2), i, args(4), args(5), args(6).toInt)
    )
}

val lines = ssc.union(bigPipeStreams)

def deserializePbData(value: String): String = {
    if (null == value || value.isEmpty) {
        return ""
    }
    var cuid = ""
    var os = ""
    var channel = ""
    var sv = ""
    var resid = ""
    var appid = ""
    var prod = ""
    try { // if this throws, the record is useless, just drop it
        val timeStrIndex = value.indexOf(",\"time_str\"")
        var strAfterTruncation = ""
        if (-1 != timeStrIndex) {
            strAfterTruncation = value.substring(0, timeStrIndex) + "}" // re-close the truncated JSON object
        } else {
            strAfterTruncation = value
        }
        val jsonData = JSONObject.fromObject(strAfterTruncation)
        //val jsonData = value.getAsJsonArray()
        val binBody = jsonData.getString("bin_body")
        val pbData = binBody.substring(1, binBody.length() - 1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b, a) => b + java.lang.Byte.parseByte(a)).drop(8).toArray
        Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a =>
            a.getKey() match {
                case "cuid" => cuid += a.getValue()
                case "os" => os += a.getValue()
                case "channel" => channel += a.getValue()
                case "sv" => sv += a.getValue()
                case "resid" => resid += a.getValue()
                case "appid" => appid += a.getValue()
                case "prod" => prod += a.getValue()
                case _ => null
            }
        )
        val decodeCuid = URLDecoder.decode(cuid, "UTF-8")
        os = os.toLowerCase()
        if (os.matches("android(.*)")) {
            os = "android"
        } else if (os.matches("iphone(.*)")) {
            os = "iphone"
        } else if (os.matches("ipad(.*)")) {
            os = "ipad"
        } else if (os.matches("s60(.*)")) {
            os = "symbian"
        } else if (os.matches("wp7(.*)")) {
            os = "wp7"
        } else if (os.matches("wp(.*)")) {
            os = "wp"
        } else if (os.matches("tizen(.*)")) {
            os = "tizen"
        }

        val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid()
        val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime()
        if (ifHasLogid) {
            val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid()
            if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty | !appid.isEmpty || !prod.isEmpty) {
                ""
            } else {
                decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n"
            }
        } else {
            ""
        }
    } catch {
        case _: Throwable => ""
    }
}

lines.map(parseData).print()
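
For completeness, the snippet above assumes roughly the following imports. I'm assuming here that JSONObject comes from json-lib (net.sf.json); Lighttpd is my protobuf-generated class and MyBigpipeLogagentReceiver is my own receiver, so those two are not shown:

import java.net.URLDecoder
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import net.sf.json.JSONObject // assumption: json-lib; adjust if a different JSON library is used
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Lighttpd (protobuf-generated) and MyBigpipeLogagentReceiver are project-specific classes.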

Error text:

2016-07-12T12:00:01.546+0800: 5096.643: [GC (Allocation Failure) 
Desired survivor size 442499072 bytes, new threshold 1 (max 15)
[PSYoungGen: 0K->0K(2356736K)] 5059009K->5059009K(7949312K), 0.0103342 secs] [Times: user=0.21 sys=0.00, real=0.01 secs] 
2016-07-12T12:00:01.556+0800: 5096.654: [Full GC (Allocation Failure) [PSYoungGen: 0K->0K(2356736K)] [ParOldGen:    5059009K->5057376K(5592576K)] 5059009K->5057376K(7949312K), [Metaspace: 44836K->44490K(1089536K)], 0.8769617 secs] [Times: user=17.88   sys=0.04, real=0.88 secs] 
2016-07-12T12:00:02.434+0800: 5097.531: Total time for which application threads were stopped: 1.2951974 seconds, Stopping threads  took: 0.0000662 seconds
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid24310.hprof ...
2016-07-12T12:00:30.960+0800: 5126.057: Total time for which application threads were stopped: 28.5260812 seconds, Stopping threads     took: 0.0000995 seconds
Heap dump file created [5211252802 bytes in 28.526 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill %p"
#   Executing /bin/sh -c "kill 24310"...
2016-07-12T12:00:31.589+0800: 5126.686: Total time for which application threads were stopped: 0.6289627 seconds, Stopping threads  took: 0.0001258 seconds
2016-07-12T12:00:31.595+0800: 5126.692: Total time for which application threads were stopped: 0.0004822 seconds, Stopping threads  took: 0.0001493 seconds
2016-07-12 12:00:31.597 [Thread-5] ERROR [Logging.scala:95] - Uncaught exception in thread Thread[Thread-5,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93) ~[na:1.8.0_51]
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153) ~[na:1.8.0_51]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_51]
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126) ~[na:1.8.0_51]
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:155) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.require(Output.java:135) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.writeString_slow(Output.java:420) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.io.Output.writeString(Output.java:326) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:153) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.write(DefaultSerializers.java:146) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:194) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1196) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1202) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:858) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:645) ~[ spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:77) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:157) ~[   spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:128) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl$$anon$3.onPushBlock(ReceiverSupervisorImpl.scala:109) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:296) ~[    spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(    BlockGenerator.scala:268) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
    at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:109) ~[  spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
2016-07-12 12:00:31.600 [SIGTERM handler] ERROR [SignalLogger.scala:57] - RECEIVED SIGNAL 15: SIGTERM
2016-07-12T12:00:31.611+0800: 5126.708: Total time for which application threads were stopped: 0.0005602 seconds, Stopping threads  took: 0.0001765 seconds
2016-07-12T12:00:31.617+0800: 5126.714: Total time for which application threads were stopped: 0.0004800 seconds, Stopping threads  took: 0.0001412 seconds
2016-07-12 12:00:32.483 [Bigpipe Receiver-SendThread(cq01-bigpipe-proxy01.cq01.baidu.com:2181)] WARN  [ClientCnxnSocket.java:139] -     Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:32.507+0800: 5127.604: Total time for which application threads were stopped: 0.0004604 seconds, Stopping threads  took: 0.0001198 seconds
2016-07-12T12:00:32.509+0800: 5127.606: Total time for which application threads were stopped: 0.0002919 seconds, Stopping threads  took: 0.0001800 seconds
2016-07-12T12:00:32.509+0800: 5127.607: Total time for which application threads were stopped: 0.0002692 seconds, Stopping threads  took: 0.0001612 seconds
2016-07-12 12:00:32.549 [Bigpipe Receiver-SendThread(tc-bigpipe-proxy03.tc.baidu.com:2181)] WARN  [ClientCnxnSocket.java:139] -     Connected to an old server; r-o mode will be unavailable
2016-07-12T12:00:34.220+0800: 5129.317: [GC (Allocation Failure) 
Desired survivor size 424148992 bytes, new threshold 2 (max 15)
[PSYoungGen: 1931776K->188775K(2363904K)] 6989152K->5246152K(7956480K), 0.2569385 secs] [Times: user=0.00 sys=5.19, real=0.26 secs] 
2016-07-12T12:00:34.477+0800: 5129.575: Total time for which application threads were stopped: 0.2575019 seconds, Stopping threads  took: 0.0000384 seconds
2016-07-12T12:00:35.478+0800: 5130.575: Total time for which application threads were stopped: 0.0002786 seconds, Stopping threads  took: 0.0000424 seconds
2016-07-12T12:00:37.600+0800: 5132.697: [GC (Allocation Failure) 
Desired survivor size 482344960 bytes, new threshold 3 (max 15)
[PSYoungGen: 2120551K->387013K(2268160K)] 7177928K->5444389K(7860736K), 0.5153031 secs] [Times: user=0.00 sys=9.89, real=0.52 secs] 
2016-07-12T12:00:38.116+0800: 5133.213: Total time for which application threads were stopped: 0.5157529 seconds, Stopping threads  took: 0.0000427 seconds
2016-07-12T12:00:40.116+0800: 5135.213: Total time for which application threads were stopped: 0.0003171 seconds, Stopping threads  took: 0.0001000 seconds
2016-07-12T12:00:40.419+0800: 5135.516: [GC (Allocation Failure) 
Desired survivor size 599785472 bytes, new threshold 2 (max 15)
[PSYoungGen: 2240965K->471033K(2324992K)] 7298341K->5633517K(7917568K), 0.3621433 secs] [Times: user=0.12 sys=7.11, real=0.36 secs] 
2016-07-12T12:00:40.781+0800: 5135.878: Total time for which application threads were stopped: 0.3626080 seconds, Stopping threads  took: 0.0000429 seconds
2016-07-12T12:00:41.781+0800: 5136.879: Total time for which application threads were stopped: 0.0003301 seconds, Stopping threads  took: 0.0000947 seconds
2016-07-12T12:00:43.108+0800: 5138.205: [GC (Allocation Failure) 
Desired survivor size 620756992 bytes, new threshold 3 (max 15)
[PSYoungGen: 2324985K->378481K(2054656K)] 7487469K->5831048K(7647232K), 0.2593685 secs] [Times: user=0.66 sys=4.96, real=0.26 secs] 
2016-07-12T12:00:43.368+0800: 5138.465: [Full GC (Ergonomics) [PSYoungGen: 378481K->0K(2054656K)] [ParOldGen:   5452566K->4713601K(5592576K)] 5831048K->4713601K(7647232K), [Metaspace: 44635K->44635K(1089536K)], 4.3137405 secs] [Times: user=9.78    sys=74.53, real=4.31 secs] 
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 4.5736603 seconds, Stopping threads  took: 0.0000449 seconds
2016-07-12T12:00:47.682+0800: 5142.779: Total time for which application threads were stopped: 0.0002430 seconds, Stopping threads  took: 0.0000856 seconds
2016-07-12T12:00:49.954+0800: 5145.052: [GC (Allocation Failure) 
Desired survivor size 597688320 bytes, new threshold 4 (max 15)
[PSYoungGen: 1583616K->161266K(2189824K)] 6297217K->4874867K(7782400K), 0.0388138 secs] [Times: user=0.00 sys=0.84, real=0.04 secs] 
2016-07-12T12:00:49.993+0800: 5145.091: Total time for which application threads were stopped: 0.0392926 seconds, Stopping threads  took: 0.0000449 seconds
2016-07-12T12:00:51.903+0800: 5147.000: [GC (Allocation Failure) 
Desired survivor size 596115456 bytes, new threshold 5 (max 15)
[PSYoungGen: 1744882K->324587K(2213888K)] 6458483K->5038189K(7806464K), 0.0334029 secs] [Times: user=0.69 sys=0.03, real=0.04 secs] 
2016-07-12T12:00:51.936+0800: 5147.034: Total time for which application threads were stopped: 0.0338707 seconds, Stopping threads  took: 0.0000404 seconds
2016-07-12T12:00:53.942+0800: 5149.039: [GC (Allocation Failure) 
Desired survivor size 654835712 bytes, new threshold 6 (max 15)
[PSYoungGen: 1954795K->490438K(2120704K)] 6668397K->5204039K(7713280K), 0.0441762 secs] [Times: user=0.95 sys=0.02, real=0.05 secs] 
2016-07-12T12:00:53.986+0800: 5149.083: Total time for which application threads were stopped: 0.0446174 seconds, Stopping threads  took: 0.0000456 seconds
2016-07-12T12:00:56.102+0800: 5151.199: [GC (Allocation Failure) 
Desired survivor size 763887616 bytes, new threshold 5 (max 15)
[PSYoungGen: 2120646K->639467K(1943552K)] 6834247K->5370280K(7536128K), 0.1124828 secs] [Times: user=1.07 sys=1.30, real=0.11 secs] 
2016-07-12T12:00:56.214+0800: 5151.312: Total time for which application threads were stopped: 0.1129348 seconds, Stopping threads  took: 0.0000396 seconds
2016-07-12T12:00:57.784+0800: 5152.881: [GC (Allocation Failure) 
Desired survivor size 895483904 bytes, new threshold 4 (max 15)
[PSYoungGen: 1943531K->745977K(2050048K)] 6674344K->5504073K(7642624K), 0.0971717 secs] [Times: user=1.20 sys=0.67, real=0.10 secs] 
2016-07-12T12:00:57.881+0800: 5152.979: Total time for which application threads were stopped: 0.0977363 seconds, Stopping threads  took: 0.0000941 seconds
2016-07-12T12:00:59.406+0800: 5154.504: [GC (Allocation Failure) 
Desired survivor size 935329792 bytes, new threshold 5 (max 15)
[PSYoungGen: 2050041K->599188K(1715200K)] 6808137K->5647517K(7307776K), 0.3651465 secs] [Times: user=0.98 sys=5.88, real=0.37 secs] 
2016-07-12T12:00:59.772+0800: 5154.869: Total time for which application threads were stopped: 0.3656089 seconds, Stopping threads  took: 0.0000479 seconds
2016-07-12T12:01:00.968+0800: 5156.066: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 4 (max 15)
[PSYoungGen: 1568404K->697830K(1667072K)] 6616733K->5746159K(7259648K), 0.0978955 secs] [Times: user=1.91 sys=0.04, real=0.09 secs] 
2016-07-12T12:01:01.066+0800: 5156.164: Total time for which application threads were stopped: 0.0983759 seconds, Stopping threads  took: 0.0000482 seconds
2016-07-12T12:01:02.189+0800: 5157.287: [GC (Allocation Failure) 
Desired survivor size 954204160 bytes, new threshold 3 (max 15)
[PSYoungGen: 1667046K->465454K(1864192K)] 6715375K->5855655K(7456768K), 0.1261993 secs] [Times: user=2.41 sys=0.29, real=0.12 secs] 
2016-07-12T12:01:02.316+0800: 5157.413: [Full GC (Ergonomics) [PSYoungGen: 465454K->65236K(1864192K)] [ParOldGen:   5390200K->5592328K(5592576K)] 5855655K->5657564K(7456768K), [Metaspace: 44635K->44635K(1089536K)], 3.2729437 secs] [Times: user=12.34   sys=57.11, real=3.28 secs] 
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 3.3998619 seconds, Stopping threads  took: 0.0000521 seconds
2016-07-12T12:01:05.589+0800: 5160.686: Total time for which application threads were stopped: 0.0002330 seconds, Stopping threads  took: 0.0000949 seconds
2016-07-12T12:01:05.688+0800: 5160.785: Total time for which application threads were stopped: 0.0002935 seconds, Stopping threads  took: 0.0000514 seconds
Heap
 PSYoungGen      total 1864192K, used 146620K [0x0000000715580000, 0x00000007c0000000, 0x00000007c0000000)
  eden space 932352K, 8% used [0x0000000715580000,0x000000071a4fa138,0x000000074e400000)
  from space 931840K, 7% used [0x0000000787200000,0x000000078b1b5290,0x00000007c0000000)
  to   space 931840K, 0% used [0x000000074e400000,0x000000074e400000,0x0000000787200000)
 ParOldGen       total 5592576K, used 5592328K [0x00000005c0000000, 0x0000000715580000, 0x0000000715580000)
  object space 5592576K, 99% used [0x00000005c0000000,0x00000007155420a8,0x0000000715580000)
 Metaspace       used 44654K, capacity 44990K, committed 45864K, reserved 1089536K
  class space    used 6212K, capacity 6324K, committed 6440K, reserved 1048576K 
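
The stack trace above ends in BlockManagerBasedBlockHandler.storeBlock, i.e. the receiver's BlockGenerator pushing serialized blocks into the BlockManager, and the heap summary shows the old generation essentially full. For reference, my custom receiver follows the usual Receiver pattern; the sketch below is only illustrative (class name, arguments and storage level are placeholders, not my actual MyBigpipeLogagentReceiver):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative sketch of a custom streaming receiver. The StorageLevel passed
// to the Receiver constructor controls how the generated blocks are stored;
// a *_SER / *_DISK level lets blocks be kept serialized or spilled to disk
// rather than held purely as on-heap objects.
class SketchReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

  def onStart(): Unit = {
    // start a background thread that reads from the source and calls store(...)
  }

  def onStop(): Unit = {
    // stop the background thread started in onStart()
  }
}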

New error: I think this upload error is what leads to the OOM problem. How can I fix this upload error?

    2016-07-15 11:41:47.307 [shuffle-client-0] ERROR [TransportChannelHandler.java:128] - Connection to     nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 has been quiet for 120000 ms while there are outstanding requests. Assuming  connection is dead; please adjust spark.network.timeout if this is wrong.
2016-07-15 11:41:47.309 [shuffle-client-0] ERROR [TransportResponseHandler.java:122] - Still have 1 requests outstanding when   connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 is closed
2016-07-15 11:41:47.314 [shuffle-client-0] ERROR [Logging.scala:95] - Error while uploading block input-0-1468553896200
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) [spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
2016-07-15T11:41:47.316+0800: 2176.487: Total time for which application threads were stopped: 0.0002632 seconds, Stopping threads  took: 0.0000521 seconds
2016-07-15 11:41:47.316 [Thread-5] WARN  [Logging.scala:91] - Failed to replicate input-0-1468553896200 to BlockManagerId(2,    nmg01-taihang-d10207.nmg01.baidu.com, 30456), failure #0
java.io.IOException: Connection from nmg01-taihang-d10207.nmg01.baidu.com/10.76.48.22:30456 closed
at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[spark-assembly-1.6.1.0-baidu-SNAPSHOT-hadoop2.5.1.3-baidu-SNAPSHOT.jar:1.6.1.0-baidu-SNAPSHOT]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_51]
2016-07-15T11:41:48.316+0800: 2177.487: Total time for which application threads were stopped: 0.0003391 seconds, Stopping threads  took: 0.0000979 seconds
2016-07-15T11:41:51.312+0800: 2180.483: [GC (Allocation Failure) --[PSYoungGen: 2894863K->2894863K(3007488K)]   8299519K->9550273K(9998336K), 0.7462118 secs] [Times: user=9.78 sys=0.02, real=0.74 secs] 
2016-07-15T11:41:52.059+0800: 2181.230: [Full GC (Ergonomics) [PSYoungGen: 2894863K->0K(3007488K)] [ParOldGen:  6655410K->6895736K(6990848K)] 9550273K->6895736K(9998336K), [Metaspace: 44409K->44409K(1087488K)], 0.4061892 secs] [Times: user=7.50    sys=0.01, real=0.41 secs] 
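
The first ERROR line in that log explicitly says to adjust spark.network.timeout if the connection is not actually dead. As a minimal sketch (the value is only an example; 120s is the default), that setting would be raised alongside my existing conf calls like this:

// Sketch only: raise the network timeout referred to in the log message above.
// spark.network.timeout defaults to 120s; 300s is an arbitrary example value.
conf.set("spark.network.timeout", "300s")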

[Comments on the question]:

@XBin, please don't include error messages as screenshots. People can't search the text in a screenshot, and it is usually harder to read. To include an error, include the actual text of the error, copied as text and formatted in a code block. If a screenshot of the error gives readers something that the plain text does not, include it in addition to copying the error text into the question.

Without code, a minimal reproducible example, this question may be off-topic: questions seeking debugging help ("why isn't this code working?") must include the desired behavior, the specific problem or error, and the shortest code needed to reproduce it, in the question itself. Questions without a clear problem statement are not useful to other readers. See: how to create a minimal reproducible example, What topics can I ask about here? and How do I ask a good question? This means enough code to *completely* reproduce the problem.

Thanks, I have attached the code and the log file.

@XBin, you should edit your question to add any additional information. Additional information should not be posted as an answer to your question. Stack Exchange uses a Question --> Answer format. Any information that affects how the question is framed belongs in the question. While it is fine to respond to a request for more information in a comment first, the additional information should then be edited into the question. The question should be kept complete, containing everything needed to fully understand the problem, so that anyone can provide an answer.

[cont.] @XBin, such information should never be put in an answer. There is a normal process for deleting answers that are not actually answers.

[Answer 1]:

There appears to be a structural error in your code. While looking over your code (re-indented to reflect the structure as posted), I found that your last else if statement:

 else if (os.matches("tizen(.*)")) 
    os = "tizen"

opens a block but does not close it where it "should". Instead, that block actually terminates at:

} catch {

The complete code as (apparently) intended, re-indented, is:

val conf = new SparkConf().setAppName(args(8))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.speculation", "true")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(args(7).toInt))
val bigPipeStreams = (1 to args(3).toInt).map { i =>
    ssc.networkStream(
        new MyBigpipeLogagentReceiver(args(0), args(1), args(2), i, args(4), args(5), args(6).toInt)
    )
}

val lines = ssc.union(bigPipeStreams)

def deserializePbData(value: String): String = {
    if (null == value || value.isEmpty) {
        return ""
    }
    var cuid = ""
    var os = ""
    var channel = ""
    var sv = ""
    var resid = ""
    var appid = ""
    var prod = ""
    try { // if this throws, the record is useless, just drop it
        val timeStrIndex = value.indexOf(",\"time_str\"")
        var strAfterTruncation = ""
        if (-1 != timeStrIndex) {
            strAfterTruncation = value.substring(0, timeStrIndex) + "}" // re-close the truncated JSON object
        } else {
            strAfterTruncation = value
        }
        val jsonData = JSONObject.fromObject(strAfterTruncation)
        //val jsonData = value.getAsJsonArray()
        val binBody = jsonData.getString("bin_body")
        val pbData = binBody.substring(1, binBody.length() - 1).split(",").foldLeft(ArrayBuffer.empty[Byte])((b, a) => b + java.lang.Byte.parseByte(a)).drop(8).toArray
        Lighttpd.lighttpd_log.parseFrom(pbData).getRequest().getUrl().getUrlFields().getAutokvList().asScala.foreach(a =>
            a.getKey() match {
                case "cuid" => cuid += a.getValue()
                case "os" => os += a.getValue()
                case "channel" => channel += a.getValue()
                case "sv" => sv += a.getValue()
                case "resid" => resid += a.getValue()
                case "appid" => appid += a.getValue()
                case "prod" => prod += a.getValue()
                case _ => null
            }
        )
        val decodeCuid = URLDecoder.decode(cuid, "UTF-8")
        os = os.toLowerCase()
        if (os.matches("android(.*)")) {
            os = "android"
        } else if (os.matches("iphone(.*)")) {
            os = "iphone"
        } else if (os.matches("ipad(.*)")) {
            os = "ipad"
        } else if (os.matches("s60(.*)")) {
            os = "symbian"
        } else if (os.matches("wp7(.*)")) {
            os = "wp7"
        } else if (os.matches("wp(.*)")) {
            os = "wp"
        } else if (os.matches("tizen(.*)")) {
            os = "tizen"
        }

        val ifHasLogid = Lighttpd.lighttpd_log.parseFrom(pbData).hasLogid()
        val time = Lighttpd.lighttpd_log.parseFrom(pbData).getTime()
        if (ifHasLogid) {
            val logid = Lighttpd.lighttpd_log.parseFrom(pbData).getLogid()
            if (logid.isEmpty || logid.toString().equals("-") || !resid.toString().equals("01") || channel.isEmpty | !appid.isEmpty || !prod.isEmpty) {
                ""
            } else {
                decodeCuid + "\001" + os + "\001" + channel + "\001" + sv + "\001" + "1" + "\001" + "1" + "\001" + time + "\n"
            }
        } else {
            ""
        }
    } catch {
        case _: Throwable => ""
    }
}

lines.map(parseData).print()

I have not checked your code for functionality; this is purely a syntax/structure issue that shows up from even a brief look at the code you posted.

[Comments]:

Sorry for putting that information into an answer; this is my first question on Stack Overflow. As for the unclosed block, that was a copy-and-paste error. My actual code does close it.
