Kafka + Storm error: Async loop died

The error is as follows:

13524 [Thread-10-kafka-spout] ERROR b.s.util - Async loop died!

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-0.0.1-SNAPSHOT.jar:?]

at backtype.storm.daemon.executor$fn__5624$fn__5639$fn__5670.invoke(executor.clj:607) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.1.jar:0.10.1]

at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]

at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]

Caused by: java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-0.0.1-SNAPSHOT.jar:?]

... 6 more

13527 [Thread-10-kafka-spout] ERROR b.s.d.executor - 

java.lang.RuntimeException: java.nio.channels.ClosedChannelException

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:103) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[storm-0.0.1-SNAPSHOT.jar:?]

at backtype.storm.daemon.executor$fn__5624$fn__5639$fn__5670.invoke(executor.clj:607) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.util$async_loop$fn__545.invoke(util.clj:479) [storm-core-0.10.1.jar:0.10.1]

at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:?]

at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]

Caused by: java.nio.channels.ClosedChannelException

at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:127) ~[storm-0.0.1-SNAPSHOT.jar:?]

at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:79) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:74) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:64) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.PartitionManager.<init>(PartitionManager.java:89) ~[storm-0.0.1-SNAPSHOT.jar:?]

at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[storm-0.0.1-SNAPSHOT.jar:?]

... 6 more

13528 [Thread-10-kafka-spout] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[Thread-10-kafka-spout,5,main] died

java.lang.RuntimeException: java.lang.InterruptedException

at backtype.storm.util$wrap_in_runtime.invoke(util.clj:49) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.zookeeper$exists_node_QMARK_$fn__4306.invoke(zookeeper.clj:103) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.zookeeper$exists_node_QMARK_.invoke(zookeeper.clj:99) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.zookeeper$mkdirs.invoke(zookeeper.clj:115) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.cluster$mk_distributed_cluster_state$reify__4580.mkdirs(cluster.clj:130) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.cluster$mk_storm_cluster_state$reify__5120.report_error(cluster.clj:461) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.daemon.executor$throttled_report_error_fn$fn__5469.invoke(executor.clj:193) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.daemon.executor$mk_executor_data$fn__5523$fn__5524.invoke(executor.clj:256) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.util$async_loop$fn__545.invoke(util.clj:489) ~[storm-core-0.10.1.jar:0.10.1]

at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.6.0.jar:?]

at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]

Caused by: java.lang.InterruptedException

at java.lang.Object.wait(Native Method) ~[?:1.8.0_66]

at java.lang.Object.wait(Object.java:502) ~[?:1.8.0_66]

at org.apache.storm.shade.org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342) ~[storm-core-0.10.1.jar:0.10.1]

at org.apache.storm.shade.org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1040) ~[storm-core-0.10.1.jar:0.10.1]

at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:172) ~[storm-core-0.10.1.jar:0.10.1]

at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl$2.call(ExistsBuilderImpl.java:161) ~[storm-core-0.10.1.jar:0.10.1]

at org.apache.storm.shade.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) ~[storm-core-0.10.1.jar:0.10.1]

at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl.pathInForeground(ExistsBuilderImpl.java:157) ~[storm-core-0.10.1.jar:0.10.1]

at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:148) ~[storm-core-0.10.1.jar:0.10.1]

at org.apache.storm.shade.org.apache.curator.framework.imps.ExistsBuilderImpl.forPath(ExistsBuilderImpl.java:36) ~[storm-core-0.10.1.jar:0.10.1]

at backtype.storm.zookeeper$exists_node_QMARK_$fn__4306.invoke(zookeeper.clj:102) ~[storm-core-0.10.1.jar:0.10.1]

... 9 more

13531 [ProcessThread(sid:0 cport:-1):] INFO  o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x155260ee5c8000c

 

 

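The cause is a version incompatibility between Storm and Kafka. The trace shows the spout failing in SimpleConsumer.getOffsetsBefore while ZkCoordinator refreshes its partitions; the request fails with ClosedChannelException and the kafka-spout thread dies. The versions in use were:

storm-0.10.1

kafka_2.10-0.10.0.0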
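To confirm which Kafka and Storm classes a topology actually loads, a small classpath check can help. The following is a minimal sketch, not part of the original post: the class name ClasspathVersionCheck and the helper describe are hypothetical, and the class names it looks up are taken from the stack trace above. It uses only the JDK. Note that in the trace the Kafka classes come from the topology's own jar (storm-0.0.1-SNAPSHOT.jar), so the reported location may be that jar and the manifest may not record Kafka's version; in that case the build's dependency list is the place to look.

```java
import java.security.CodeSource;

public class ClasspathVersionCheck {

    // Reports where a class was loaded from and the version recorded in
    // its jar manifest, if any. Returns a "not on classpath" message
    // instead of throwing when the class cannot be loaded.
    static String describe(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class<?> cls = Class.forName(className, false,
                    ClasspathVersionCheck.class.getClassLoader());

            // CodeSource is null for classes loaded by the bootstrap loader
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            String location = (src == null || src.getLocation() == null)
                    ? "unknown location"
                    : src.getLocation().toString();

            // Implementation-Version comes from the jar manifest and may be absent
            Package pkg = cls.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();

            return className + " -> " + location + " (version: "
                    + (version == null ? "not recorded in manifest" : version) + ")";
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not on classpath";
        }
    }

    public static void main(String[] args) {
        // Class names as they appear in the stack trace above
        System.out.println(describe("kafka.consumer.SimpleConsumer"));
        System.out.println(describe("storm.kafka.KafkaSpout"));
        System.out.println(describe("clojure.lang.AFn"));
    }
}
```

Running this with the same classpath as the topology prints one line per class. If the Kafka client classes resolve to a jar whose version differs from the broker's (kafka_2.10-0.10.0.0 here), that is consistent with the incompatibility described above.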

 
