Spring Boot Kafka监听器不一致

Posted

技术标签:

【中文标题】Spring Boot Kafka监听器不一致【英文标题】:Spring Boot Kafka listener is inconsistent 【发布时间】:2021-03-11 06:42:13 【问题描述】:

我试图让几个不同的 Spring Cloud 微服务都连接到 Kafka/Zookeeper 集群,都在 Kubernetes 中。微服务使用 org.springframework.kafka:spring-kafka - 作为事件的消费者和生产者。

所有服务都可以连接到 kafka - 并且创建了主题;但是每个服务的消费者非常不一致。

例如,当服务启动一次时,所有的消费者都会监听消息并调用函数。但是,当我重新启动所有内容(包括 kafka 和 zookeeper)时,它要么无法正常工作,要么不同服务中的一些消费者会工作等等......

这是我的一些配置——我没有任何基于 Java 的配置——就在我的 application.yml 中,如下所示:

spring:

  ....

  kafka:
    consumer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: api-event
      enable-auto-commit: false

    producer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      ack-mode: manual

...

还有我的主要课程:

@EnableCaching
@SpringBootApplication
@EnableJpaRepositories
@EnableDiscoveryClient
@EnableKafka /* <<<<<<<------------- ENABLED HERE */
public class ExampleServiceApplication 

  public static void main(String[] args) 
    SpringApplication.run(ExampleServiceApplication.class, args);
  

  .....


最后,我的消费者:

@Component
public class MessageListener 

  @KafkaListener(
      topics = "myTopic")
  public void eventListener(String serializedMessage) 
    try 
....

消息可以正常发送到代理,但不会被其他服务使用。

我意识到每个服务属性上都没有到主题的映射,我该如何通过 application.yml 做到这一点?

我敢打赌我犯了一个非常严重的错误,但是是的!非常感谢任何 cmets 或帮助

【问题讨论】:

你的 kafka 主题有多少个分区?如果您有一个分区,那么只有一项服务使用消息是正常的。如果希望更多服务并行消费主题消息,则需要增加分区数。 @OctavianR。哦,好吧 - 有趣 - 不知道!据我所知,我对所有服务只有一个分区,所以可能就是这样......我想我需要定义具有 NewTopic 类型的 Java bean 并使用正确数量的分区构建主题。感谢您的评论 - 我现在就试试 但是每个实体都有不同的主题,其中 EventTypes 作为字段等...-但这应该不会影响太大 不,应该不重要 【参考方案1】:

顺便说一句,您可以在此处阅读更多关于分区数量与并行消费者(具有相同组 id 的消费者)数量之间的关系。

https://docs.confluent.io/platform/current/streams/architecture.html

稍微简化一下,您的应用程序可以运行的最大并行度受流任务的最大数量的限制,这本身取决于应用程序正在读取的输入主题的最大分区数。例如,如果您的输入主题有 5 个分区,那么您最多可以运行 5 个应用程序实例。这些实例将协作处理主题的数据。如果您运行的应用程序实例数量多于输入主题的分区,“多余”的应用程序实例将启动但保持空闲;但是,如果其中一个繁忙的实例出现故障,则其中一个空闲的实例将恢复前者的工作。我们在常见问题解答中提供了更详细的说明和示例。

【讨论】:

我正在阅读 confluent docs atm,同时在 minikube 本地运行我的服务。我目前有每个服务的 1 个副本,但假设每个服务有 2 个 - 这篇文章是说我可以还是不能 - 还是只是在需要之前闲置? 你可以拥有,但是超过分区数的消费者会一直处于空闲状态, 非常感谢您的帮助 - 只需再做一些测试,我就会正确地标记您的答案!

以上是关于Spring Boot Kafka监听器不一致的主要内容,如果未能解决你的问题,请参考以下文章

spring boot 学习 ---- 使用事件监听

21. Spring Boot过滤器监听器从零开始学Spring Boot

spring-kafka 监听器签名

009-Spring Boot 事件监听

Spring Boot监听器流程

Spring Boot 自定义kafka 消费者配置 ContainerFactory最佳实践