十五Spring Cloud Stream 消息驱动
Posted 多加关注哟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了十五Spring Cloud Stream 消息驱动相关的知识,希望对你有一定的参考价值。
1、消息驱动概述
1)是什么?
一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
什么是SpringCloudStream
官方定义的SpringCloudStream是一个构建消息驱动微服务的框架
应用程序通过inputs或者outputs来与SpringCloudStream中的binder对象交互。
通过我们来配置binding(绑定),Spring Cloud Stream的binder对象负责与消息中间件交互。
所以,我们只需要弄清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动
SpringCloudStream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了 发布-订阅,消费组,分区的三个核心概念
目前支持 RabbitMQ和 kafka
2)Spring Cloud Stream 中文指导手册
http://m.wang1314.com/doc/webapp/topic/20971999.html
2、设计思想
1)标准的MQ
2)为什么需要Spring Cloud Stream?
RabbitMQ,kafka这些消息中间件的差异性给我们实际项目开发造成了一定的困扰,如果我们使用两个消息队列其中的一种,后期因为业务需求,我们想往另外一种消息队列进行迁移,这无疑是灾难性的,一大堆东西要推倒重做,因为它
和我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦的方式
3)Spring Cloud Stream 标准流程套路
Binder:方便的连接中间件,屏蔽差异
Channel:通道,队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,
通过Channel对队列进行配置
Source 和 Sink:从stream 发布消息就是输出,接受消息就是输入
4)编码api和常用注解
3、编码实现demo
新建三个工程
1 )构建消息驱动的生产者
项目结构
①、pom文件
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
②、application.xml
server: port: 8801 spring: application: name: clou-stream-provider cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 output: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: 127.0.0.1 # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
③、主启动类
@SpringBootApplication public class StreamMqMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMqMain8801.class,args); } }
④、业务类
@RestController public class SendMessageController { @Autowired private IMessageProvider iMessageProvider; @GetMapping("/sengMessage") public String sendMessage(){ return iMessageProvider.send(); } } ==================================================================== @EnableBinding(Source.class) //定义消息的推送管道 public class MessageProviderImpl implements IMessageProvider { @Resource private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("serial******************"+serial); return null; } }
⑤、查看RabbitMQ,交换机上出现了我们yml文件上定义的 studyExchange
访问http://localhost:8801/sengMessage
2)消息驱动之消费者
项目结构:
① 、pom文件
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--基础配置--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
②、application.yml
server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: 127.0.0.1 # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址
③、主启动类
@SpringBootApplication public class StreamMqMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMqMain8802.class, args); } }
④、业务类
@Component @EnableBinding(Sink.class) public class ReceiveMessageController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> message){ System.out.println("消费者1号-"+serverPort+"接收到消息:"+message.getPayload()); } }
⑤、测试:
访问:http://localhost:8801/sengMessage
4、分组消费与持久化
1)重复消费问题
生产者生产一条消息后,两个消费者 8802 和 8803 都收到了消息
会有什么后果:
8802服务和8803服务默认处于不同的组
2)分组
①、
微服务放置在同一个group中,就能保证消息只能被其中的一个应用消费一次,
同一个group内会发生竞争关系,只有其中一个可以消费
不同组的应用可以重复消费同一个消息
②、自定义分组:
将8802 和 8803分配到同一个组:
3)持久化
添加了分组,就自动实现了持久化,当启动了服务消费者,会自动从交换机中读取消息
当8802去掉group配置后,重启,不会读取交换机中缓存的消息
以上是关于十五Spring Cloud Stream 消息驱动的主要内容,如果未能解决你的问题,请参考以下文章