SpringCloud H版 Stream 消息驱动讲解
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud H版 Stream 消息驱动讲解相关的知识,希望对你有一定的参考价值。
一、SpringCloud Stream
上篇文章,我们讲解了config + bus 实现所有服务配制自动刷新的功能,本篇文章继续讲解下消息驱动Stream
上篇文章地址:https://blog.csdn.net/qq_43692950/article/details/122025347
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs
或者 outputs
来与 Spring Cloud Stream中binder
对象交互。
通过我们配置来binding
(绑定) ,而 Spring Cloud Stream 的 binder
对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ
、Kafka
。
为什么使用stream
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ
有exchange
,kafka
有Topic
和Partitions
分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
stream 中的注解
我们stream 基于 RabbitMQ 上使用,所以在搭建之前确保已经安装好RabbitMQ 。
二、搭建消息提供者
创建SpringBoot module 引入下面依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
配制文件修改:
server:
port: 7071
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 192.168.40.1
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: msgExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
创建服务提供service
接口:
public interface IMessageProvider
public String send(String msg);
实现:
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider
@Resource
private MessageChannel output; // 消息发送管道
@Override
public String send(String msg)
return output.send(MessageBuilder.withPayload(msg).build()) ? "发送成功" : "发送失败!";
创建测试接口:
@RestController
public class SendMessageController
@Resource
private IMessageProvider messageProvider;
@GetMapping("/sendMessage/msg")
public ResponseTemplate sendMessage(@PathVariable String msg)
String result = messageProvider.send(msg);
return ResSuccessTemplate.builder().data(result).build();
启动消息提供者服务,查看RabbitMQ的控制台:
我们写的交换机已经注册到RabbitMQ中了。
下面继续搭建消息的消费者。
三、搭建消息消费者
再创建一个SpringBoot 的 module,同样引入下面依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
修改配制文件:
server:
port: 6062
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: www.bixc.net
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: msgExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
注意:这里的destination 要和 提供者的相同,才表示链接同一个交换机。
编写消息监听:
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
@Value("$server.port")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
log.info("接受到的消息: , 当前 port: ", message.getPayload(), serverPort);
启动服务消费者,下面开始测试:
浏览器访问:http://localhost:7071/sendMessage/哈哈哈
已经发送成功了,再看下消费者端的打印日志:
同样也接受到了消息。
现在我们修改下消费者端的端口,再次启动一个相同的消费者,端口为6061。
再次发送消息,看下消费者端的请求:
两个消费者都接受到了消息,如果只想一个消费者接收消息,只需放在同一个组下面接口,上面配制中没有指定组,会自动生成一个组:
那现在我们加上分组,修改消费端的配制:
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: www.bixc.net
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: msgExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: groupA
添加上group: groupA
指定了组名,再来看RabbitMQ中的信息:
下面再发送下消息:
只有一个消费者收到了消息。
这里还有一个注意的是,如果不配置分组,每次启动都会随机分配一个分组,如果接受消息突然中断服务,再次启动就会造成消息的丢失,所以一定要指定组信息。
喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!
以上是关于SpringCloud H版 Stream 消息驱动讲解的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud学习—— SpringCloud Stream 消息驱动