SpringCloud学习之Stream消息驱动默认通道

Posted xulijun137

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud学习之Stream消息驱动默认通道相关的知识,希望对你有一定的参考价值。

在实际开发过程中,服务与服务之间通信经常会使用到消息中间件,而以往使用了中间件比如RabbitMQ,那么该中间件和系统的耦合性就会非常高,如果我们要替换为Kafka那么变动会比较大,这时我们可以使用SpringCloudStream来整合我们的消息中间件,来降低系统和中间件的耦合性。

一、消息中间的几大应用场景

1、异步处理

比如用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就可以异步完成。因为下单付款是核心业务,发邮件和短信并不属于核心功能,并且可能耗时较长,所以针对这种业务场景可以选择先放到消息队列中,有其他服务来异步处理。

2、应用解耦:

假设公司有几个不同的系统,各系统在某些业务有联动关系,比如 A 系统完成了某些操作,需要触发 B 系统及 C 系统。如果 A 系统完成操作,主动调用 B 系统的接口或 C 系统的接口,可以完成功能,但是各个系统之间就产生了耦合。用消息中间件就可以完成解耦,当 A 系统完成操作将数据放进消息队列,B 和 C 系统去订阅消息就可以了。这样各系统只要约定好消息的格式就好了。

3、流量削峰

比如秒杀活动,一下子进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,所以针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,然后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。

4、日志处理

kafka 最开始就是专门为了处理日志产生的。

当碰到上面的几种情况的时候,就要考虑用消息队列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同样也是在使用 Spring Cloud ,那可以考虑下用 Spring Cloud Stream。

二、什么是SpringCloudStream

  官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
  应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
  通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQKafka

三、Stream 解决了什么问题?

  Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程

官网结构图

技术图片技术图片?

组成说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

 

以下实战代码是基于RabbitMQ的,不清楚如何安装RabbitMQ请查看我的另一篇文章最简单的RabbitMQ消息队列搭建(windows环境下安装),项目的三个模块如下:

技术图片技术图片?

 

(一)创建消息生产者【service-sender-stream-8089】

pom.xml文件

技术图片技术图片?

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xu</groupId>
    <artifactId>service-sender-stream-8089</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-sender-stream-8089</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.M3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.5.0</version>
        </dependency>
        <!-- swagger-ui -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.5.0</version>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>
技术图片

技术图片技术图片?

application.yml

server:
  port: 8089
spring:
  application:
    name: spring-cloud-stream-sender
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment: #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        output:       #指定输入通道对应的主题名
          destination: stream-demo       #exchange名称,交换模式默认是topic
          content-type: text/plain       #消息发送的格式,接收端不用指定格式,但是发送端要
技术图片

 

技术图片技术图片?

IMessageSender.java

package com.xu.serviceconsumer.interfaces;

public interface IMessageSender {

    void sendMessage(String message);
}
技术图片

技术图片技术图片?

MessageSenderImpl.java

package com.xu.serviceconsumer.services;

import com.xu.serviceconsumer.interfaces.IMessageSender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

/**
 * 这个注解给我们绑定消息通道的,Source是Stream给我们提供的,可以点进去看源码,
 * 可以看到output和input,这和配置文件中的output,input对应的。
 */
@EnableBinding(Source.class)
public class MessageSenderImpl implements IMessageSender {

    private final static Logger logger = LoggerFactory.getLogger(MessageSenderImpl.class);

    //注入Source
    @Autowired
    private Source source;

    @Override
    public void sendMessage(String message) {
        boolean sendStatus = source.output().send(MessageBuilder.withPayload(message).build());
        logger.info("发送数据:{},sendStatus: {}",message,sendStatus);

    }
}
技术图片

TestController.java

技术图片技术图片?

package com.xu.serviceconsumer.controller;

/**
 *
 */

import com.xu.serviceconsumer.interfaces.IMessageSender;
import io.swagger.annotations.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author mazhen
 *
 */
@Api(description = "提交给MQ")
@RestController
public class TestController {

    private final static Logger logger = LoggerFactory.getLogger(TestController.class);

    @Autowired
    private IMessageSender iMessageSender;

