可能是什么原因,卡夫卡消费者承认,抛出InterruptedException?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了可能是什么原因,卡夫卡消费者承认,抛出InterruptedException?相关的知识,希望对你有一定的参考价值。

这是来自KafkaMessageListenerContainer的代码,在什么情况下它们是InterruptedException的可能性,可以被这个代码抛出,突然出现在我的应用程序日志中,它消耗了来自主题和进程的消息,然后确认消息,看到下面的错误消息

Interrupted while queuing ack for


@Override
            public void acknowledge() {
                try {
                    if (ListenerConsumer.this.autoCommit) {
                        throw new IllegalStateException("Manual acks are not allowed when auto commit is used");
                    }
                    ListenerConsumer.this.acks.put(this.record);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    **throw new KafkaException("Interrupted while queuing ack for " + this.record, e);**
                }
                if (this.immediate) {
                    ListenerConsumer.this.consumer.wakeup();
                }
            }
答案

你在看什么版本;该代码不再存在(因为1.3 - 当前版本是2.1.4)。

在任何情况下,线程的任何中断(例如,关闭任务执行程序)都将导致该问题。

以上是关于可能是什么原因,卡夫卡消费者承认,抛出InterruptedException?的主要内容,如果未能解决你的问题,请参考以下文章

卡夫卡流与卡夫卡消费者如何决定使用啥

卡夫卡消费者:受控阅读主题

Kafka Consumer Lag Monitoring

卡夫卡动物园管理员的目的

kafka消费者中死信队列的好选择是啥

卡夫卡消费者不返回任何事件