Notes on an Error When Integrating Spark Streaming with Flume (Pull-Based Approach)

Posted by 怒上王者

When Spark Streaming pulls data from Flume using the pull-based approach, the Flume agent logs the following error:

 12:18:35 INFO node.Application: Starting Sink spark-sink
 12:18:35 INFO node.Application: Starting Source netcat-source
 12:18:35 INFO source.NetcatSource: Source starting
 12:18:35 INFO sink.SparkSink: Starting Spark Sink: spark-sink on port: 41414 and interface: 192.168.118.151 with pool size: 10 and transaction timeout: 60.
 12:18:35 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/192.168.118.151:44444]
 12:18:36 INFO sink.SparkSink: Starting Avro server for sink: spark-sink
 12:18:36 INFO sink.SparkSink: Blocking Sink Runner, sink will continue to run..
 12:20:01 INFO ipc.NettyServer: [id: 0x78efe75f, /192.168.118.1:54529 => /192.168.118.151:41414] OPEN
 12:20:01 INFO ipc.NettyServer: [id: 0x78efe75f, /192.168.118.1:54529 => /192.168.118.151:41414] BOUND: /192.168.118.151:41414
 12:20:01 INFO ipc.NettyServer: [id: 0x78efe75f, /192.168.118.1:54529 => /192.168.118.151:41414] CONNECTED: /192.168.118.1:54529
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.TransactionProcessor: Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
	at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
 12:20:02 WARN sink.TransactionProcessor: Spark was unable to successfully process the events. Transaction is being rolled back.
 12:20:02 WARN sink.SparkAvroCallbackHandler: Received an error batch - no events were received from channel! 
 12:20:02 WARN sink.TransactionProcessor: Error while processing transaction.
java.lang.IllegalStateException: begin() called when transaction is OPEN!
	at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
	at org.apache.flume.channel.BasicTransactionSemantics.begin(BasicTransactionSemantics.java:131)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:114)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
	at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

After some digging, I finally found the cause.
Cause:
The scala-library jar in Flume's lib directory is the wrong version.

Solution:
Download the jar from the Spark website, delete the old jar from Flume's lib directory, and upload the new scala-library-2.11.x.jar in its place.

Download: scala-library-2.11.8.jar
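
Before swapping jars by hand, it can help to confirm which scala-library jar Flume's lib directory actually contains. The following is a minimal sketch in Python, not part of the original fix. It assumes the jar follows the usual `scala-library-<major>.<minor>.<patch>.jar` naming and that 2.11 is the Scala version you need; the function names `find_scala_library_jars` and `mismatched_jars` are hypothetical helpers introduced only for illustration.

```python
import re
from pathlib import Path

# Assumed naming scheme: scala-library-<major>.<minor>.<patch>.jar
JAR_PATTERN = re.compile(r"^scala-library-(\d+)\.(\d+)\.(\d+)\.jar$")


def find_scala_library_jars(lib_dir):
    """Return (file name, 'major.minor') for each scala-library jar in lib_dir."""
    found = []
    for path in sorted(Path(lib_dir).iterdir()):
        match = JAR_PATTERN.match(path.name)
        if match:
            # Only major.minor matters for Scala binary compatibility.
            found.append((path.name, f"{match.group(1)}.{match.group(2)}"))
    return found


def mismatched_jars(lib_dir, expected="2.11"):
    """Return the jar names whose major.minor version differs from expected."""
    return [
        name
        for name, version in find_scala_library_jars(lib_dir)
        if version != expected
    ]
```

Calling `mismatched_jars` with the path to your Flume lib directory returns the jars that would need replacing; an empty list means every scala-library jar found there already matches the expected version. The script only reports, so deleting the old jar and copying the new one is still a manual step.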