    @GetMapping("send")
    public void send() {
        iMessageSender.sendMessage("Ronnie O‘Sullivan");
    }

}

技术图片

附上swagger部分代码

技术图片技术图片?

SwaggerApp.java

package com.xu.serviceconsumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Configuration
@EnableSwagger2
public class SwaggerApp {
    @Bean
    public Docket createRestApi() {

        return new Docket(DocumentationType.SWAGGER_2)
                .apiInfo(apiInfo())
                .select()
                //为当前包路径
                .apis(RequestHandlerSelectors.basePackage("com.xu.serviceconsumer.controller"))
                .paths(PathSelectors.any())
                .build();
    }

    //构建 api文档的详细信息函数,注意这里的注解引用的是哪个
    private ApiInfo apiInfo() {
        return new ApiInfoBuilder()
                //页面标题
                .title("Spring Boot 使用 Swagger2 构建RESTful API")
                //创建人
                .contact(new Contact("Bryan", "http://blog.bianxh.top/", ""))
                //版本号
                .version("1.0")
                //描述
                .description("API 描述")
                .build();
    }
}
技术图片

项目的启动类如下,没有什么特殊的处理:

技术图片技术图片?

启动项目后,输入地址http://localhost:8089/swagger-ui.html打开swagger页面,然后点击try it out发送消息

技术图片技术图片?

在后台我们可以看到发送消息成功了

技术图片技术图片?

(二)消息消费者【service-consumer-stream-8090】

pom.xml文件如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xu</groupId>
    <artifactId>service-consumer-stream-8090</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-consumer-stream-8090</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Hoxton.M3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.48</version>
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>
技术图片

技术图片技术图片?

application.yml

server:
  port: 8090
spring:
  application:
    name: spring-cloud-stream-receiver-2
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:                                      #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: stream-demo   #exchange名称,交换模式默认是topic

技术图片

定义一个消息接收接口

技术图片技术图片?

ReceviceMsg.java

package com.xu.serviceconsumer.interfaces;

public interface ReceviceMsg {

    void receive(String message);
}


技术图片

ReceviceMsgImpl.java

package com.xu.serviceconsumer.services;

import com.xu.serviceconsumer.interfaces.ReceviceMsg;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(value = {Sink.class})
public class ReceviceMsgImpl implements ReceviceMsg {

    private static Logger logger = LoggerFactory.getLogger(ReceviceMsgImpl.class);

    @StreamListener(Sink.INPUT)
    @Override
    public void receive(String message) {

        logger.info("8090客户端接收消息:"+message);
    }
}

启动类如下,也没有任何特殊处理:
技术图片

技术图片技术图片?

启动项目,然后再次用上面的消息发送控制器TestController.java发送消息到消息队列,然后可以看到消息消费端也收到了队列的消息如下:

技术图片技术图片?

(三)消息消费者【service-consumer-stream-8091】

这是复制上面的消费者创建的另一个消息消费者,基本配置跟上面service-consumer-stream-8090基本一模一样,只是application.yml中的部分配置略有不同(主要是端口不同),如下:

技术图片技术图片?

application.yml

server:
  port: 8091
spring:
  application:
    name: spring-cloud-stream-receiver-1
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
          environment:                                      #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost
                username: guest
                password: guest
                virtual-host: /
      bindings:
        input:
          destination: stream-demo   #exchange名称,交换模式默认是topic


启动这个消息消费者之后,用上面的消息生产者TestController生产一个消息,然后可以看到这个消费者也接收到了消息队列的消息了
技术图片

技术图片技术图片?

 

这里我们就用默认的通道完成了消息的发送和接收,下一篇我们将说一下自定义通道实现消息的发送和接收,下次再见!

 
技术图片
 

以上是关于SpringCloud学习之Stream消息驱动默认通道的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud学习之SpringCloudStream&集成kafka

SpringCloud Stream消息驱动

SpringCloud学习—— SpringCloud Stream 消息驱动

springcloud-消息驱动Stream01

SpringCloud-2.0-周阳(15. 消息驱动 - SpringCloud Stream)

SpringCloud 消息驱动(Stream) 和 分布式链路跟踪(Sleuth)