SpringCloud Hoxton——Stream服务消息驱动

Posted 张起灵-小哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud Hoxton——Stream服务消息驱动相关的知识,希望对你有一定的参考价值。

1.开篇

什么是SpringCloudStream?官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

我们都知道消息中间件有很多种,常用的有4种(ActiveMQ、RabbitMQ、RocketMQ、Kafka),但是Spring Cloud Stream目前仅支持RabbitMQ、Kafka。

标准的MQ一般都是下面这张图的形式。

Message消息:生产者/消费者之间靠消息媒介传递消息内容。

MessageChannel消息通道:消息必须走特定的通道。

SubscribableChannel:由MessageHandler消息处理器所订阅,谁来负责收发处理。

前面我们提到了目前存在并且常用的MQ有四种,那么就产生了一个问题:比方说我们的项目种用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。(通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。)

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

Binder:很方便的连接中间件,屏蔽各中间件之间的差异。

Channel:消息通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储个转发的媒介。

Source、Sink:其参照对象是SpringCloud Stream自身,从Stream中发布消息就是输出Source,接收消息就是输入Sink。

在这其中常用的API、注解如下:👇👇👇


2.项目源码

github源码地址:https://github.com/2656307671/SpringCloud-Hoxton-Stream

gitee源码地址:https://gitee.com/szh-forever-young/SpringCloud-Hoxton-Stream

首先要确保你的RabbitMQ环境已经搭建成功,并且启动,通过浏览器可以访问它的web管理界面窗口(虚拟机IP:15672)。我是在linux上启动的RabbitMQ;如果你是在windows上配置的,那就是localhost:15672。

这里一共有三个微服务模块,7001注册中心、8801消息发送模块(消息驱动之生产者)、8802消息接收模块(消息驱动之消费者)。

测试依次启动7001、8801、8802。

测试url:localhost:8801/sendMessage(发送四次请求,也就是发送四条消息)

上面的测试只有一个消息接收模块,下面再添加一个8803,它也是消息接收模块(消息驱动之消费者)。

此时,我们测试,仍然是依次启动7001、8801、8802、8803。

测试url:localhost:8801/sendMessage(发送两次请求,也就是发送两条消息)

从上面的截图可以看到,虽然8802、8803这两个消息接收方都成功接收到了消息,但是它俩收到的消息是完全一样的,这就存在着重复消费的问题。

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。

这时我们就可以使用Stream中的消息分组来解决。注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

Stream消息分组就是说:微服务应用放置于同一个group中,就能保证消息只会被其中的一个应用消费一次,因为同一个组内会发生竞争关系,要么你消费A,要么我消费A,不会出现我俩都消费A的情况。但是不同的组是可以重复消费的。

下面,我将8802、8803放置在不同的组(参见yml配置文件),然后测试启动顺序、url和上面的一样,看看结果如何?

从RabbitMQ的web管理界面中看到,这里的两个Exchange交换机对应着8802、8803,它们在不同的两个组中(the first group、the second group),而我们下面发了两条消息(执行两次localhost:8801/sendMessage),但是8802、8803仍然接收到了相同的消息。可见:将它俩放置在不同的组中,无法解决重复消费的问题。

那么下面自然就是将它俩放置在相同的组中(参见yml配置文件),再次测试。

此时再看,8802、8803各自收到一条消息。因为它俩在同组,就会出现竞争,解决了重复消费的问题。

最后,再来说一下Stream中的持久化问题。

我们先停掉8802、8803这两个微服务,然后将8802的分组去掉(也就是将它yml配置文件中的group标签删掉),8803的保留。

之后执行四次localhost:8801/sendMessage,发送四条消息到RabbitMQ,此时重启8802、8803。

得到的结果就不再截图了,我在这里说一下:8802无法获取RabbitMQ中的四条消息,因为8802此时没有分组,所以它的后台无法读取消息数据;而8803保留了group分组属性,它可以获取到刚刚发送的四条消息。

以上是关于SpringCloud Hoxton——Stream服务消息驱动的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud Hoxton——Gateway服务网关

SpringCloud Hoxton——Gateway服务网关

SpringCloud Hoxton——Zookeeper服务注册与发现

SpringCloud Hoxton——Zookeeper服务注册与发现

SpringCloud Hoxton——Stream服务消息驱动

SpringCloud Hoxton——Stream服务消息驱动