Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)SpringBoot项目实现商品服务器端调用

Posted 蓝盒子itbluebox

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)SpringBoot项目实现商品服务器端调用相关的知识,希望对你有一定的参考价值。

在实际的企业开发中,消息中间件是至关重要的组件之一。

消息中间件主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。

一、概述

Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。
通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。


说明:最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费

1、核心概念

绑定器

Binder 绑定器是Spring Cloud Stream中一个非常重要的概念。

在没有绑定器这个概念的情况下,
我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,

这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,
当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。

通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。

当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。

甚至可以任意的改变中间件的类型而不需要修改一行代码。

Spring Cloud Stream支持各种binder实现

通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binder 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。

发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,
当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。

这里所提到的 Topic 主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。

在不同的消息中间件中, Topic 可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。

二、入门案例

1、 准备工作

案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装

2、工程搭建

(1)创建工程并引入坐标

  • 创建stream_producer


  • 创建stream_consumer


(2)引入依赖

  • stream_producer当中
 <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
  • stream_consumer当中
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>

3、消息生产者

(1)设置 stream_producer当中设置配置文件

server:
  port: 7001 #服务端口
spring:
  application:
    name: stream_producer #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        output:
          destination: itbluebox-default  #指定消息发送的目的地,在rabbitmq当中,发送一个itbluebox-default的exchange
      binders: #配置绑定器
        defaultRabbit:
          type: rabbit

(2)stream_producer当中编写启动类


package cn.itbluebox.stream;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
/*
    入门案例
        1、引入依赖
        2、配置application.yml文件
        3、发送消息的话,定义一个通道的接口,通道接口当中内置的 MessageChannel
            在SpringCloudStream当中内置了接口:Source
        4、@enableBinding: 绑定对应的通道
        5、发送消息的话,通过 MessageChannel 发送消息
            * 如果需要messageChannel  -->  通过绑定的内置接口获取
 */
@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication implements CommandLineRunner 
    @Autowired
    private MessageChannel output;
    @Override
    public void run(String... args) throws Exception 
        //发送消息
        //MessageBuilder  : 工具类,创建消息
        output.send(MessageBuilder.withPayload("hello 蓝盒子").build());
    
    public static void main(String[] args) 
        SpringApplication.run(ProducerApplication.class,args);
    


(3)运行测试



访问:RabbitMQ http://localhost:15672/#/exchanges


4、 消息消费者

(1)设置消费者配置文件,stream_consumer

创建application.yml

server:
  port: 7002 #服务端口
spring:
  application:
    name: rabbitmq_consumer #指定服务名
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        input: #内置的获取消息的通道,从itbluebox-default当中获取消息
          destination: itbluebox-default
      binders:
        defaultRabbit:
          type: rabbit

(2)编写测试类


/*
* 1、引入依赖
* 2、配置Application.yml
* 3、需要配置一个通道的接口
*   内置获取消息的通道接口  sink
* 4、绑定通道
* 5、配置一个监听方法 : 当程序从中间件获取数据之后,执行的业务逻辑的方法
*   需要在监听方法上配置@StreamListener
 */
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication 
    //监听binding中的消息
    @StreamListener(Sink.INPUT)
    public void input(String message)
        System.out.println("获取消息:"+message);
    
    public static void main(String[] args) 
        SpringApplication.run(ConsumerApplication.class,args);
    



运行测试

清空一下控制台

我们重新运行消息生产者

观察消息消费者

5、 对上述的代码进行优化

(1)消息生产者代码优化

创建对应的工具类

  • 创建MessageSender

/*
负责向中间件发送数据
 */
@Component
@EnableBinding(Source.class)
public class MessageSender 
    @Autowired
    private MessageChannel output;
    //发送消息
    public void send(Object obj)
        output.send(MessageBuilder.withPayload(obj).build());
    

  • 修改启动类
@SpringBootApplication
public class ProducerApplication  

    public static void main(String[] args) 
        SpringApplication.run(ProducerApplication.class,args);
    


  • 编写测试类


@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class ProducerTest 
    @Autowired
    private MessageSender messageSender;
    @Test
    public void testSend()
        messageSender.send("hello 工具类");
    

(2)消息消费者代码优化

  • 创建MessageListener

@Component
@EnableBinding(Sink.class)
public class MessageListener 

    //监听binding中的消息
    @StreamListener(Sink.INPUT)
    public void input(String message)
        System.out.println("获取消息:"+message);
    



  • 修改启动类
@SpringBootApplication
public class ConsumerApplication 
    public static void main(String[] args) 
        SpringApplication.run(ConsumerApplication.class,args);
    

(3)启动测试

启动stream_consumer

启动成功邮件清空一下控制台

消息的生产者stream_producer执行一下单元测试

运行成功

观察ConsumerApplication控制台

三、自定义消息通道

Spring Cloud Stream 内置了两种接口,
分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,
而在我们实际使用中,往往是需要定义各种输入输出流。

使用方法也很简单。

1、设置消息的生产者

(1)创建MyProcessor



/**
 * 自定义的消息通道
 */
public interface MyProcessor 

	/**
	 * 消息生产者的配置
	 */
	String MYOUTPUT = "myoutput";

	@Output("myoutput")
	MessageChannel myoutput();

	/**
	 * 消息消费者的配置
	 */
	String MYINPUT = "myinput";

	@Input("myinput")
	SubscribableChannel myinput();

(2)创建修改MessageSender

/*
负责向中间件发送数据
 */
@Component
@EnableBinding(MyProcessor.class)
public class MessageSender 
    @Autowired
    @Qualifier(value = "myoutput")
    private MessageChannel myoutput;
    //发送消息
    public void send(Object obj)
        myoutput.send(MessageBuilder.withPayload(obj).build());
    

(3)设置配置文件

        myoutput:
          destination: itbluebox-custom-default

2、设置消息的生产者

(1)创建MyProcessor




/**
 * 自定义的消息通道
 */
public interface MyProcessor 

	/**
	 * 消息生产者的配置
	 */
	String MYOUTPUT = "myoutput";

	@Output("myoutput")
	MessageChannel myoutput();

	/**
	 * 消息消费者的配置
	 */
	String MYINPUT = "myinput";

	@Input("myinput")
	SubscribableChannel myinput();



(2)创建MyProcessor


@Component
@EnableBinding(MyProcessor.class)
public class MessageListener 

    //监听binding中的消息
    @StreamListener(MyProcessor.MYINPUT)
    public void input(String message)
        System.out.println("获取消息:"+message);
    



(3)设置application.yml配置文件

        myinput:
          destination: itbluebox-custom-default

3、运行测试

重新启动消费者

运行成功并清空控制台

运行消息生产者

运行成功

观察ConsumerApplication

四、 消息分组

1、工程准备

  • 创建第二个消费者

  • 添加依赖
<dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>
  • 创建对应需要的启动类和其类,这里和之前的一样之前复制stream_consumer工程当中的内容即可

    修改一下启动类的名称
  • 设置配置文件,设置其端口号7003
server:
  port: 7003 #服务端口以上是关于Java之 Spring Cloud 微服务的 Spring Cloud Stream(第四个阶段)SpringBoot项目实现商品服务器端调用的主要内容,如果未能解决你的问题,请参考以下文章

Java之 Spring Cloud 微服务 Eureka (第一个阶段)SpringBoot项目实现商品服务器端是调用

Java之 Spring Cloud 微服务搭建 Feign组件(第二个阶段)SpringBoot项目实现商品服务器端是调用

Java之 Spring Cloud 微服务搭建(第一个阶段)SpringBoot项目实现商品服务器端是调用

Java之Spring Cloud概念介绍

Java之 Spring Cloud 微服务搭建 Consul(第一个阶段)SpringBoot项目实现商品服务器端是调用

Java之 Spring Cloud 微服务搭建Sentinel (第二个阶段)SpringBoot项目实现商品服务器端是调用