kafka异常使用Spring-kafka遇到的坑
Posted AK774S
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka异常使用Spring-kafka遇到的坑相关的知识,希望对你有一定的参考价值。
查看一下压缩策略
bin/kafka-topics.sh --describe --zookeeper xxxx:2181 --topic SHI_TOPIC1
Topic:SHI_TOPIC1 PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: SHI_TOPIC1 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Configs:cleanup.policy=compact
:
然后再检查一下自己发送消息的时候是不是没有传 key
[参考链接](()
问题堆栈信息
org.springframework.kafka.listener.ListenerExecutionFailedException: invokeHandler Failed;
nested exception is java.lang.IllegalStateException: No Acknowledgment available as an argument,
the listener container must have a MANUAL AckMode to populate the Acknowledgment.;
nested exception is java.lang.IllegalStateException:
No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.
问题原因
解决方案
问题堆栈信息
Failed to start bean ‘org.springframework.kafka.config.internalKafkaListenerEndpointRegistry’; nested exception is java.lang.IllegalStateException: Consumer cannot be configured for auto commit for ackMode MANUAL_IMMEDIATE
问题原因
不能再配置中既配置
kafka.consumer.enable-auto-commit=true
自动提交; 然后又在监听器中使用手动提交
例如:
kafka.consumer.enable-auto-commit=true
@Autowired
private ConsumerFactory consumerFactory;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
//设置提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
/**
-
手动ack 提交记录
-
@param data
-
@param ack
-
@throws InterruptedException
*/
@KafkaListener(id = “consumer-id2”,topics = “SHI_TOPIC1”,concurrency = “1”,
clientIdPrefix = “myClientId2”,containerFactory = “kafkaManualAckListenerContainerFactory”)
public void consumer2(String data, Acknowledgment ack)
log.info(“consumer-id2-手动ack,提交记录,data:”,data);
ack.acknowledge();
解决方法:
将自动提交关掉,或者去掉手动提交;
如果你想他们都同时存在,某些情况自动提交;某些情况手动提交; 那你创建 一个新的
consumerFactory
将它的是否自动提交设置为false;比如
@Configuration
@EnableKafka
public class KafkaConfig
@Autowired
private KafkaProperties properties;
/**
-
创建一个新的消费者工厂
-
创建多个工厂的时候 SpringBoot就不会自动帮忙创建工厂了;所以默认的还是自己创建一下
-
@return
*/
@Bean
public ConsumerFactory<Object, Object> kafkaConsumerFactory()
Map<String, Object> map = properties.buildConsumerProperties();
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
return factory;
/**
-
创建一个新的消费者工厂
-
但是修改为不自动提交
-
@return
*/
@Bean
public ConsumerFactory<Object, Object> kafkaManualConsumerFactory()
Map<String, Object> map = properties.buildConsumerProperties();
map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>( map);
return factory;
/**
-
手动提交的监听器工厂 (使用的消费组工厂必须 kafka.consumer.enable-auto-commit = false)
-
@return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaManualAckListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaManualConsumerFactory());
//设置 《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】 提交偏移量的方式 当Acknowledgment.acknowledge()侦听器调用该方法时,立即提交偏移量
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性
【中文标题】Spring Boot、Spring-Kafka 和 Spring-Cloud 兼容性【英文标题】:Spring Boot, Spring-Kafka, and Spring-Cloud compatibility 【发布时间】:2019-05-01 08:21:05 【问题描述】:我的问题是关于 Spring-Kafka 和 Spring Cloud 的兼容性。
我正在使用以下版本:
Kafka 代理 1.0.2 Spring Boot 1.5.2 春季卡夫卡 1.2.2 Spring Cloud Stream Chelsea.SR2 (Spring Cloud Stream Core 1.2.2.RELEASE)前几天在另一个SO question问过Spring-Kafka和Spring Boot的兼容性问题。 Spring-Kafka project page 更新了有关兼容性的更多详细信息。项目网站声明如下:
所有brokers >= 0.10.x.x的用户(以及所有spring boot 1.5.x的用户)推荐使用spring-kafka 1.3.x版本
Spring Cloud Stream 和 Spring-Kafka 的 compatibility matrix 声明:
Spring Cloud Stream 1.2.x 与 Spring-Kafka 1.2.x、1.1.x 兼容。
Spring-Kafka 项目页面建议我升级到 1.3.X,但是我的 Spring Cloud Stream 版本不兼容 Spring-Kafka 1.3.X。
我希望将我的 Spring-Kafka 版本升级到 1.3.8。但我不想破坏 Spring Cloud Stream。
有没有人有过使用新版本的 Spring-Kafka 和旧版本的 Spring Cloud Stream 的经验?
【问题讨论】:
【参考方案1】:Spring Boot 1.5.2 非常旧(2017 年初)。当前的 Boot 1.5.x 版本是 1.5.18(今天发布)。
如果你使用 Initializr 启动一个新的 Spring Boot 1.5 项目,你会得到
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.18.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Edgware.SR5</spring-cloud.version>
<spring-kafka.version>1.3.8.RELEASE</spring-kafka.version>
</properties>
Edgware.SR5 引入了 Ditmars Spring Cloud Stream 发布系列 (1.3.3),如您所见,也使用了 spring-kafka 1.3.8。
因此,我建议您升级所有内容以获取最新版本。
【讨论】:
我可以将 Spring Boot 升级到 1.5.17 - 我没有用 1.5.18 测试过。我正在使用 Spring Cloud Dalston.SR5,我无法将其升级到较新的版本。 Chelsea SR2 也很古老;尝试使所有这些库保持最新状态会更好。也就是说,我只是用它运行了一个测试,并将kafka-clients
覆盖到 0.11.0.2 并且一个简单的处理器应用程序(输入/输出)工作没有任何问题。 YMMV。
加里,感谢您的反馈。我知道它已经很老了,但我坚持使用这个版本。以上是关于kafka异常使用Spring-kafka遇到的坑的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 testcontainers 和 spring-kafka 准备测试
如何使用 spring-kafka 暂停和恢复 @KafkaListener
使用 Spring-kafka 在 GC/消费者重新平衡时清理 Kafka Metric 计量器