一体化MQ解决方案-SpringCloudStream

Posted roykingw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一体化MQ解决方案-SpringCloudStream相关的知识,希望对你有一定的参考价值。

文章目录

一体化MQ解决方案-SpringCloudStream 深度解析
楼兰:你的神秘java宝藏
老友聊IT,老王多指教

一、MQ的水有多深

​ MQ:也就是MessageQueue,消息对列。对列,是一种具有FIFO先进先出特性的数据结构。在互联网中,基于MQ的数据结构,人们封装出了非常多各具特色的分布式消息中间件产品,由此构建出了非常多很有特色的应用场景。例如像我们经常用的QQ和微信,就是很典型的消息中间件。消息可以由生产者发送到MQ中进行排队,然后消息的消费者可以对消息按照顺序进行处理。而在网络应用层面,消息中间件更是实现分布式消息驱动场景的利器。

​ 在当今互联网中,MQ已经不是一个神秘的东西了,关于MQ的作用主要有三个:

  • 异步

    例子:快递员送快递,早期直接送到客户家签收,这样效率会很低。引入菜鸟驿站后,快递员只需要将快递放到菜鸟驿站,就可以继续派送其他快递去了。客户可以按照自己的时间安排去菜鸟驿站取快递。

    作用:异步能提高系统的响应速度、吞吐量。

  • 解耦

    例子:《Thinking in JAVA》很经典,但是都是英文,我们看不懂。这时引入编辑社,可以将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。

    作用:1、服务之间接口,可以减少服务之间的影响,提升应用的稳定性以及可扩展性。

    2、接口后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

  • 削峰

    例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以经常会出现水灾。引入三峡大坝后,可以把高峰期的水储存起来,下游慢慢排水。

    所用:让应用可以以稳定的系统资源应对突然的流量冲击。

​ 目前主流的MQ产品有Kafka、RabbitMQ、RocketMQ三个。我们对三个产品做下简单的比较,重点要理解他们的适用场景。

​ 这三种主流产品都非常成熟,也都有各自不同的一些特性功能,适用于不同的业务场景。但是,这会不会给你带来困惑?同样都是MQ,不同场景下要使用不同的产品,而每个产品都有自己的客户端。那岂不是在细分场景下要用好MQ,就需要一套庞大的技术体系?

​ 有没有一套框架可以像用JDBC去连接各种数据库一样,用一套快速简单的标准对接各种各样的MQ产品呢?这就是我们这次要聊到的SpringCloudStream框架(后续简称SCS)。

二、一套框架,盲拆所有MQ产品

​ SpringCloudStream是SpringCloud家族的一个重要组件。官网地址: https://spring.io/projects/spring-cloud-stream 他是一个基于共享MQ系统,用于构建高度可伸缩的,事件驱动的问服务应用的框架。简单粗暴的理解就是,一个统一的框架标准连接所有的MQ产品。在官网可以看到SCS已经集成了非常多的MQ产品。

​ 我估计你跟我一样,这么多MQ产品,有些连听都没听说过,更别说在系统中用上了。但是有了SCS后,这些就都不用担心了。SCS是如何帮我们连接所有MQ产品的呢?接下来我们就用常见的Kafka,RabbitMQ和RocketMQ为例,开始一段舒心的SCS之旅。

注:以下部分,假定你对于Kafka、RabbitMQ和RocketMQ已经有了初步了解,并且搭建有本地服务。

各组件的版本如下:RabbitMQ版本:3.9.5, RocketMQ版本:4.9.1, Kafka版本:3.0.1。

2.1:一分钟快速搭建SCS连接RabbitMQ环境。

​ SCS是基于SpringBoot构建的,首先我们创建一个基于Maven的SpringBoot项目。按照Maven应用三板斧,快速搭建基础环境。

注:这里先快速把开发环境构建起来,不用理解每一步的具体含义。后面会做深入分析

1、pom.xml 依赖配置

​ 按照最常用的开发方式引入SpringBoot以及SpringCloud的集成版本控制,并且引入RabbitMQ的SCS集成依赖包。

	<!-- 引入SpringBoot与SpringCloud的集成版本 -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.3.4.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Hoxton.SR6</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <!-- 引入RabbitMQ的SCS集成依赖 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <!-- artifactId>spring-cloud-starter-stream-rabbit</artifactId -->
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

