使用 Spring Cloud Bus Kafka 的多个实例

Posted

技术标签:

【中文标题】使用 Spring Cloud Bus Kafka 的多个实例【英文标题】:Multiple instance with Spring Cloud Bus Kafka 【发布时间】:2020-11-07 03:20:20 【问题描述】:

我的问题是如何使用 Spring Cloud Stream Kafka 管理多实例。

让我解释一下,在 Spring Cloud Stream 微服务上下文(eureka、configserver、kafka)中,我希望拥有相同微服务的 2 个实例。当我更改 GIT 存储库中的配置时,配置服务器(通过 webhook)会将消息推送到 Kafka 主题中。

如果我在我的微服务中使用相同的 group-id,则只有两个实例中的一个会收到通知,并重新加载他的 spring 上下文。 但我需要刷新所有实例...

因此,为此,我配置了一个唯一的组 ID:$spring.application.name.bus.$hostname 它工作得很好,但问题是,每次我启动我的服务的新实例时,它都会在 kafka 中创建一个新的消费者组。现在我有很多未使用的消费者群体。

[![微服务的消费者][1]][1] [1]:https://i.stack.imgur.com/6jIzx.png

这是我的服务的 Spring Cloud Stream 配置:

spring:
  cloud:
    bus:
      destination: sys.spring-cloud-bus.refresh
      enabled: true
      refresh:
        enabled: true
      env:
        enabled: true
      trace:
        enabled: false
    stream:
      bindings:
        # Override spring cloud bus configuration with a specific binder named "bus"
        springCloudBusInput:
          binder: bus
          destination: sys.spring-cloud-bus.refresh
          content-type: application/json
          group: $spring.application.name.bus.$hostname
        springCloudBusOutput:
          binder: bus
          destination: sys.spring-cloud-bus.refresh
          content-type: application/json
          group: $spring.application.name.bus.$hostname
      binders:
        bus:
          type: kafka
          defaultCandidate: false
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: kafka-dev.hcuge.ch:9092
      kafka:
        streams:
          bindings:
            springCloudBusInput:
              consumer:
                startOffset: latest # Reset offset to the latest value to avoid consume configserver notifications on startup
                resetOffsets: true

如何避免大量的消费者创造?我应该删除 kafka 中的旧消费者组吗? 我认为我的解决方案不是最好的方法,所以如果你有更好的选择,我很感兴趣;)

谢谢

【问题讨论】:

【参考方案1】:

如果您不提供组,巴士将使用随机组。

代理最终会根据其offsets.retention.minutes 属性(目前默认为7 天)移除未使用的组。

【讨论】:

感谢您的回答。所以你只需要在 Spring Cloud Bud 消息的持续时间上播放,删除旧的消费者。不傻;)好主意!

以上是关于使用 Spring Cloud Bus Kafka 的多个实例的主要内容,如果未能解决你的问题,请参考以下文章

spring cloud 使用spring cloud bus自动刷新配置

Spring Cloud Bus

带有 aws-kinesis 的 Spring Cloud Bus

SpringCloud学习之六:使用Spring Cloud Bus自动刷新配置

spring cloud 中消息总线(bus)使用

springCloud学习-消息总线(Spring Cloud Bus)