SpringCloudSpring Cloud Stream 消息驱动(二十三)

Posted H__D

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloudSpring Cloud Stream 消息驱动(二十三)相关的知识,希望对你有一定的参考价值。

Spring Cloud Stream介绍

  Spring Cloud Stream,官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架

  应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中的binder对象交互,通过配置binding(绑定),而 Spring Cloud Stream 的binder对象负载与消息中间件交互,所以,我们只需要高清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式,通过使用Spring Integration 来连接消息代理中间件以实现消息事件驱动。

  Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念

  目前仅支持RabbitMQ、Kafka

  官网:https://spring.io/projects/spring-cloud-stream

  中文手册:https://www.springcloud.cc/spring-cloud-greenwich.html#spring-cloud-stream-overview-introducing

Spring Cloud Stream处理架构

    

组成说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

Spring Cloud Stream标准流程

    

Spring Cloud Stream生产者

  环境准备

    使用Eureka作为注册中心,搭建参考:【SpringCloud】快速入门(一)

    使用RabbitMQ作为中间件,搭建参考:【RabbitMQ】 RabbitMQ安装

  1、新建一个Spring Cloud Stream生产者模块(springcloud-stream-rabbitmq-provider8801)

    

  2、编辑POM文件,引入stream依赖和eureka依赖

 1 <!-- spring cloud stream rabbit -->
 2 <dependency>
 3     <groupId>org.springframework.cloud</groupId>
 4     <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
 5 </dependency>
 6 
 7 <!-- eureka client -->
 8 <dependency>
 9     <groupId>org.springframework.cloud</groupId>
10     <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
11 </dependency> 

  完整pom文件如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <parent>
 6         <artifactId>test-springcloud</artifactId>
 7         <groupId>com.test</groupId>
 8         <version>1.0-SNAPSHOT</version>
 9     </parent>
10     <modelVersion>4.0.0</modelVersion>
11 
12     <artifactId>springcloud-stream-rabbitmq-provider8801</artifactId>
13 
14     <dependencies>
15 
16         <!-- spring cloud stream rabbit -->
17         <dependency>
18             <groupId>org.springframework.cloud</groupId>
19             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
20         </dependency>
21 
22         <!-- eureka client -->
23         <dependency>
24             <groupId>org.springframework.cloud</groupId>
25             <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
26         </dependency>
27 
28         <!-- spring boot -->
29         <dependency>
30             <groupId>org.springframework.boot</groupId>
31             <artifactId>spring-boot-starter-web</artifactId>
32         </dependency>
33         <dependency>
34             <groupId>org.springframework.boot</groupId>
35             <artifactId>spring-boot-starter-actuator</artifactId>
36         </dependency>
37 
38         <dependency>
39             <groupId>org.springframework.boot</groupId>
40             <artifactId>spring-boot-devtools</artifactId>
41             <scope>runtime</scope>
42             <optional>true</optional>
43         </dependency>
44 
45         <dependency>
46             <groupId>org.projectlombok</groupId>
47             <artifactId>lombok</artifactId>
48             <optional>true</optional>
49         </dependency>
50         <dependency>
51             <groupId>org.springframework.boot</groupId>
52             <artifactId>spring-boot-starter-test</artifactId>
53             <scope>test</scope>
54         </dependency>
55 
56     </dependencies>
57 </project>
pom.xml

  3、编辑配置文件,application.yml

 1 # 端口
 2 server:
 3   port: 8801
 4 
 5 spring:
 6   application:
 7     name: cloud-stream-provider
 8   cloud:
 9     stream:
10       binders:
11         # 表示定义的名称,用于binding的服务信息
12         defaultRabbit:
13           # 消息组件类型
14           type: rabbit
15           # 设置rabbitmq的相关配置的环境配置
16           environment:
17             spring:
18               rabbitmq:
19                 host: 127.0.0.1
20                 port: 5672
21                 username: guest
22                 password: guest
23       # 服务的整合处理
24       bindings:
25         # 这个名字是一个通道的名称
26         output:
27           # 表示要使用的Exchange 名称定义
28           destination: studyExchange
29           # 设置消息类型,本次为json,文本则设置"text/plain"
30           content-type: application/json
31           # 设置要绑定的消息服务的具体设置
32           binder: defaultRabbit
33 
34 eureka:
35   client:
36     service-url:
37       defaultZone: http://localhost:8761/eureka

  4、编写主启动方法类

1 @SpringBootApplication
2 public class StreamMQMain8801 {
3 
4     public static void main(String[] args) {
5         SpringApplication.run(StreamMQMain8801.class, args);
6     }
7 }

  5、编辑业务接口,IMessageProvider用来定义发送消息方法

1 public interface IMessageProvider {
2     public  String send();
3 }

  6、编辑业务接口实现,MessageProviderImpl

 1 import org.springframework.messaging.support.MessageBuilder;
 2 
 3 import java.util.UUID;
 4 
 5 // 定义消息的推送管道
 6 @EnableBinding(Source.class)
 7 public class MessageProviderImpl implements IMessageProvider {
 8 
 9     @Autowired
10     // 消息发送通道
11     private MessageChannel output;
12 
13     public String send() {
14         String serial = UUID.randomUUID().toString();
15         output.send(MessageBuilder.withPayload(serial).build());
16         System.out.println("====serial: " + serial);
17         return null;
18     }
19 }

   7、编写controller

 1 @RestController
 2 public class SendMessageController {
 3 
 4     @Autowired
 5     private IMessageProvider messageProvider;
 6 
 7     @RequestMapping(value = "/sendMessage")
 8     public String sendMessage(){
 9         return messageProvider.send();
10     }
11 }

   8、测试

    1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目

    2)查看RabbitMQ的web界面,在Exchang模块中,可以看到里面新增了一个名为studyExchange的交互器,类型为topic

      

    3)新建一个queue,名为:test.news,且绑定到studyExchange。

      

    4)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台

      可以看到test.news此queue,已经收到消息