2、启动类 引入核心注解

​ 注意在启动类中引入@EnableBinding注解。

@SpringBootApplication
@EnableBinding(Source.class, Sink.class)
public class SCSApplication 

    public static void main(String[] args) 
        SpringApplication.run(SCSApplication.class);
    

3、application.properties 配置文件

​ 在配置文件中,只需要简单的目标配置,甚至都不需要指定MQ的服务地址。

spring.cloud.stream.bindings.output.destination=scstreamExchange

spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain

​ 这样,基础的开发框架就搭建成功了。接下来搭建消息发送者和消息生产者。

4、创建一个发送消息的Controller

@RestController
public class SendMessageController 
    @Autowired
    private Source source;

    @GetMapping("/send")
    public Object send(String message) 
		MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message);
        source.output().send(messageBuilder.build());
        return "message sended : "+message;
    

5、声明一个消息消费者

@Component
public class MessageReceiver 

    @StreamListener(Sink.INPUT)
    public void process(Object message) 
        System.out.println("received message : " + message);
    

​ 到这里,整个MQ应用环境就搭建完成了。接下来可以启动应用,然后访问测试接口发送消息。http://localhost:8080/send?message=123 。

当然,要保证本地的RabbitMQ服务正常启动了。

如果你对RabbitMQ有点基础的了解,也可以去RabbitMQ的管理控制台确认一下,这个简单的应用程序是如何实现RabbitMQ的消息收发的。

​ 实际上他的实现过程就是在本地的RabbitMQ中创建了一个scstreamExchange的交换机。应用中的Source组件就是往这个交换机中发送消息。然后,在scstreamExchange这个交换机上,也声明并绑定了一个队列,scstreamExchange.stream队列。应用中的消息监听器Sink组件,就是消费这个队列上的消息。

​ 当然,如果你对RabbitMQ一点也不了解,那也没关系,至少通过这个不超过十行代码的应用,就可以完成与RabbitMQ的消息交互。你只需要知道,这个应用当中的Source消息生产者和Sink消息消费者,是解耦的。在实际应用中,可以将他们拆分到不同的应用当中,实现消息的异步消费。这就够了。

你不需要理解与RabbitMQ交互的细节,甚至连RabbitMQ服务地址,SCS中都提供了默认的配置。你所要关注的,只有业务。

2.2: 一秒切换对接kafka

​ 如果只是简化与RabbitMQ的交互过程,那么还体现不出SCS的强大。如果需要将上面的简单应用改为对接Kafka,那么整个业务代码不需要做任何改动,只需要修改pom.xml中的maven依赖,将RabbitMQ的SCS依赖替换成Kafka的SCS依赖即可。

		<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

​ 接下来,重新启动示例应用,再次访问发送消息接口。消息就会改由Kafka来转发。

​ 同样,如果你熟悉Kafka产品,也能看到实际上SCS框架在Kafka上申请了一个scstreamExchange的Topic,并对这个Topic声明了一个消费者。

2.3:一秒切换对接RocketMQ

​ 接下来对接RocketMQ就简单了,同样是修改pom.xml中的maven依赖,代码不需要做任何改动。

	<dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2021.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.9.1</version>
        </dependency>

从这个maven依赖中就能看出, kafka和RabbitMQ是亲儿子,RocketMQ是厂商自己拉扯大的。所以客户端版本匹配需要注意下。

​ 使用Maven重新编译后,就可以重新访问发送消息的接口。这次消息就会通过RocketMQ发送给后端。

​ 同样如果你了解RocketMQ产品,那么可以去RocketMQ的Console控制台上查看他的实现机制。自动创建了一个scstreamExchange的Topic,并声明了一个stream消费者组。

​ 在这个简单的实验过程中可以看到,SCS框架以一种标准化的方式提供了与MQ对接的功能,而与具体的MQ产品交互的细节,都被隐藏在框架之中。在这个过程中,如果你对相关MQ产品比较熟悉,那么可以大致揣测出他的实现方式。但是,就算你对这些MQ产品一无所知,通过SCS框架,也完全不影响基础的编程开发。这正是SCS框架的强大之处。

三、详解SCS三神器

