使用 Kafka Spout 的 Apache Storm 给出错误:IllegalStateException

Posted

技术标签:

【中文标题】使用 Kafka Spout 的 Apache Storm 给出错误:IllegalStateException【英文标题】:Apache storm using Kafka Spout gives error: IllegalStateException 【发布时间】:2019-05-04 14:09:57 【问题描述】:
Version Info: 
   "org.apache.storm" % "storm-core" % "1.2.1" 
   "org.apache.storm" % "storm-kafka-client" % "1.2.1" 

我有一个风暴拓扑,如下所示:

螺栓A->螺栓B->螺栓C->螺栓D

boltA 只是对请求进行一些格式化并发出另一个元组。 boltB 进行一些处理并为每个被接受的元组发出大约 100 个元组。 boltCboltD 处理这些元组。所有的螺栓都实现BaseBasicBolt

我注意到每当boltD 将一些tuple 标记为失败并通过抛出FailedException 标记为重试时,在比我的拓扑超时时间少几分钟后,我会收到以下错误:

2018-11-30T20:01:05.261+05:30 util [ERROR] Async loop died!
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]
2018-11-30T20:01:05.262+05:30 executor [ERROR]
java.lang.IllegalStateException: Attempting to emit a message that has already been committed. This should never occur when using the at-least-once processing guarantee.
        at org.apache.storm.kafka.spout.KafkaSpout.emitOrRetryTuple(KafkaSpout.java:471) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.emitIfWaitingNotEmitted(KafkaSpout.java:440) ~[stormjar.jar:?]
        at org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:308) ~[stormjar.jar:?]
        at org.apache.storm.daemon.executor$fn__4975$fn__4990$fn__5021.invoke(executor.clj:654) ~[storm-core-1.2.1.jar:1.2.1]
        at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.2.1.jar:1.2.1]
        at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60]

似乎正在发生这种情况,当boltB 从 1 个元组中发出 100 个并且 boltD 使这 100 个元组中的一个元组失败时,我收到了这个错误。无法理解如何解决这个问题,理想情况下,当所有 100 个元组都是 acked 时,它应该 ack 一个原始元组,但在所有这 100 个元组都是 acked 之前,原始元组可能是 acked,这会导致此错误.

编辑:

我可以使用以下拓扑和两个螺栓重现此问题,在集群模式下运行大约 5 分钟后即可重现:

螺栓A

case class Abc(index: Int, rand: Boolean)

class BoltA  extends BaseBasicBolt 

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = 
    val inp = input.getBinaryByField("value").getObj[someObj]
    val randomGenerator = new Random()

    var i = 0
    val rand = randomGenerator.nextBoolean()
    1 to 100 foreach 
      collector.emit(new Values(Abc(i, rand).getJsonBytes))
      i += 1
    
  

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = 
    declarer.declare(new Fields("boltAout"))
  


螺栓B

class BoltB  extends BaseBasicBolt 

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = 
    val abc = input.getBinaryByField("boltAout").getObj[Abc]
    println(s"Received $abc.indexth tuple in BoltB")
    if(abc.index >= 97 && abc.rand)
      println(s"throwing FailedException for $abc.indexth tuple for")
      throw new FailedException()
    
  

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = 
  

KafkaSpout:

private def getKafkaSpoutConfig(source: Config) = KafkaSpoutConfig.builder("connections.kafka.producerConnProps.metadata.broker.list", "queueName")
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "grp")
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    .setOffsetCommitPeriodMs(100)
    .setRetry(new KafkaSpoutRetryExponentialBackoff(
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(100),
      10,
      KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(3000)
    ))
    .setFirstPollOffsetStrategy(offsetStrategyMapping(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.offset.strategy", "UNCOMMITTED_EARLIEST")))
    .setMaxUncommittedOffsets(ConnektConfig.getOrElse("connections.kafka.consumerConnProps.max.uncommited.offset", 10000))
    .build()

其他配置:

messageTimeoutInSecons: 300

【问题讨论】:

【参考方案1】:

@Stig Rohde Døssinghere 提供了对此的修复。问题的确切原因已在here 描述如下:

在 STORM-2666 和后续版本的修复中,我们添加了逻辑来处理以下偏移量已经被确认后 spout 收到偏移量确认的情况。问题是 spout 可能会提交所有确认的偏移量,但不会向前调整消费者位置,或者正确清除 waitingToEmit。如果确认的偏移量远远落后于日志结束偏移量,则 spout 可能最终会轮询它已经提交的偏移量。

修复有点错误。当消费者位置落后于提交的偏移量时,我们确保向前调整位置,并清除任何在提交偏移量之后的 waitingToEmit 消息。除非我们调整消费者位置,否则我们不会清除waitingToEmit,结果证明这是一个问题。

例如,假设偏移量 1 失败,偏移量 2-10 已确认,maxPollRecords 为 10。假设 Kafka 中有 11 条记录 (1-11)。如果 spout 回溯到偏移量 1 来重放它,它将在轮询中从消费者那里获得偏移量 1-10。消费者位置现在是 11。spout 发出偏移量 1。假设它立即得到确认。在下一次轮询中,spout 将提交偏移量 1-10 并检查它是否应该调整消费者位置和 waitingToEmit。由于位置 (11) 在提交的偏移量 (10) 之前,因此它不会清除 waitingToEmit。由于 waitingToEmit 仍然包含上一次 poll 的偏移量 2-10,因此 spout 最终将再次发出这些元组。

可以看到修复here。

【讨论】:

以上是关于使用 Kafka Spout 的 Apache Storm 给出错误:IllegalStateException的主要内容,如果未能解决你的问题,请参考以下文章

jstorm在使用kafka作为spout的时候多线程问题

Storm-Kafka 多个 spout,如何分担负载?

Storm Ui错误kafka spout,不使用HDP

无法在kafka-storm中将偏移数据写入zookeeper

Storm Kafka Spout 上的最大元组重播数

storm问题记录 python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理