使用spring cloud stream kafka动态改变instanceindex

Posted

技术标签:

【中文标题】使用spring cloud stream kafka动态改变instanceindex【英文标题】:Dynamically changing the instanceindex with spring cloud stream kafka 【发布时间】:2021-07-30 03:15:46 【问题描述】:

类似于:Changing spring-cloud-stream instance index/count at runtime

我对在微服务器架构中启动批处理进行了 poc,并且我正在使用 Spring 批处理和 Spring Cloud Stream Kafka。我正在寻找一种方法来动态创建消费者(处理器)应用程序的多个实例。 我看到可以用

定义多个实例
spring.cloud.stream.instanceCount=n 
spring.cloud.stream.instanceIndex=[0, ..., n-1]

但是我还没有找到动态更改instanceIndex 值的方法。 是否可以使用 Spring Cloud Stream kafka 动态修改此值。

感谢您的帮助。

【问题讨论】:

【参考方案1】:

我找到了如何自动动态更改 instanceindex 的解决方案。 我在属性文件中设置 minPortNum & maxPortNum 和 instanceCount 的值,并根据 port-minPortNum 的值改变 instanceIndex,然后我想运行多少消费者就运行多少。


@component
class ServerPortCustomize implements WebServerFactoryCustomizer 
    @value("$port.number.min")
    private Integer minPortNum;
    @value("$port.number.max")
    private Integer maxPortNum;
    @value("$spring.cloud.stream.instanceCount")
    private Integer instanceCount;
    
    @Override
    public void customize(final ConfigurableWebServerFactory factory) 
        final int port = SocketUtils.findAvailableTcpPort(minPortNum, maxPortNum);
        factory.setPort(port);
        System.getProperties().put("server.port", port);
        System.getProperties().put("spring.cloud.stream.instanceIndex", port-minPortNum);
    

对于这个例子,我已经设置 spring.cloud.stream.instanceCount=3,所以我运行三个消费者,实例索引为值 0、1 和 2

【讨论】:

以上是关于使用spring cloud stream kafka动态改变instanceindex的主要内容,如果未能解决你的问题,请参考以下文章

spring-cloud-stream kafka 消费者并发

spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息

Spring Cloud(12)——基于Kafka的Stream实现

Spring Cloud 2020.0.0 中的 Spring Cloud Bus/Stream 问题

Spring Cloud Stream

Spring Cloud Stream 验证