Spring cloud stream消息分区
Posted dengpengbo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring cloud stream消息分区相关的知识,希望对你有一定的参考价值。
??在上篇文章中我们给大家介绍了Stream的消息分组,可以实现消息的重复消费的问题,但在某些场景下分组还不能满足我们的需求,比如,同时有多条同一个用户的数据,发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑消息分区了。
??当生产者将消息数据发送给多个消费者实例时,保证同一消息数据始终是由同一个消费者实例接收和处理。
Stream 消息分区
创建项目
??将我们上篇文章中的分组的三个项目,拷贝一份修改名称及服务名称
没有分区的情况下演示
发送多条消息查看效果
@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest
@Autowired
private ISendeService sendService;
@Test
public void testStream()
Product p = new Product(999, "stream test ...999");
// 将需要发送的消息封装为Message对象
Message message = MessageBuilder
.withPayload(p)
.build();
for (int i = 0; i < 10; i++)
// 发送多条消息到队列中
sendService.send().send(message );
10条消息被随机的分散到了两个消费者中:
我们可以看到A中6条消息,B中4条消息,而且这是随机的,下次执行的结果肯定不一样。
分区
1.发送者中配置
spring.application.name=stream-partition-sender
server.port=9060
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:[email protected]:8761/eureka/,http://dpb:[email protected]:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 对应 MQ 是 exchange outputProduct自定义的信息
spring.cloud.stream.bindings.outputProduct.destination=exchangeProduct
#通过该参数指定了分区键的表达式规则
spring.cloud.stream.bindings.outputProduct.producer.partitionKeyExpression=payload
#指定了消息分区的数量。
spring.cloud.stream.bindings.outputProduct.producer.partitionCount=2
2.消费者中配置
服务A
spring.application.name=stream-partition-receiverA
server.port=9070
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:[email protected]:8761/eureka/,http://dpb:[email protected]:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 对应 MQ 是 exchange 和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct999
#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
#设置当前实例的索引号,从 0 开始
spring.cloud.stream.instanceIndex=0
服务B
spring.application.name=stream-partition-receiverB
server.port=9071
#设置服务注册中心地址,指向另一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:[email protected]:8761/eureka/,http://dpb:[email protected]:8761/eureka/
#rebbitmq 链接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/
# 对应 MQ 是 exchange 和消息发送者的 交换器是同一个
spring.cloud.stream.bindings.inputProduct.destination=exchangeProduct
# 具体分组 对应 MQ 是 队列名称 并且持久化队列 inputProduct 自定义
spring.cloud.stream.bindings.inputProduct.group=groupProduct999
#开启消费者分区功能
spring.cloud.stream.bindings.inputProduct.consumer.partitioned=true
#指定了当前消费者的总实例数量
spring.cloud.stream.instanceCount=2
#设置当前实例的索引号,从 1 开始
spring.cloud.stream.instanceIndex=1
启动服务测试
10个消息都被消费者A给消费了,说明到达了我们需要的效果。
案例源码:https://github.com/q279583842q/springcloud-e-book
以上是关于Spring cloud stream消息分区的主要内容,如果未能解决你的问题,请参考以下文章
Spring Cloud Stream 3.x - 重播消息策略
第十章 消息驱动的微服务: Spring Cloud Stream
spring-cloud-stream kafka 消费者并发