十五Spring Cloud Stream 消息驱动

Posted 多加关注哟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了十五Spring Cloud Stream 消息驱动相关的知识,希望对你有一定的参考价值。

clipboard

1、消息驱动概述

1)是什么?

一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型


什么是SpringCloudStream

官方定义的SpringCloudStream是一个构建消息驱动微服务的框架


应用程序通过inputs或者outputs来与SpringCloudStream中的binder对象交互。

通过我们来配置binding(绑定),Spring Cloud Stream的binder对象负责与消息中间件交互。

所以,我们只需要弄清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式


通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动

SpringCloudStream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了 发布-订阅,消费组,分区的三个核心概念

目前支持 RabbitMQ和 kafka


2)Spring Cloud Stream 中文指导手册

http://m.wang1314.com/doc/webapp/topic/20971999.html


2、设计思想

1)标准的MQ

clipboard


2)为什么需要Spring Cloud Stream?

RabbitMQ,kafka这些消息中间件的差异性给我们实际项目开发造成了一定的困扰,如果我们使用两个消息队列其中的一种,后期因为业务需求,我们想往另外一种消息队列进行迁移,这无疑是灾难性的,一大堆东西要推倒重做,因为它

和我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦的方式

clipboard


3)Spring Cloud Stream 标准流程套路

clipboard

Binder:方便的连接中间件,屏蔽差异

Channel:通道,队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,

通过Channel对队列进行配置

Source 和 Sink:从stream 发布消息就是输出,接受消息就是输入


4)编码api和常用注解

clipboard


3、编码实现demo

新建三个工程

clipboard

1 )构建消息驱动的生产者

项目结构

clipboard

①、pom文件

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


②、application.xml

server:
  port: 8801
spring:
  application:
    name: clou-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: 127.0.0.1  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址


③、主启动类

@SpringBootApplication
public class StreamMqMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMqMain8801.class,args);
    }
}


④、业务类

@RestController
public class SendMessageController {

    @Autowired
    private IMessageProvider iMessageProvider;

    @GetMapping("/sengMessage")
    public String sendMessage(){
        return iMessageProvider.send();
    }
}

====================================================================
@EnableBinding(Source.class)  //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {

    @Resource
    private MessageChannel output;

    @Override
    public String send() {

        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("serial******************"+serial);
        return null;
    }
}


⑤、查看RabbitMQ,交换机上出现了我们yml文件上定义的 studyExchange

clipboard

访问http://localhost:8801/sengMessage

clipboard


2)消息驱动之消费者

项目结构:

clipboard

① 、pom文件

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>


②、application.yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置



eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: 127.0.0.1  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址


③、主启动类

@SpringBootApplication
public class StreamMqMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMqMain8802.class, args);
    }
}


④、业务类

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1号-"+serverPort+"接收到消息:"+message.getPayload());
    }
}


⑤、测试:

访问:http://localhost:8801/sengMessage

clipboard

clipboard


4、分组消费与持久化

1)重复消费问题

生产者生产一条消息后,两个消费者 8802 和 8803 都收到了消息

clipboard

clipboard


会有什么后果:

clipboard


8802服务和8803服务默认处于不同的组

clipboard


2)分组

①、

微服务放置在同一个group中,就能保证消息只能被其中的一个应用消费一次,

同一个group内会发生竞争关系,只有其中一个可以消费

不同组的应用可以重复消费同一个消息


②、自定义分组:

clipboard

将8802 和 8803分配到同一个组:

clipboard

clipboard

clipboard


3)持久化

添加了分组,就自动实现了持久化,当启动了服务消费者,会自动从交换机中读取消息

当8802去掉group配置后,重启,不会读取交换机中缓存的消息

以上是关于十五Spring Cloud Stream 消息驱动的主要内容,如果未能解决你的问题,请参考以下文章

spring-cloud-stream 请求-回复消息模式

Spring Cloud 系列之 Stream 消息驱动

Spring cloud stream消息分区

Spring cloud stream消息分组

spring cloud 2.x版本 Spring Cloud Stream消息驱动组件基础教程(kafaka篇)

Spring Cloud Stream消息JSON转换无效