​ SCS框架抽象出了一个非常简洁的MQ模型,来对接各种MQ产品。这个模型其实非常简单,只包含以下三个组件:

  • Destination Binder: 负责集成外部消息系统的组件。
  • Destination Binding: 由Binder创建,负责沟通外部消息系统、消息发送者和消息消费者的桥梁。
  • Message:消息发送者与消息消费者沟通的简单数据结构。

​ 可以看到,这个模型非常简单,使用时也非常方便。但是简单,也意味着SCS中的这些概念模型,与实际MQ产品中的概念是有比较大的区别的。例如,在RabbitMQ中有Exchange交换机、Queue队列等这些概念,但是在SCS中并没有对应的概念。在使用具体产品时,需要非常注意这些模型之间的对应关系。

​ 接下来会以RabbitMQ作为示例,分别了解这些核心概念。

3.1 Binder

​ SCS通过Binder定义一个外部消息服务器。默认情况下,SCS会使用对应的SpringBoot插件来构建Binder。所以,对于RabbitMQ来说,SCS默认就支持spring-boot-starter-amqp组件中提供的RabbitMQ的服务配置信息。这些配置信息,在application.properties配置文件中,都以spring.rabbitmq开头。而这一组配置,都是有默认值的。

spring.rabbitmq.host=local
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

​ 如果你熟悉RabbitMQ就会知道,这其实就是一组默认的RabbitMQ服务。这也就是为什么在之前的示例中,都没有指定服务器地址,但是SCS也能正常工作的原因。

对于Kafka和RocketMQ,也同样会使用默认的服务。

​ 另外,在SCS中,也可以支持按照自己的方式配置多个Binder访问不同的外部消息服务器。这些配置的方式都是通过spring.cloud.stream.binders.[bindername].environment.[props]=[value]的方式进行配置。另外,如果配置了多个Binder,也可以通过spring.cloud.stream.default-binder属性指定默认的Binder。例如:

spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.host=localhost
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.password=guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.virtual-host=/
spring.cloud.stream.default-binder=testbinder

​ 通过这个方式,就配置了一个名为testbinder的Binder,并且将他指定成了默认的Binder。environment后面的这些props就是跟具体MQ产品相关的属性了。

3.2 Binding

​ Binding是SCS中实际进行消息交互的桥梁。在SCS中,就是通过Binding和Binder建立绑定关系,然后客户端就只需要通过Binding进行实现的消息收发了。

​ 在SCS框架中,配置Binding需要首先对他进行声明。当前版本的SCS框架中,声明Binding的方式是在应用中引入@EnableBinding注解。引入这个注解后,应用会向Spring容器中注入一个Binding接口的实例对象。在SCS中,默认提供了Source、Sink、Process三个接口对象,分别代表消息的生产者、消费者和中间处理者。这三个对象都是简单的接口,可以直接拿来使用。

//消息生产者
public interface Source 
	String OUTPUT = "output";//Binding对象名
	@Output(Source.OUTPUT)
	MessageChannel output(); //发送消息工具

//消息消费者
public interface Sink 
	String INPUT = "input"; //Binding对象名
	@Input(Sink.INPUT)
	SubscribableChannel input();//消息接收工具

//即是发送者,又是接收者。代表一个中间处理步骤
public interface Processor extends Source, Sink 


​ 当然,如果这些默认的接口不够用,你也可以照样声明自己的接口,通过自己的接口声明新的Binding对象。

​ 接下来,就需要在Spring应用的某一个合适的位置通过@EnableBinding注解声明这个Binding对象。例如最为常见的是放到启动类上声明。

@SpringBootApplication
@EnableBinding(Source.class, Sink.class)//声明Binding
public class SCSApplication 
    public static void main(String[] args) 
        SpringApplication.run(SCSApplication.class);
    


​ 在示例中,通过Source和Sink接口,就声明出了两个Binding,对象名分别是output和input。

​ 完成声明后的Binding对象,就可以直接来用了。比如通过source.output()方法获取MessageChannel对象,进而发送消息。然后通过@StreamListener(Sink.INPUT)注解声明消息消费者。使用方式在示例中有,就不再过多赘述。

​ 但是,默认的Binding对象由于缺少很多关键的属性配置,在某些环境下会出现很多问题。比如,在RabbitMQ中,如果不经任何配置,就会声明出一个默认的Exchange和Queue。但是默认的名字会非常奇怪,而且很多细节功能都不好用。所以,通常都要对这个Binding进行配置。配置的方式都是在application.properties中配置。这些配置都是按照spring.cloud.stream.bindings.[bindingname].[props]=[value]这样的格式指定的。例如:

