Kafka Consumer 在 Spring Boot 中没有收到消息

Posted

技术标签:

【中文标题】Kafka Consumer 在 Spring Boot 中没有收到消息【英文标题】:Kafka Consumer is not receiving message in Spring Boot 【发布时间】:2019-09-01 09:51:07 【问题描述】:

我的 spring/java 消费者无法访问生产者生成的消息。但是,当我从控制台/终端运行消费者时,它能够接收到 spring/java 生产者生成的消息。

消费者配置:

@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties 

    private String bootstrap;
    private String group;
    private String topic;

    public String getBootstrap() 
        return bootstrap;
    

    public void setBootstrap(String bootstrap) 
        this.bootstrap = bootstrap;
    

    public String getGroup() 
        return group;
    

    public void setGroup(String group) 
        this.group = group;
    

    public String getTopic() 
        return topic;
    

    public void setTopic(String topic) 
        this.topic = topic;
    

监听器配置:

@Configuration
@EnableKafka
public class KafkaListenerConfig 

    @Autowired
    private KafkaConsumerProperties kafkaConsumerProperties;

    @Bean
    public Map<String, Object> getConsumerProperties() 
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return properties;
    

    @Bean
    public Deserializer stringKeyDeserializer() 
        return new StringDeserializer();
    

    @Bean
    public Deserializer transactionJsonValueDeserializer() 
        return new JsonDeserializer(Transaction.class);
    

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() 
        return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
    

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() 
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    


卡夫卡监听器:

@Service
public class TransactionConsumer 
    private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

    @KafkaListener(topics="transactions", containerFactory = "kafkaListenerContainerFactory")
    public void onReceive(Transaction transaction) 
        LOGGER.info("transaction = ",transaction);
    

消费者应用:

@SpringBootApplication
public class ConsumerApplication 

    public static void main(String[] args) 
        SpringApplication.run(ConsumerApplication.class, args);
    



测试用例 1:通过 我启动了我的 spring/java 生产者并从控制台运行消费者。当我生成消息表单生产者时,我的控制台消费者能够访问该消息。

测试用例 2:失败 我启动了我的 spring/java 消费者并从控制台运行生产者。当我生成消息表单控制台生产者时,我的 spring/java 消费者无法访问该消息。

测试用例 3:失败 我启动了我的 spring/java 消费者并运行了 spring/java 生产者。当我从 spring/java 生产者生成消息时,我的 spring/java 消费者无法访问该消息。

问题

    我的消费者代码有什么问题吗?

    我是否缺少我的 kafka-listener 的任何配置?

    我需要显式运行侦听器吗? (我不这么认为,因为我可以在终端日志上看到连接到主题,但我仍然不确定)

【问题讨论】:

会不会和你的测试方式有关?你找到解决办法了吗? 【参考方案1】:

好的,你在Consumer Configs 中缺少AUTO_OFFSET_RESET_CONFIG

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

auto.offset.reset

当 Kafka 中没有初始偏移量或服务器上不再存在当前偏移量时该怎么办(例如,因为该数据已被删除):

最早的:自动将偏移重置为最早的偏移

最新的:自动将偏移量重置为最新的偏移量

none:如果没有为消费者组找到先前的偏移量,则向消费者抛出异常

其他:向消费者抛出异常

注意: auto.offset.resetearliest 仅在 kafka 没有该消费者组的偏移量时才有效(因此,在您的情况下,需要将此属性添加到新的消费者组并重新启动应用程序)

【讨论】:

感谢您的回复。我尝试了您提出的解决方案,但是它不起作用。我尝试了所有选项,但没有运气。你知道如何在 Java 代码中设置 '--from-beginning' 选项(我们在控制台中使用)吗? Do you know how to set '--from-beginning' option (that we use in console) in Java code ? 这是我的答案@Developer 我有完全相同的问题场景。这个解决方案对我来说也没有运气。您是否找到任何可能的解决方法?

以上是关于Kafka Consumer 在 Spring Boot 中没有收到消息的主要内容,如果未能解决你的问题,请参考以下文章

总结kafka的consumer消费能力很低的情况下的处理方案

Kafka Consumer 指标在从 Spring Boot 2.2.2 升级到 2.3.0 后消失

在Spring Kafka Consumer Unit Test中没有调用MessageListner的onMessage

Spring Integration Kafka Consumer Listener 不接收消息

springboot kafka集成(实现producer和consumer)

从 Spring Boot 2.1.6 升级到 2.2.2 时,Prometheus 不导出 Kafka Consumer 数据