SpringCloudSpring Cloud Stream 消息驱动(二十三)
Posted H__D
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloudSpring Cloud Stream 消息驱动(二十三)相关的知识,希望对你有一定的参考价值。
Spring Cloud Stream介绍
Spring Cloud Stream,官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中的binder对象交互,通过配置binding(绑定),而 Spring Cloud Stream 的binder对象负载与消息中间件交互,所以,我们只需要高清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式,通过使用Spring Integration 来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念
目前仅支持RabbitMQ、Kafka
官网:https://spring.io/projects/spring-cloud-stream
中文手册:https://www.springcloud.cc/spring-cloud-greenwich.html#spring-cloud-stream-overview-introducing
Spring Cloud Stream处理架构
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
Spring Cloud Stream标准流程
Spring Cloud Stream生产者
环境准备
使用Eureka作为注册中心,搭建参考:【SpringCloud】快速入门(一)
使用RabbitMQ作为中间件,搭建参考:【RabbitMQ】 RabbitMQ安装
1、新建一个Spring Cloud Stream生产者模块(springcloud-stream-rabbitmq-provider8801)
2、编辑POM文件,引入stream依赖和eureka依赖
1 <!-- spring cloud stream rabbit --> 2 <dependency> 3 <groupId>org.springframework.cloud</groupId> 4 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 5 </dependency> 6 7 <!-- eureka client --> 8 <dependency> 9 <groupId>org.springframework.cloud</groupId> 10 <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> 11 </dependency>
完整pom文件如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <parent> 6 <artifactId>test-springcloud</artifactId> 7 <groupId>com.test</groupId> 8 <version>1.0-SNAPSHOT</version> 9 </parent> 10 <modelVersion>4.0.0</modelVersion> 11 12 <artifactId>springcloud-stream-rabbitmq-provider8801</artifactId> 13 14 <dependencies> 15 16 <!-- spring cloud stream rabbit --> 17 <dependency> 18 <groupId>org.springframework.cloud</groupId> 19 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 20 </dependency> 21 22 <!-- eureka client --> 23 <dependency> 24 <groupId>org.springframework.cloud</groupId> 25 <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> 26 </dependency> 27 28 <!-- spring boot --> 29 <dependency> 30 <groupId>org.springframework.boot</groupId> 31 <artifactId>spring-boot-starter-web</artifactId> 32 </dependency> 33 <dependency> 34 <groupId>org.springframework.boot</groupId> 35 <artifactId>spring-boot-starter-actuator</artifactId> 36 </dependency> 37 38 <dependency> 39 <groupId>org.springframework.boot</groupId> 40 <artifactId>spring-boot-devtools</artifactId> 41 <scope>runtime</scope> 42 <optional>true</optional> 43 </dependency> 44 45 <dependency> 46 <groupId>org.projectlombok</groupId> 47 <artifactId>lombok</artifactId> 48 <optional>true</optional> 49 </dependency> 50 <dependency> 51 <groupId>org.springframework.boot</groupId> 52 <artifactId>spring-boot-starter-test</artifactId> 53 <scope>test</scope> 54 </dependency> 55 56 </dependencies> 57 </project>
3、编辑配置文件,application.yml
1 # 端口 2 server: 3 port: 8801 4 5 spring: 6 application: 7 name: cloud-stream-provider 8 cloud: 9 stream: 10 binders: 11 # 表示定义的名称,用于binding的服务信息 12 defaultRabbit: 13 # 消息组件类型 14 type: rabbit 15 # 设置rabbitmq的相关配置的环境配置 16 environment: 17 spring: 18 rabbitmq: 19 host: 127.0.0.1 20 port: 5672 21 username: guest 22 password: guest 23 # 服务的整合处理 24 bindings: 25 # 这个名字是一个通道的名称 26 output: 27 # 表示要使用的Exchange 名称定义 28 destination: studyExchange 29 # 设置消息类型,本次为json,文本则设置"text/plain" 30 content-type: application/json 31 # 设置要绑定的消息服务的具体设置 32 binder: defaultRabbit 33 34 eureka: 35 client: 36 service-url: 37 defaultZone: http://localhost:8761/eureka
4、编写主启动方法类
1 @SpringBootApplication 2 public class StreamMQMain8801 { 3 4 public static void main(String[] args) { 5 SpringApplication.run(StreamMQMain8801.class, args); 6 } 7 }
5、编辑业务接口,IMessageProvider用来定义发送消息方法
1 public interface IMessageProvider { 2 public String send(); 3 }
6、编辑业务接口实现,MessageProviderImpl
1 import org.springframework.messaging.support.MessageBuilder; 2 3 import java.util.UUID; 4 5 // 定义消息的推送管道 6 @EnableBinding(Source.class) 7 public class MessageProviderImpl implements IMessageProvider { 8 9 @Autowired 10 // 消息发送通道 11 private MessageChannel output; 12 13 public String send() { 14 String serial = UUID.randomUUID().toString(); 15 output.send(MessageBuilder.withPayload(serial).build()); 16 System.out.println("====serial: " + serial); 17 return null; 18 } 19 }
7、编写controller
1 @RestController 2 public class SendMessageController { 3 4 @Autowired 5 private IMessageProvider messageProvider; 6 7 @RequestMapping(value = "/sendMessage") 8 public String sendMessage(){ 9 return messageProvider.send(); 10 } 11 }
8、测试
1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目
2)查看RabbitMQ的web界面,在Exchang模块中,可以看到里面新增了一个名为studyExchange的交互器,类型为topic
3)新建一个queue,名为:test.news,且绑定到studyExchange。
4)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台
可以看到test.news此queue,已经收到消息
Spring Cloud Stream消费者
1、新建一个Spring Cloud Stream消费者模块(springcloud-stream-rabbitmq-consumer8802)
2、编辑POM文件,引入stream依赖和eureka依赖,同上
完整pom文件如下:
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <parent> 6 <artifactId>test-springcloud</artifactId> 7 <groupId>com.test</groupId> 8 <version>1.0-SNAPSHOT</version> 9 </parent> 10 <modelVersion>4.0.0</modelVersion> 11 12 <artifactId>springcloud-stream-rabbitmq-consumer8802</artifactId> 13 14 <dependencies> 15 16 <!-- spring cloud stream rabbit --> 17 <dependency> 18 <groupId>org.springframework.cloud</groupId> 19 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 20 </dependency> 21 22 <!-- eureka client --> 23 <dependency> 24 <groupId>org.springframework.cloud</groupId> 25 <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> 26 </dependency> 27 28 <!-- spring boot --> 29 <dependency> 30 <groupId>org.springframework.boot</groupId> 31 <artifactId>spring-boot-starter-web</artifactId> 32 </dependency> 33 <dependency> 34 <groupId>org.springframework.boot</groupId> 35 <artifactId>spring-boot-starter-actuator</artifactId> 36 </dependency> 37 38 <dependency> 39 <groupId>org.springframework.boot</groupId> 40 <artifactId>spring-boot-devtools</artifactId> 41 <scope>runtime</scope> 42 <optional>true</optional> 43 </dependency> 44 45 <dependency> 46 <groupId>org.projectlombok</groupId> 47 <artifactId>lombok</artifactId> 48 <optional>true</optional> 49 </dependency> 50 <dependency> 51 <groupId>org.springframework.boot</groupId> 52 <artifactId>spring-boot-starter-test</artifactId> 53 <scope>test</scope> 54 </dependency> 55 56 </dependencies> 57 </project>
3、便捷配置文件application.yml
1 # 端口 2 server: 3 port: 8802 4 5 spring: 6 application: 7 name: cloud-stream-consumer 8 cloud: 9 stream: 10 binders: 11 # 表示定义的名称,用于binding的服务信息 12 defaultRabbit: 13 # 消息组件类型 14 type: rabbit 15 # 设置rabbitmq的相关配置的环境配置 16 environment: 17 spring: 18 rabbitmq: 19 host: 127.0.0.1 20 port: 5672 21 username: guest 22 password: guest 23 # 服务的整合处理 24 bindings: 25 # 这个名字是一个通道的名称 26 input: 27 # 表示要使用的Exchange 名称定义 28 destination: studyExchange 29 # 设置消息类型,本次为json,文本则设置"text/plain" 30 content-type: application/json 31 # 设置要绑定的消息服务的具体设置 32 binder: defaultRabbit 33 # 消费分组 34 # group: testA 35 36 eureka: 37 client: 38 service-url: 39 defaultZone: http://localhost:8761/eureka
4、编写主启动方法类
1 @SpringBootApplication 2 public class StreamMQMain8802 { 3 public static void main(String[] args) { 4 SpringApplication.run(StreamMQMain8802.class, args); 5 } 6 }
5、编辑消息监听组件
1 @Component 2 @EnableBinding(Sink.class) 3 public class ReceiveMessageListenerController { 4 5 @Value("${server.port}") 6 private String serverPort; 7 8 @StreamListener(Sink.INPUT) 9 public void input(Message<String> message){ 10 System.out.println("消费者" + serverPort + ",消费信息:" + message.getPayload()); 11 } 12 }
6、测试
1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目,以及启动Stream消费者项目
2)查看RaibbitMQ的Web后台,发现Queue中多了队列,即Stream消费者项目监听的队列,且此队列绑定了studyExchange
3)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台
可以看到test.news此queue,已经收到消息,且Stream消费者项目也收到消息,并处理了
以上是关于SpringCloudSpring Cloud Stream 消息驱动(二十三)的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloudSpring Cloud Config 配置中心(二十)
SpringCloudSpring Cloud Config 客户端(二十一)
SpringCloudSpring Cloud Alibaba 及 Nacos介绍(二十六)
SpringCloudSpring Cloud Alibaba 之 Nacos配置中心(二十八)