Spring Kafka消费者无法消费记录

Posted

技术标签:

【中文标题】Spring Kafka消费者无法消费记录【英文标题】:Spring Kafka consumer not able to consume records 【发布时间】:2019-11-21 02:40:54 【问题描述】:

我们正在使用 Spring Kafka 批量消费记录。我们有时会遇到应用程序启动的问题,即使有足够的未读消息,它也不会消耗任何记录。相反,我们不断地看到信息日志说。

[INFO]-[FetchSessionHandler:handleError:440] - [Consumer clientId=consumer-2, groupId=groupId] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1027: org.apache.kafka.common.errors.DisconnectException. 

人们正面临这个问题,每个人都说忽略它,因为它只是一个信息日志。甚至,我们看到应用程序在没有做任何事情的情况下开始获取记录。但是,开始消费记录可能需要多长时间是非常不可预测的:(

我们在使用 Spring Cloud Stream 时没有看到这个错误。不确定我们是否遗漏了 spring-kafka 中的任何配置。

过去有人遇到过这个问题,如果我们遗漏了什么,请告诉我们。 我们的主题负载很大,如果有很多滞后,会发生这种情况吗?

我们正在使用 2.2.2.RELEASE 的 Spring Kafka Spring boot 2.1.2.RELEASE Kafka 0.10.0.1(我们知道它已经很老了,因为不可避免的原因我们不得不使用它:()

这是我们的代码:

application.yml

li.topics: CUSTOM.TOPIC.JSON
    spring:
      application:
        name: DataPublisher
      kafka:
        listener:
          type: batch
          ack-mode: manual_immediate
        consumer:
          enable-auto-commit: false
          max-poll-records: 500
          fetch-min-size: 1
          fetch-max-wait: 1000
          group-id: group-dev-02
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer:CustomResourceDeserialiser
          auto-offset-reset: earliest

消费者:

public class CustomKafkaBatchConsumer 


  @KafkaListener(topics = "#'$li.topics'.split(',')", id = "$spring.kafka.consumer.group-id")
  public void receiveData(@Payload List<CustomResource> customResources,
      Acknowledgment acknowledgment,
      @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
      @Header(KafkaHeaders.OFFSET) List<Long> offsets) 


反序列化器:

public class CustomResourceDeserialiser implements Deserializer<CustomResource> 

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) 
  

  @Override
  public CustomResource deserialize(String topic, byte[] data) 
    if (data != null) 
      try 
        ObjectMapper objectMapper = ObjectMapperFactory.getInstance();
        return objectMapper.readValue(data, CustomResource.class);
       catch (IOException e) 
        log.error("Failed to deserialise with ",e.getMessage());
      
    
    return null;
  

  @Override
  public void close() 

  

【问题讨论】:

我没有答案,但是...&gt;We didn't see this error when we were using Spring cloud stream. 这是一个红鲱鱼 - Spring Cloud Stream 在下面使用 Spring Kafka。 感谢@GaryRussell 可能是我们在尝试使用 Spring 云流时尝试减少延迟,但没有遇到问题。感谢您的快速回复。 【参考方案1】:

这可能是因为这个Kafka-8052 - Intermittent INVALID_FETCH_SESSION_EPOCH error on FETCH request 问题。这在 Kafka 2.3.0 中已修复

不幸的是,截至 2019 年 8 月 21 日,Spring 云流还没有升级它的依赖项,还没有 2.3.0 版本的 kafka-clients。

您可以尝试将这些作为 显式 依赖项添加到您的 gradle 中

    compile ('org.apache.kafka:kafka-streams:2.3.0')
    compile ('org.apache.kafka:kafka-clients:2.3.0')
    compile ('org.apache.kafka:connect-json:2.3.0')
    compile ('org.apache.kafka:connect-api:2.3.0')

更新

这也可能是由 kafka Broker - 客户端不兼容引起的。如果您的集群落后于客户端版本,您可能会看到诸如此类的各种奇怪问题。例如,假设您的 kafka 代理在 1.x.x 上,而您的 kafka-consumer 在 2.x.x 上,这可能会发生

【讨论】:

【参考方案2】:

我之前也遇到过同样的问题,解决方法是减少当前分区数或增加消费者数量。就我而言,我们在 60 个分区上有大约 100M 的数据,当单个 pod 运行时我遇到了同样的错误。我扩展了 30 个 pod(30 个消费者),问题就解决了。

【讨论】:

以上是关于Spring Kafka消费者无法消费记录的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot Kafka:由于 NoSuchBeanDefinitionException 而无法启动消费者

Spring Kafka SeekToCurrentErrorHandler 找出失败的记录

Springboot Kafka - 消费者幂等性

Kafka Consumer 在 Spring Boot 中没有收到消息

Kafka消费者——结合spring开发

spring-cloud-stream kafka 消费者并发