Spring Cloud Stream消费者

  1、新建一个Spring Cloud Stream消费者模块(springcloud-stream-rabbitmq-consumer8802) 

    

  2、编辑POM文件,引入stream依赖和eureka依赖,同上

    完整pom文件如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <parent>
 6         <artifactId>test-springcloud</artifactId>
 7         <groupId>com.test</groupId>
 8         <version>1.0-SNAPSHOT</version>
 9     </parent>
10     <modelVersion>4.0.0</modelVersion>
11 
12     <artifactId>springcloud-stream-rabbitmq-consumer8802</artifactId>
13 
14     <dependencies>
15 
16         <!-- spring cloud stream rabbit -->
17         <dependency>
18             <groupId>org.springframework.cloud</groupId>
19             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
20         </dependency>
21 
22         <!-- eureka client -->
23         <dependency>
24             <groupId>org.springframework.cloud</groupId>
25             <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
26         </dependency>
27 
28         <!-- spring boot -->
29         <dependency>
30             <groupId>org.springframework.boot</groupId>
31             <artifactId>spring-boot-starter-web</artifactId>
32         </dependency>
33         <dependency>
34             <groupId>org.springframework.boot</groupId>
35             <artifactId>spring-boot-starter-actuator</artifactId>
36         </dependency>
37 
38         <dependency>
39             <groupId>org.springframework.boot</groupId>
40             <artifactId>spring-boot-devtools</artifactId>
41             <scope>runtime</scope>
42             <optional>true</optional>
43         </dependency>
44 
45         <dependency>
46             <groupId>org.projectlombok</groupId>
47             <artifactId>lombok</artifactId>
48             <optional>true</optional>
49         </dependency>
50         <dependency>
51             <groupId>org.springframework.boot</groupId>
52             <artifactId>spring-boot-starter-test</artifactId>
53             <scope>test</scope>
54         </dependency>
55 
56     </dependencies>
57 </project>
pom.xml

  3、便捷配置文件application.yml

 1 # 端口
 2 server:
 3   port: 8802
 4 
 5 spring:
 6   application:
 7     name: cloud-stream-consumer
 8   cloud:
 9     stream:
10       binders:
11         # 表示定义的名称,用于binding的服务信息
12         defaultRabbit:
13           # 消息组件类型
14           type: rabbit
15           # 设置rabbitmq的相关配置的环境配置
16           environment:
17             spring:
18               rabbitmq:
19                 host: 127.0.0.1
20                 port: 5672
21                 username: guest
22                 password: guest
23       # 服务的整合处理
24       bindings:
25         # 这个名字是一个通道的名称
26         input:
27           # 表示要使用的Exchange 名称定义
28           destination: studyExchange
29           # 设置消息类型,本次为json,文本则设置"text/plain"
30           content-type: application/json
31           # 设置要绑定的消息服务的具体设置
32           binder: defaultRabbit
33           # 消费分组
34 #          group: testA
35 
36 eureka:
37   client:
38     service-url:
39       defaultZone: http://localhost:8761/eureka

  4、编写主启动方法类

1 @SpringBootApplication
2 public class StreamMQMain8802 {
3     public static void main(String[] args) {
4         SpringApplication.run(StreamMQMain8802.class, args);
5     }
6 }

  5、编辑消息监听组件

 1 @Component
 2 @EnableBinding(Sink.class)
 3 public class ReceiveMessageListenerController {
 4 
 5     @Value("${server.port}")
 6     private String serverPort;
 7 
 8     @StreamListener(Sink.INPUT)
 9     public void input(Message<String> message){
10         System.out.println("消费者" + serverPort + ",消费信息:" + message.getPayload());
11     }
12 }

  6、测试

    1)启动Eureka注册中心,启动RabbitMQ消息中间件,启动Stream生产者项目,以及启动Stream消费者项目

    2)查看RaibbitMQ的Web后台,发现Queue中多了队列,即Stream消费者项目监听的队列,且此队列绑定了studyExchange

      

    3)访问地址:http://localhost:8801/sendMessage,发送消息,查看RabbitMQ后台

      可以看到test.news此queue,已经收到消息,且Stream消费者项目也收到消息,并处理了

      

 

 

  

以上是关于SpringCloudSpring Cloud Stream 消息驱动(二十三)的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloudSpring Cloud Config 配置中心(二十)

SpringCloudSpring Cloud Config 客户端(二十一)

SpringCloudSpring Cloud Alibaba 及 Nacos介绍(二十六)

SpringCloudSpring Cloud Alibaba 之 Nacos配置中心(二十八)

SpringCloudSpring Cloud Bus 服务总线(二十二)

SpringCloudSpring Cloud Alibaba 之 Sentinel熔断降级(三十一)