Kafka Consumer 在 Spring Boot 中没有收到消息
Posted
技术标签:
【中文标题】Kafka Consumer 在 Spring Boot 中没有收到消息【英文标题】:Kafka Consumer is not receiving message in Spring Boot 【发布时间】:2019-09-01 09:51:07 【问题描述】:我的 spring/java 消费者无法访问生产者生成的消息。但是,当我从控制台/终端运行消费者时,它能够接收到 spring/java 生产者生成的消息。
消费者配置:
@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties
private String bootstrap;
private String group;
private String topic;
public String getBootstrap()
return bootstrap;
public void setBootstrap(String bootstrap)
this.bootstrap = bootstrap;
public String getGroup()
return group;
public void setGroup(String group)
this.group = group;
public String getTopic()
return topic;
public void setTopic(String topic)
this.topic = topic;
监听器配置:
@Configuration
@EnableKafka
public class KafkaListenerConfig
@Autowired
private KafkaConsumerProperties kafkaConsumerProperties;
@Bean
public Map<String, Object> getConsumerProperties()
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return properties;
@Bean
public Deserializer stringKeyDeserializer()
return new StringDeserializer();
@Bean
public Deserializer transactionJsonValueDeserializer()
return new JsonDeserializer(Transaction.class);
@Bean
public ConsumerFactory<String, Transaction> consumerFactory()
return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
return factory;
卡夫卡监听器:
@Service
public class TransactionConsumer
private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);
@KafkaListener(topics="transactions", containerFactory = "kafkaListenerContainerFactory")
public void onReceive(Transaction transaction)
LOGGER.info("transaction = ",transaction);
消费者应用:
@SpringBootApplication
public class ConsumerApplication
public static void main(String[] args)
SpringApplication.run(ConsumerApplication.class, args);
测试用例 1:通过 我启动了我的 spring/java 生产者并从控制台运行消费者。当我生成消息表单生产者时,我的控制台消费者能够访问该消息。
测试用例 2:失败 我启动了我的 spring/java 消费者并从控制台运行生产者。当我生成消息表单控制台生产者时,我的 spring/java 消费者无法访问该消息。
测试用例 3:失败 我启动了我的 spring/java 消费者并运行了 spring/java 生产者。当我从 spring/java 生产者生成消息时,我的 spring/java 消费者无法访问该消息。
问题
我的消费者代码有什么问题吗?
我是否缺少我的 kafka-listener 的任何配置?
我需要显式运行侦听器吗? (我不这么认为,因为我可以在终端日志上看到连接到主题,但我仍然不确定)
【问题讨论】:
会不会和你的测试方式有关?你找到解决办法了吗? 【参考方案1】:好的,你在Consumer Configs
中缺少AUTO_OFFSET_RESET_CONFIG
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
auto.offset.reset
当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办(例如,因为该数据已被删除):
最早的:自动将偏移重置为最早的偏移
最新的:自动将偏移量重置为最新的偏移量
none:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常
其他:向消费者抛出异常
注意: auto.offset.reset
到 earliest
仅在 kafka 没有该消费者组的偏移量时才有效(因此,在您的情况下,需要将此属性添加到新的消费者组并重新启动应用程序)
【讨论】:
感谢您的回复。我尝试了您提出的解决方案,但是它不起作用。我尝试了所有选项,但没有运气。你知道如何在 Java 代码中设置 '--from-beginning' 选项(我们在控制台中使用)吗?Do you know how to set '--from-beginning' option (that we use in console) in Java code ?
这是我的答案@Developer
我有完全相同的问题场景。这个解决方案对我来说也没有运气。您是否找到任何可能的解决方法?以上是关于Kafka Consumer 在 Spring Boot 中没有收到消息的主要内容,如果未能解决你的问题,请参考以下文章
总结kafka的consumer消费能力很低的情况下的处理方案
Kafka Consumer 指标在从 Spring Boot 2.2.2 升级到 2.3.0 后消失
在Spring Kafka Consumer Unit Test中没有调用MessageListner的onMessage
Spring Integration Kafka Consumer Listener 不接收消息
springboot kafka集成(实现producer和consumer)
从 Spring Boot 2.1.6 升级到 2.2.2 时,Prometheus 不导出 Kafka Consumer 数据