Spring Cloud 系列之 Stream 消息驱动

Posted 哈喽沃德先生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Cloud 系列之 Stream 消息驱动相关的知识,希望对你有一定的参考价值。

本篇文章为系列文章,未读第一集的同学请猛戳这里:Spring Cloud 系列之 Stream 消息驱动(一)

本篇文章讲解 Stream 如何实现消息分组和消息分区。

  

消息分组

  

  点击链接观看:Stream 消息分组视频(获取更多请关注公众号「哈喽沃德先生」)

  

  如果有多个消息消费者,那么消息生产者发送的消息会被多个消费者都接收到,这种情况在某些实际场景下是有很大问题的,比如在如下场景中,订单系统做集群部署,都会从 RabbitMQ 中获取订单信息,如果一个订单消息同时被两个服务消费,系统肯定会出现问题。为了避免这种情况,Stream 提供了消息分组来解决该问题。

  在 Stream 中处于同一个 group 中的多个消费者是竞争关系,能够保证消息只会被其中一个应用消费。不同的组是可以消费的,同一个组会发生竞争关系,只有其中一个可以消费。通过 spring.cloud.stream.bindings.<bindingName>.group 属性指定组名。

  

问题演示

  

  在 stream-demo 项目下创建 stream-consumer02 子项目。

  项目代码使用入门案例中消息消费者的代码。

  单元测试代码如下:

package com.example;

import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MessageProducer messageProducer;

    @Test
    public void testSend() {
        messageProducer.send("hello spring cloud stream");
    }

}

  

测试

  

  运行单元测试发送消息,两个消息消费者控制台打印结果如下:

  stream-consumer 的控制台:

message = hello spring cloud stream

  stream-consumer02 的控制台:

message = hello spring cloud stream

  通过结果可以看到消息被两个消费者同时消费了,原因是因为它们属于不同的分组,默认情况下分组名称是随机生成的,通过 RabbitMQ 也可以得知:

  

配置分组

  

  stream-consumer 的分组配置为:group-A

server:
  port: 8002 # 端口

spring:
  application:
    name: stream-consumer # 应用名称
  rabbitmq:
    host: 192.168.10.101  # 服务器 IP
    port: 5672            # 服务器端口
    username: guest       # 用户名
    password: guest       # 密码
    virtual-host: /       # 虚拟主机地址
  cloud:
    stream:
      bindings:
        # 消息接收通道
        # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
        input:
          destination: stream.message # 绑定的交换机名称
          group: group-A

  

  stream-consumer02 的分组配置为:group-A

server:
  port: 8003 # 端口

spring:
  application:
    name: stream-consumer # 应用名称
  rabbitmq:
    host: 192.168.10.101  # 服务器 IP
    port: 5672            # 服务器端口
    username: guest       # 用户名
    password: guest       # 密码
    virtual-host: /       # 虚拟主机地址
  cloud:
    stream:
      bindings:
        # 消息接收通道
        # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
        input:
          destination: stream.message # 绑定的交换机名称
          group: group-A

  

测试

  

  运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费。RabbitMQ 结果如下:

  

消息分区

  

  点击链接观看:Stream 消息分区视频(获取更多请关注公众号「哈喽沃德先生」)

  

  通过消息分组可以解决消息被重复消费的问题,但在某些场景下分组还不能满足我们的需求。比如,同时有多条同一个用户的数据发送过来,我们需要根据用户统计,但是消息被分散到了不同的集群节点上了,这时我们就可以考虑使用消息分区了。

  当生产者将消息发送给多个消费者时,保证同一消息始终由同一个消费者实例接收和处理。消息分区是对消息分组的一种补充。

问题演示

  

  先给大家演示一下消息未分区的效果,单元测试代码如下:

package com.example;

import com.example.producer.MessageProducer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest(classes = {StreamProducerApplication.class})
public class MessageProducerTest {

    @Autowired
    private MessageProducer messageProducer;

    @Test
    public void testSend() {
        for (int i = 1; i <= 10; i++) {
            messageProducer.send("hello spring cloud stream");
        }
    }

}

  

测试

  

  运行单元测试发送消息,两个消息消费者控制台打印结果如下:

  stream-consumer 的控制台:

message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream

  stream-consumer02 的控制台:

message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream
message = hello spring cloud stream

  假设这 10 条消息都来自同一个用户,正确的方式应该都由一个消费者消费所有消息,否则系统肯定会出现问题。为了避免这种情况,Stream 提供了消息分区来解决该问题。

  

