SpringCloud-2.0-周阳(15. 消息驱动 - SpringCloud Stream)
Posted ABin-阿斌
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud-2.0-周阳(15. 消息驱动 - SpringCloud Stream)相关的知识,希望对你有一定的参考价值。
上一篇 :14. 消息总线 - SpringCloud Bus
下一篇 :16. 请求链路追踪 - SpringCloud Sleuth
- 声明:原文作者:csdn:yuan_404
文章目录
1.概述
1.1 简介
-
官网地址 :https://spring.io/projects/spring-cloud-stream#overview
-
Spring Cloud Stream中文指导手册 :https://m.wang1314.com/doc/webapp/topic/20971999.html
什么是SpringCloudStream
- 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
- 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder 对象交互。
- 通过我们配置来 binding(绑定) ,而 Spring Cloud Stream 的 binder 对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以了,不需要对所有的消息中间件(ActiveMQ、RabbitMQ、RocketMQ、Kafka)都很熟悉。
- 通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动。
- Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现, 引用了发布-订阅、消费组、分区的三个核心概念。
- 目前仅支持RabbitMQ、Kafka。
也就是 :屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型
1.2 设计思想
- 标准 MQ
- 生产者/消费者之间靠消息媒介传递信息内容 —— Message
- 消息必须走特定的通道 —— 消息通道 MessageChannel
- 消息通道里的消息如何被消费呢,谁负责收发处理 —— 消息通道 MessageChannel 的子接口 SubscribableChannel,由 MessageHandler 消息处理器订阅
- 为什么要引入 SpringCloud Stream
比方说我们用到了 RabbitMQ 和 Kafka ,由于这两个消息中间件的架构上的不同,像 RabbitMQ 有 exchange, kafka 有 Topic 和 Partitions 分区,
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了—种解耦合的方式。
- SpringCloud Stream 屏蔽差异
在没有绑定器这个概念的情况下,我们的 SpringBoot 应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性
通过定义绑定器binder作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
INPUT对应于生产者
OUTPUT对应于消费者
- Stream 中的消息通信方式遵循了 发布-订阅模式
- 在 RabbitMQ 就是 Exchange 交换机
- 在kafka中就是Topic
1.3 Spring Cloud Stream 标准流程套路
- Binder :很方便的连接中间件,屏蔽差异
- Channel :通道,是队列 Queue 的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对 Channel 对队列进行配置
- Source和Sink :简单的可理解为参照对象是 Spring Cloud Stream 自身,从 Stream 发布消息就是输出,接受消息就是输入
1.4 编码API和常用注解
2 . 使用
- 前提 :确保 上一篇 中的 RabbitMQ 环境搭建成功
2.1 消息驱动 - 生产者
-
新建模块 :cloud-stream-rabbitmq-provider-8801
作为生产者,完成 消息的发送
-
修改 POM
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>com.demo.springcloud</groupId> <artifactId>cloud-api-commons</artifactId> <version>$project.version</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
-
编写 YML
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” # !=!=!实际这里可能会爆红,不用管,可以正常运行 !=!=! binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: send-8801.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
-
编写主启动类
@SpringBootApplication public class StreamMQMain8801 public static void main(String[] args) SpringApplication.run(StreamMQMain8801.class, args);
-
业务类
-
发送消息接口 :IMessageProvider
public interface IMessageProvider public String send();
-
发送消息接口实现类 :MessageProviderImpl
@EnableBinding(Source.class) //定义消息的推送管道, 绑定 信道(Channel)和 Exchange public class MessageProviderImpl implements IMessageProvider @Autowired private MessageChannel output; // 消息发送管道 @Override public String send() // 简单的消息 String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("*****serial: "+serial); return null;
-
Controller :SendMessageController
@RestController public class SendMessageController @Resource private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage") public String sendMessage() return messageProvider.send();
- 启动测试
-
启动 RabbitMQ,并访问 http://localhost:15672/
-
启动 Eureka-7001、stream-provider-8801
-
查看 RabbitMQ 界面中的 Exchange
-
发送 http://localhost:8801/sendMessage 请求
多请求几次,观察结果
-
查看控制台输出
-
查看 RabbitMQ 界面 Overview
2.2 消息驱动 - 消费者
-
新建模块 :cloud-stream-rabbitmq-consumer-8802
-
修改 POM
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>com.demo.springcloud</groupId> <artifactId>cloud-api-commons</artifactId> <version>$project.version</version> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
-
编写 YML
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址`
-
编写主启动类
@SpringBootApplication public class StreamMQMain8802 public static void main(String[] args) SpringApplication.run(StreamMQMain8802.class, args);
-
业务类
-
Controller :ReceiveMessageListenerController
@Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController @Value("$server.port") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message) System.out.println("消费者1号,接受:"+message.getPayload()+"\\t port:"+serverPort);
- 测试
-
发送 [http://localhost:8801/sendMessage]
-
发送者控制台
-
接收者控制台
2.3 分组消费与持久化
2.3.1 环境配置
-
参照 cloud-stream-rabbitmq-consumer-8802,再新建一个 cloud-stream-rabbitmq-consumer-8803 模块。
不同点只有 —— 端口号 和 instance-id
只要将这两处修改成 8803 即可
-
启动以下项目
2.3.2 重复消费问题
- 发出该请求 :http://localhost:8801/sendMessage
- 会发现 8802、8803 都接收到了该消息
- 这就是消息被 重复消费 了
- 产生的影响(通过小案例理解)
比如在如下场景中 :
订单系统我们做集群部署,都会从 RabbitMQ 中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)
同一组内发生竞争关系,只有其中一个可以消费
2.3.3 分组
-
先查看一下当前 8802、8803 两个消费者的分组情况
-
上面的组名是遗传随机字符,下面先自定义两个分组
- 先定义两个分组名 :MyGroup-1 、MyGroup-2
- 修改 8802 的 YML
group: MyGroup-1
- 修改 8803
group: MyGroup-2
- 重启这两个项目
- 查看 RabbitMQ 界面
- 到现在只是自定义了分组
- 这两个服务还是在两个不同的组中,所以还是存在重复消费
- 重复消费解决方案
- 8802 / 8803 实现轮询分组,每次只有一个消费者
- 8801 模块的发的消息只能被 8802 或 8803 其中一个接收到,这样避免了重复消费
- 将 8803 的分组修改为 MyGroup-1
-
重启服务
-
查看 RabbitMQ 界面
-
发送请求(发两次) :http://localhost:8801/sendMessage
2.3.4 持久化
- 当加上 Group 配置后,就自动支持持久化功能
- 下面演示一下持久化的效果
-
先停止 8802、8803 服务
-
删去 8802 的 YML 中 Group,8803 的不删
-
8801 生产者先发送几条消息
-
启动 8802 ,并观察控制台
控制台无输出,因为没有配置 Group,没有开启持久化,所以没有接收到消息
-
启动 8803 ,并观察控制台
控制台有输出,因为配置了 Group,开启了持久化,所以接收到了消息
以上是关于SpringCloud-2.0-周阳(15. 消息驱动 - SpringCloud Stream)的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud-2.0-周阳(16. 请求链路追踪 - SpringCloud Sleuth)
SpringCloud-2.0-周阳(13. 分布式配置中心 - SpringCloud Config)
SpringCloud-2.0-周阳(17. SpringCloud Alibaba入门简介)
SpringCloud-2.0-周阳(21. Sentinel 环境搭建)