spring.cloud.stream.bindings.output.destination=scstreamExchange
#指定binder。
spring.cloud.stream.bindings.output.binder=testbinder

spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain
#指定binder。
spring.cloud.stream.bindings.input.binder=testbinder

​ 其中,通过binder属性可以指定对应的Binder。如果不指定的话,就会使用默认的Binder。通过这样的简单配置,就指定了output这个Binding对应的Exchange交换机是scstreamExchange,input也对应这同一个交换机,所属的消费者组是stream。

​ 关于消费者组,如果你熟悉Kafka或者RocketMQ,相信你就不会陌生。同属一个组的多个消费者实例,会共同消费同一份消息副本。但是在RabbitMQ中,却是没有消费者组这个概念的。接下来,应用启动后,在RabbitMQ中,就会声明一个scstreamExchange,并且在这个交换机上绑定一个scstreamExchange.stream的队列。这样,当有多个消费者实例启动时,这些消费者实例都会一起绑定在这个队列上,这样同一个消费者组的多个示例,还是会共同消费这一个队列上的同一个消息副本,从而变相的实现了分组消费机制。

​ 这里的这些属性是SCS中通用的配置属性,对所有支持的MQ产品都会生效。例如:

spring.cloud.stream.bindings.input.consumer.max-attempts=3

​ 这个配置属性就可以指定input这个消费者的消息最大重试次数。默认值是3。通过SCS框架,就以一种统一的方式定义了所有支持的MQ产品中的消息重试次数,尽管在不同的MQ产品中实现消息重试的配置方式是不一样的。

3.3 Message

​ SCS中,有了Binder和Binding之后,就相当于有了完整的消息传输通道了。接下来就可以传递消息了。但是不同的MQ产品中对于消息的定义其实也是不相同的,例如RocketMQ中的消息有tag,key,自定义属性等。而在RabbitMQ的消息中也有messageId,headers头信息等。SCS框架就需要对这些消息类型进行统一。统一后的消息类型是这样的:

public interface Message<T> 
    T getPayload();
    MessageHeaders getHeaders();


​ 其中Payload就是消息实体,在SCS中定义成了一个泛型,也就是说可以直接传递对象。MessageHeaders就是消息的头部属性,可以认为是消息的补充属性。这样,不同的MQ产品下,就可以通过不同的MessageHeaders属性来代表各自的消息差异,而消息内容就可以通过Payload统一。例如如果你熟悉RabbitMQ产品的话,就知道,RabbitMQ中有一个非常重要的概念routingKey。通过routingKey可以定制Exchange与Queue之间的路由关系。这个routingKey在RabbitMQ的实现包中就可以通过在Headers当中指定一个routingkey属性来实现。

MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader("routingkey","info");

​ Message接口下,实际上也是有很多实现类的,简单看一下MessageBuilder的build方法,就能看到,这种方式是可以构建ErrorMessage和GenericMessage两种不同的消息的。

    public Message<T> build() 
        if (!this.modified && !this.headerAccessor.isModified() && this.originalMessage != null && !this.containsReadOnly(this.originalMessage.getHeaders())) 
            return this.originalMessage;
         else 
            return (Message)(this.payload instanceof Throwable ? new ErrorMessage((Throwable)this.payload, this.headerAccessor.toMap()) : new GenericMessage(this.payload, this.headerAccessor.toMap()));
        
    

​ 有了Binder,Binding和Message这三个工具后,你就可以直接用SCS框架去对接所有支持的MQ产品了。

四、SCS提供的高级功能

4.1 消息分组消费机制

​ SCS框架不光是统一与各个MQ产品之间的交互方式,还给各个MQ之间设计了一套统一的消费机制,就是分组消费机制。他的基础思想是在生产者实例和消费者实例之间建立一种对应关系,生产者实例发出的消息只会被对应的消费者消费,这样,当应用中有多个生产者实例和多个消费者实例时,就可以建立一种类似于Kafka和RocketMQ的分组消费的机制。

​ 例如,在RabbitMQ的示例应用中,对于生产者output,可以增加如下分区配置:

spring.cloud.stream.bindings.output.destination=scstreamExchange
#指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
#只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1

​ 这就表示当前生产者会有两个分区实例,当前分区实例的分区键是1。

​ 然后,对于消费者input,如果增加以下分区属性:

spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=myinput
#启动消费分区 新版本这个属性已经取消,改为由分区表达式自动判断
spring.cloud.stream.bindings.input.consumer.partitioned=true
#参与分区的消费端节点个数
spring.cloud.stream.bindings.input.consumer.instance-count=2
#设置该实例的消费端分区ID
spring.cloud.stream.bindings.input.consumer.instance-index=1

​ 这也表示当前消费者组会有两个分区实例,当前分区实例的分区键是1。

​ 这时,把应用启动起来,消费者发送的消息,依然能够正常被消费者消费到。但是,如果将消费者的instance-index属性改成0,或者是2,那重启应用后,就无法消费到生产者发出的消息了。也就是说,只有与生产者的分区键匹配的消费者,才能接收到生产者发送过来的消息。如果分区键不匹配,那就接收不到。这里要注意,这并不是表示消息丢失了,而是当前的消费者实例和生产者实例没有对应上,就无法消费对应的消息了。

​ 我们这个示例还太过简单,生产者和消费者的分区键都是直接指定的,不具备实际使用的价值。但是,SCS有更灵活的机制,让生产者可以根据消息本体来产生生产者的分区键,这样,就可以具备真正的分区消费功能了。

​ 这主要涉及到SCS中的producer后的两个关键的配置属性:partition-key-extractor-name和partition-selector-name。

​ 其中,partition-key-extractor-name属性需要指向一个实现了PartitionKeyExtractorStrategy接口的实现类,用来计算从Message中如何产生一个动态的分区Key。这个机制与partition-key-expression配置是互斥的。而partition-selector-name属性需要指向一个实现了PartitionSelectorStrategy接口的实现类,用来计算如何从分区Key中获取一个动态的分区键。

​ 例如,按照以下配置,就可以实现按照消息的轮询接收。即生产者发送消息的分区键会奇数偶数循环,而消费者只接收分区键为奇数的消息。

//step1、增加分区提取器
public class MyPartitionKeyExtractor implements PartitionKeyExtractorStrategy 
    public static final String PARTITION_PROP="partition";
    @Override
    public Object extractKey(Message<?> message) 
        return message.getHeaders().get(MyPartitionKeyExtractor.PARTITION_PROP);
    

//step2、增加分区键计算器
public class MyPartitionSelectorStrategy implements PartitionSelectorStrategy 
    @Override
    public int selectPartition(Object key, int partitionCount) 
        return Integer.parseInt(key.toString()) % partitionCount;
    

//step3、在application.properties中添加生产者的分区配置
spring.cloud.stream.bindings.output.destination=scstreamExchange
spring.cloud.stream.bindings.output.binder=testbinder
#指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
#只有消费端分区ID为1的消费端能接收到消息
#spring.cloud.stream.bindings.output.producer.partition-key-expression=1
#动态生成分区键
spring.cloud.stream.bindings.output.producer.partition-key-extractor-name=com.roy.partition.MyPartitionKeyExtractor
spring.cloud.stream.bindings.output.producer.partition-selector-name=com.roy.partition.MyPartitionSelector
//step4、发送消息时,给消息添加个动态增长的index属性。
    //index生成工具
    @Bean
    public AtomicInteger index()
        return new AtomicInteger(0);
    
//step5、在发送消息时,连续发送四条消息。
@RestController
public class SendMessageController 
    @Autowired
    private Source source;

    @Autowired
    private AtomicInteger index;

    @GetMapping("/sendPartitionBatch")
    public Object sendPartitionBatch(String message) 
        for (int i = 0; i < 4; i++) 
            MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message).setHeader(MyPartitionKeyExtractor.PARTITION_PROP,index.incrementAndGet(基于Kafka+SparkStreaming+OushuDB搭建批流一体大数据分析架构

基于Kafka+SparkStreaming+OushuDB搭建批流一体大数据分析架构

MQ解决消息重发--做到幂等性

MQ解决消息重发--做到幂等性

随笔记——MQ

mq如何保证高可用,解决重复消费、数据丢失问题和顺序性问题