配置分区

  

  消息生产者配置分区键的表达式规则消息分区的数量

server:
  port: 8001 # 端口

spring:
  application:
    name: stream-producer # 应用名称
  rabbitmq:
    host: 192.168.10.101  # 服务器 IP
    port: 5672            # 服务器端口
    username: guest       # 用户名
    password: guest       # 密码
    virtual-host: /       # 虚拟主机地址
  cloud:
    stream:
      bindings:
        # 消息发送通道
        # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同
        output:
          destination: stream.message # 绑定的交换机名称
          producer:
            partition-key-expression: payload # 配置分区键的表达式规则
            partition-count: 2 # 配置消息分区的数量

  通过 partition-key-expression 参数指定分区键的表达式规则,用于区分每个消息被发送至对应分区的输出 channel

  该表达式作用于传递给 MessageChannelsend 方法的参数,该参数实现 org.springframework.messaging.Message 接口的 GenericMessage 类。

  

  源码 MessageChannel.java

package org.springframework.messaging;

@FunctionalInterface
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1L;

    default boolean send(Message<?> message) {
        return this.send(message, -1L);
    }

    boolean send(Message<?> var1, long var2);
}

  源码 GenericMessage.java

package org.springframework.messaging.support;

import java.io.Serializable;
import java.util.Map;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

public class GenericMessage<T> implements Message<T>, Serializable {
    private static final long serialVersionUID = 4268801052358035098L;
    private final T payload;
    private final MessageHeaders headers;
    
    ...

}

  

  如果 partition-key-expression 的值是 payload,将会使用所有放在 GenericMessage 中的数据作为分区数据。payload 是消息的实体类型,可以为自定义类型比如 UserRole 等等。

  如果 partition-key-expression 的值是 headers["xxx"],将由 MessageBuilder 类的 setHeader() 方法完成赋值,比如:

package com.example.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 消息生产者
 */
@Component
@EnableBinding(Source.class)
public class MessageProducer {

    @Autowired
    private Source source;

    /**
     * 发送消息
     *
     * @param message
     */
    public void send(String message) {
        source.output().send(MessageBuilder.withPayload(message).setHeader("xxx", 0).build());
    }

}

  

  消息消费者配置消费者总数当前消费者的索引开启分区支持

  stream-consumer 的 application.yml

server:
  port: 8002 # 端口

spring:
  application:
    name: stream-consumer # 应用名称
  rabbitmq:
    host: 192.168.10.101  # 服务器 IP
    port: 5672            # 服务器端口
    username: guest       # 用户名
    password: guest       # 密码
    virtual-host: /       # 虚拟主机地址
  cloud:
    stream:
      instance-count: 2 # 消费者总数
      instance-index: 0 # 当前消费者的索引
      bindings:
        # 消息接收通道
        # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
        input:
          destination: stream.message # 绑定的交换机名称
          group: group-A
          consumer:
            partitioned: true # 开启分区支持

  stream-consumer02 的 application.yml

server:
  port: 8003 # 端口

spring:
  application:
    name: stream-consumer # 应用名称
  rabbitmq:
    host: 192.168.10.101  # 服务器 IP
    port: 5672            # 服务器端口
    username: guest       # 用户名
    password: guest       # 密码
    virtual-host: /       # 虚拟主机地址
  cloud:
    stream:
      instance-count: 2 # 消费者总数
      instance-index: 1 # 当前消费者的索引
      bindings:
        # 消息接收通道
        # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同
        input:
          destination: stream.message # 绑定的交换机名称
          group: group-A
          consumer:
            partitioned: true # 开启分区支持

  

测试

  

  运行单元测试发送消息,此时多个消息消费者只有其中一个可以消费所有消息。RabbitMQ 结果如下:

  至此 Stream 消息驱动所有的知识点就讲解结束了。

本文采用 知识共享「署名-非商业性使用-禁止演绎 4.0 国际」许可协议

大家可以通过 分类 查看更多关于 Spring Cloud 的文章。

  

以上是关于Spring Cloud 系列之 Stream 消息驱动的主要内容,如果未能解决你的问题,请参考以下文章

Spring Cloud系列之 ConfigBusStreamSleuth

Spring Cloud系列之 ConfigBusStreamSleuth

Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)SpringBoot项目实现商品服务器端调用

Spring Cloud Stream整合Rabbit之重复投递

Spring Cloud Stream整合Rabbit之重复投递

spring cloud stream 3.1.2 源码搭配rocketmq学习