kafka异常使用Spring-kafka遇到的坑

Posted AK774S

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka异常使用Spring-kafka遇到的坑相关的知识,希望对你有一定的参考价值。

查看一下压缩策略

bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic SHI_TOPIC1

Topic:SHI_TOPIC1 PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact

Topic: SHI_TOPIC1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0

Configs:cleanup.policy=compact :

然后再检查一下自己发送消息的时候是不是没有传 key

[参考链接](()

问题堆栈信息

org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed;

nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument,

the listener container must have a MANUAL AckMode to populate the Acknowledgment.;

nested exception is java.lang.IllegalStateException:

No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

问题原因

解决方案

问题堆栈信息

Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE

问题原因

不能再配置中既配置kafka.consumer.enable-auto-commit=true 自动提交; 然后又在监听器中使用手动提交

例如:

kafka.consumer.enable-auto-commit=true

@Autowired

private ConsumerFactory consumerFactory;

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory()

ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(consumerFactory);

//设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;

/**

  • 手动ack 提交记录

  • @param data

  • @param ack

  • @throws InterruptedException

*/

@KafkaListener(id = “consumer-id2”,topics = “SHI_TOPIC1”,concurrency = “1”,

clientIdPrefix = “myClientId2”,containerFactory = “kafkaManualAckListenerContainerFactory”)

public void consumer2(String data, Acknowledgment ack)

log.info(“consumer-id2-手动ack,提交记录,data:”,data);

ack.acknowledge();

解决方法:

将自动提交关掉,或者去掉手动提交;

如果你想他们都同时存在,某些情况自动提交;某些情况手动提交; 那你创建 一个新的

consumerFactory 将它的是否自动提交设置为false;比如

@Configuration

@EnableKafka

public class KafkaConfig

@Autowired

private KafkaProperties properties;

/**

  • 创建一个新的消费者工厂

  • 创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下

  • @return

*/

@Bean

public ConsumerFactory<Object, Object> kafkaConsumerFactory()

Map<String, Object> map = properties.buildConsumerProperties();

DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);

return factory;

/**

  • 创建一个新的消费者工厂

  • 但是修改为不自动提交

  • @return

*/

@Bean

public ConsumerFactory<Object, Object> kafkaManualConsumerFactory()

Map<String, Object> map = properties.buildConsumerProperties();

map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);

return factory;

/**

  • 手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)

  • @return

*/

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory()

ConcurrentKafkaListenerContainerFactory<Integer, String> factory =

new ConcurrentKafkaListenerContainerFactory<>();

factory.setConsumerFactory(kafkaManualConsumerFactory());

//设置 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

return factory;

Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性

【中文标题】Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性【英文标题】:Spring Boot, Spring-Kafka, and Spring-Cloud compatibility 【发布时间】:2019-05-01 08:21:05 【问题描述】:

我的问题是关于 Spring-Kafka 和 Spring Cloud 的兼容性。

我正在使用以下版本:

Kafka 代理 1.0.2 Spring Boot 1.5.2 春季卡夫卡 1.2.2 Spring Cloud Stream Chelsea.SR2 (Spring Cloud Stream Core 1.2.2.RELEASE)

前几天在另一个SO question问过Spring-Kafka和Spring Boot的兼容性问题。 Spring-Kafka project page 更新了有关兼容性的更多详细信息。项目网站声明如下:

所有brokers >= 0.10.x.x的用户(以及所有spring boot 1.5.x的用户)推荐使用spring-kafka 1.3.x版本

Spring Cloud Stream 和 Spring-Kafka 的 compatibility matrix 声明:

Spring Cloud Stream 1.2.x 与 Spring-Kafka 1.2.x、1.1.x 兼容。

Spring-Kafka 项目页面建议我升级到 1.3.X,但是我的 Spring Cloud Stream 版本不兼容 Spring-Kafka 1.3.X。

我希望将我的 Spring-Kafka 版本升级到 1.3.8。但我不想破坏 Spring Cloud Stream。

有没有人有过使用新版本的 Spring-Kafka 和旧版本的 Spring Cloud Stream 的经验?

【问题讨论】:

【参考方案1】:

Spring Boot 1.5.2 非常旧(2017 年初)。当前的 Boot 1.5.x 版本是 1.5.18(今天发布)。

如果你使用 Initializr 启动一个新的 Spring Boot 1.5 项目,你会得到

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.18.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Edgware.SR5</spring-cloud.version>
    <spring-kafka.version>1.3.8.RELEASE</spring-kafka.version>
</properties>

Edgware.SR5 引入了 Ditmars Spring Cloud Stream 发布系列 (1.3.3),如您所见,也使用了 spring-kafka 1.3.8。

因此,我建议您升级所有内容以获取最新版本。

【讨论】:

我可以将 Spring Boot 升级到 1.5.17 - 我没有用 1.5.18 测试过。我正在使用 Spring Cloud Dalston.SR5,我无法将其升级到较新的版本。 Chelsea SR2 也很古老;尝试使所有这些库保持最新状态会更好。也就是说,我只是用它运行了一个测试,并将kafka-clients 覆盖到 0.11.0.2 并且一个简单的处理器应用程序(输入/输出)工作没有任何问题。 YMMV。 加里,感谢您的反馈。我知道它已经很老了,但我坚持使用这个版本。

以上是关于kafka异常使用Spring-kafka遇到的坑的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 testcontainers 和 spring-kafka 准备测试

如何使用 spring-kafka 暂停和恢复 @KafkaListener

使用 Spring-kafka 在 GC/消费者重新平衡时清理 Kafka Metric 计量器

Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性

spring-kafka 固定事件迁移实现

使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka