Notes on an error when integrating Spark Streaming with Flume (pull-based approach)
Posted by 怒上王者
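When Spark Streaming pulls data from Flume using the pull-based approach (through the Spark sink), the Flume agent logs the following errors: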
12:18:35 INFO node.Application: Starting Sink spark-sink
12:18:35 INFO node.Application: Starting Source netcat-source
12:18:35 INFO source.NetcatSource: Source starting
12:18:35 INFO sink.SparkSink: Starting Spark Sink: spark-sink on port: 41414 and interface: 192.168.118.151 with pool size: 10 and transaction timeout: 60.
12:18:35 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.118.151:44444]
12:18:36 INFO sink.SparkSink: Starting Avro server for sink: spark-sink
12:18:36 INFO sink.SparkSink: Blocking Sink Runner, sink will continue to run..
12:20:01 INFO ipc.NettyServer: [id: 0x78efe75f, /192.168.118.1:54529 => /192.168.118.151:41414] OPEN
12:20:01 INFO ipc.NettyServer: [id: 0x78efe75f, /192.168.118.1:54529 => /192.168.118.151:41414] BOUND: /192.168.118.151:41414
12:20:01 INFO ipc.NettyServer: [id: 0x78efe75f, /192.168.118.1:54529 => /192.168.118.151:41414] CONNECTED: /192.168.118.1:54529
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.TransactionProcessor: Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
12:20:02 WARN sink.TransactionProcessor: Spark was unable to successfully process the events. Transaction is being rolled back.
12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel!
12:20:02 WARN sink.TransactionProcessor: Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
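As background on the exception itself (not on why the scala-library mismatch triggers it): a Flume channel keeps one transaction per thread, and BasicTransactionSemantics only lets begin() run on a transaction that is still NEW. If a thread's previous transaction was begun but never committed or rolled back and then closed, the channel hands the same OPEN transaction back, and the next begin() fails with exactly the message in the trace. The Python sketch that follows is a simplified, hypothetical model of that lifecycle; the names ModelTransaction, ModelChannel and reproduce_begin_twice are made up for illustration and are not Flume code.

```python
import threading

# Hypothetical, simplified model of Flume's transaction lifecycle
# (NEW -> OPEN -> COMPLETED -> CLOSED). Not Flume's actual code.
NEW, OPEN, COMPLETED, CLOSED = "NEW", "OPEN", "COMPLETED", "CLOSED"


class ModelTransaction:
    def __init__(self):
        self.state = NEW

    def begin(self):
        # Only a brand-new transaction may be begun.
        if self.state != NEW:
            raise RuntimeError(f"begin() called when transaction is {self.state}!")
        self.state = OPEN

    def commit(self):
        if self.state != OPEN:
            raise RuntimeError(f"commit() called when transaction is {self.state}!")
        self.state = COMPLETED

    def rollback(self):
        if self.state != OPEN:
            raise RuntimeError(f"rollback() called when transaction is {self.state}!")
        self.state = COMPLETED

    def close(self):
        # A transaction must be committed or rolled back before it is closed.
        if self.state not in (COMPLETED, CLOSED):
            raise RuntimeError(
                f"close() called when transaction is {self.state}"
                " - you must either commit or rollback first"
            )
        self.state = CLOSED


class ModelChannel:
    """Keeps one transaction per thread and reuses it until it is CLOSED."""

    def __init__(self):
        self._local = threading.local()

    def get_transaction(self):
        tx = getattr(self._local, "tx", None)
        if tx is None or tx.state == CLOSED:
            tx = ModelTransaction()
            self._local.tx = tx
        return tx


def reproduce_begin_twice():
    """Begin a transaction, never finish it, then begin again on the same thread."""
    channel = ModelChannel()
    channel.get_transaction().begin()  # NEW -> OPEN, then left open
    try:
        channel.get_transaction().begin()  # the same OPEN transaction comes back
    except RuntimeError as exc:
        return str(exc)
    return None
```

Calling print(reproduce_begin_twice()) prints "begin() called when transaction is OPEN!". The well-behaved pattern is begin(), then commit() or rollback(), then close(), after which the channel hands out a fresh transaction.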
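After some searching, I finally found the cause.
Cause:
The scala-library.xx jar in Flume's lib directory is the wrong version: it does not match the Scala version the Spark sink needs (2.11.x here).
Solution:
Download the jar from the Spark official website, delete the original scala-library jar from Flume's lib directory, and upload the new scala-library-2.11.x.jar in its place.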
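To confirm the fix, one can check that Flume's lib directory contains exactly one scala-library jar and that its Scala binary version matches the _2.11 style suffix of the Spark sink jar. The Python sketch that follows does this. It assumes Maven-style file names such as spark-streaming-flume-sink_2.11-<spark version>.jar and scala-library-2.11.<patch>.jar, and check_jar_names / check_flume_lib are hypothetical helpers, not part of Flume or Spark.

```python
import re
from pathlib import Path

# Assumes Maven-style jar names, e.g. "scala-library-2.11.8.jar" and
# "spark-streaming-flume-sink_2.11-2.2.0.jar" (version numbers are examples only).
SCALA_LIB_RE = re.compile(r"^scala-library-(\d+)\.(\d+)\.\d+\.jar$")
SINK_RE = re.compile(r"^spark-streaming-flume-sink_(\d+\.\d+)-.+\.jar$")


def check_jar_names(names):
    """Return a list of problems for the given jar file names (empty list means OK)."""
    scala_libs = {}        # jar name -> Scala binary version, e.g. "2.10"
    sink_versions = set()  # Scala binary versions the Spark sink jar was built for
    for name in sorted(names):
        m = SCALA_LIB_RE.match(name)
        if m:
            scala_libs[name] = f"{m.group(1)}.{m.group(2)}"
        m = SINK_RE.match(name)
        if m:
            sink_versions.add(m.group(1))

    problems = []
    if not sink_versions:
        problems.append("no spark-streaming-flume-sink_*.jar found")
    if not scala_libs:
        problems.append("no scala-library-*.jar found")
    if len(scala_libs) > 1:
        problems.append("multiple scala-library jars: " + ", ".join(sorted(scala_libs)))
    # Every scala-library jar must match the Scala version of every sink jar.
    for sink_ver in sorted(sink_versions):
        for jar, lib_ver in sorted(scala_libs.items()):
            if lib_ver != sink_ver:
                problems.append(
                    f"{jar} is Scala {lib_ver}, but the sink was built for Scala {sink_ver}"
                )
    return problems


def check_flume_lib(lib_dir):
    """Run check_jar_names on the *.jar files found directly in lib_dir."""
    return check_jar_names(p.name for p in Path(lib_dir).glob("*.jar"))
```

For example, check_flume_lib("/opt/flume/lib") (a hypothetical path) should return an empty list once only scala-library-2.11.x.jar sits next to a _2.11 sink jar. The name check is split from the directory listing so the logic can be tried on plain lists of file names.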