使用spring cloud stream kafka动态改变instanceindex
Posted
技术标签:
【中文标题】使用spring cloud stream kafka动态改变instanceindex【英文标题】:Dynamically changing the instanceindex with spring cloud stream kafka 【发布时间】:2021-07-30 03:15:46 【问题描述】:类似于:Changing spring-cloud-stream instance index/count at runtime
我对在微服务器架构中启动批处理进行了 poc,并且我正在使用 Spring 批处理和 Spring Cloud Stream Kafka。我正在寻找一种方法来动态创建消费者(处理器)应用程序的多个实例。 我看到可以用
定义多个实例spring.cloud.stream.instanceCount=n
spring.cloud.stream.instanceIndex=[0, ..., n-1]
但是我还没有找到动态更改instanceIndex 值的方法。 是否可以使用 Spring Cloud Stream kafka 动态修改此值。
感谢您的帮助。
【问题讨论】:
【参考方案1】:我找到了如何自动动态更改 instanceindex 的解决方案。 我在属性文件中设置 minPortNum & maxPortNum 和 instanceCount 的值,并根据 port-minPortNum 的值改变 instanceIndex,然后我想运行多少消费者就运行多少。
@component
class ServerPortCustomize implements WebServerFactoryCustomizer
@value("$port.number.min")
private Integer minPortNum;
@value("$port.number.max")
private Integer maxPortNum;
@value("$spring.cloud.stream.instanceCount")
private Integer instanceCount;
@Override
public void customize(final ConfigurableWebServerFactory factory)
final int port = SocketUtils.findAvailableTcpPort(minPortNum, maxPortNum);
factory.setPort(port);
System.getProperties().put("server.port", port);
System.getProperties().put("spring.cloud.stream.instanceIndex", port-minPortNum);
对于这个例子,我已经设置 spring.cloud.stream.instanceCount=3,所以我运行三个消费者,实例索引为值 0、1 和 2
【讨论】:
以上是关于使用spring cloud stream kafka动态改变instanceindex的主要内容,如果未能解决你的问题,请参考以下文章
spring-cloud-stream kafka 消费者并发
spring-cloud-stream-kafka 在应用程序启动后仅使用最新消息
Spring Cloud(12)——基于Kafka的Stream实现