用RocketMQ这么久,才知道消息可以这样玩

Posted 牧小农

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了用RocketMQ这么久,才知道消息可以这样玩相关的知识,希望对你有一定的参考价值。

前言

在上一章节中,我们讲解了RocketMQ的基本介绍,作为MQ最重要的就是消息的使用了,今天我们就来带大家如何玩转MQ的消息。

消息中间件,英文Message Queue,简称MQ。它没有标准定义,一般认为:消息中间件属于分布式系统中一个子系统,关注于数据的发送和接收,利用高效可靠的异步消息传递机制对分布式系统中的其余各个子系统进行集成。

高效: 对于消息的处理处理速度快,RocketMQ可以达到单机10万+的并发。

可靠: 一般消息中间件都会有消息持久化机制和其他的机制确保消息不丢失。

异步: 指发送完一个请求,不需要等待返回,随时可以再发送下一个请求,既不需要等待。

消息中间件不生产消息,只是消息的搬运工。

首先Message包含的内容主要有几个方面组成:id(MQ自动生成)、Topic、tag、proerties、内容。

消息的发送分为:

  • 普通消息
  • 顺序消息
  • 延时消息
  • 批量消息
  • 分布式消息

普通消息

普通消息的发送方式主要有三种:发送同步消息、发送异步消息、单向发送

我们可以先使用 RocketMQ 提供的原生客户端的API,在 SpringBoot、SpringCloudStream 也进行了集成,但本质上这些也是基于原生API的封装,所以我们只需要掌握原生API的时候,其他的也就无师自通了。

想要使用 RocketMQ中的API,就需要先导入对应的客户端依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.2</version>
</dependency>

消息发送者的步骤分为:

  1. 创建消息生产者 producer,执行生产者组名
  2. 指定Nameserver地址
  3. 启动producer
  4. 创建消息对象,指定Topic、Tag和消息体
  5. 发送消息
  6. 关闭生产者producer

消息消费者的步骤分为:

  1. 创建消费者 Consumer,指定消费者组名
  2. 指定Nameserver地址
  3. 订阅主题Topic和Tag
  4. 设置回调函数,处理消息
  5. 启动消费者consumer

发送同步消息

发送同步消息是说消息发送方发出数据后,同步等待,一直等收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

流程如下所示:

package org.apache.rocketmq.example.quickstart;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
 * 同步发送
 */
public class SyncProducer 
    public static void main(String[] args) throws Exception
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_test");

        // 设置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //producer.setSendLatencyFaultEnable(true);

        // 启动Producer实例
        producer.start();


        for (int i = 0; i < 10; i++) 
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        
        //如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    


响应结果如下所示:

  • msgId: 消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息。

  • sendStatus: 发送的标识:成功,失败等

  • queueId: queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。

  • queueOffset: Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。

在上面代表的是四个queue,而maxOffset代表我们发送消息的数量,之前发送过消息,所以大家现在看到的数量是17、18…这种,当你在运行一次发送消息时,就会看到十条消息会分布在不同机器上

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

流程如下:

package com.muxiaonong.normal;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 异步发送--生产者
 */
public class AsyncProducer 
    public static void main(String[] args) throws Exception
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 设置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) 
            final int index = i;
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest", "TagA", "OrderID888",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // SendCallback接收异步返回结果的回调
            producer.send(msg, new SendCallback() 
                //发送成功
                @Override
                public void onSuccess(SendResult sendResult) 
                    System.out.printf("%s%n", sendResult);
                
                //发送异常
                @Override
                public void onException(Throwable e) 
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                
            );
        
        Thread.sleep(10000);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    


发送成功报文:

我们在dashbord下看到已经成功拿到消息了

单向发送

这种方式不需要我们特别关心发送结果的场景,比如日志发送、单向发送特点是发送方只需要负责发送消息,不需要等待服务器回应且没有回调函数触发,发送请求不需要等待应答,只管发,这种放松方式过程耗时很短,一般在微妙级别。

流程如下:

package com.muxiaonong.normal;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * 单向发送
 */
public class OnewayProducer 
    public static void main(String[] args) throws Exception
        // 实例化消息生产者Producer对象
        DefaultMQProducer producer = new DefaultMQProducer("group_test");
        // 设置NameServer的地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 10; i++) 
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送单向消息,没有任何返回结果
            producer.sendOneway(msg);

        
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    

返回报文:


这种发送方式,我们客户端不会感受到发送结果,发送完成之后,我们并不知道到底有没有发送成功,我们只能在 top status 中去查看

普通消息发送对比:

发送方式发送TPS可靠性结果反馈使用场景
同步消息发送不丢失重要通知(邮件、短信通知、)等
异步消息发送不丢失用户文件上传自动解析服务,完成后通知其结果
单向发送超快可能丢失适用于 耗时非常短,但是对于可靠性要求不高的场景,比如日志收集

消息的消费方式

普通消息的消费方式主要有三种:集群消费、广播消费

一、集群消费模式

集群消费方式下,一个分组(Group) 下的多个消费者共同消费队列消息,每一个消费者出来处理的消息不一样,一个Consumer Group 中的各个Consumer 实例分摊去消费消息,一条消息只会投递到一个Consumer Group 下的一个实例,如果一个Topic有三个队列,其中一个 Consumer Group 有三个实例,那么每个实例只会消费其中一个队列,集群消费模式是消费者默认的消费方式。

实例代码:

package com.muxiaonong.normal.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;


/**
 * 集群消费模式
 */
public class BalanceConsumer 
    public static void main(String[] args) throws Exception 
        // 实例化消费者,指定组名:  TopicTest  10条消息 group_consumer  ,  lijin 8(2)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅Topic
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //consumer.setConsumeFromWhere();

        //集群模式消费
        consumer.setMessageModel(MessageModel.CLUSTERING);

        //取消
        consumer.unsubscribe("TopicTest");
        //再次订阅Topic即可
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) 
                try 
                    for(MessageExt msg : msgs) 
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        Thread.sleep(1000);
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    
                 catch (Exception e) 
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );

        //启动消息者
        consumer.start();
        //注销Consumer
        //consumer.shutdown();
        System.out.printf("Consumer Started.%n");
    


我们启动两个实例对象,分别为BalanceConsumer2和BalanceConsumer,我们再去生产者生产十条消息后,我们再去看consumer,分别均摊了这十条消息

二、广播消费模式

广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。因为一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。每一个消费者下面的消费实例,都会去拿到我们Topic下的每一条消息,但是这种消费进度的保存,不会放在broker里面,而是持久化到我们的本地实例

流程图如下:

具体代码

package com.muxiaonong.normal.consumer;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;


/**
 * 广播消费模式
 */
public class BroadcastConsumer 
    public static void main(String[] args) throws Exception 
        // 实例化消费者,指定组名:  TopicTest  10条消息 group_consumer  ,  lijin 8(2)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
        // 指定Namesrv地址信息.
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 订阅Topic
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC
        //consumer.setConsumeFromWhere();

        //广播模式消费
        consumer.setMessageModel(MessageModel.BROADCASTING);

        //取消
        consumer.unsubscribe("TopicTest");
        //再次订阅Topic即可
        consumer.subscribe("TopicTest", "*"); //tag  tagA|TagB|TagC

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) 
                try 
                    for(MessageExt msg : msgs) 
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        Thread.sleep(1000);
                        System.out.println("收到消息:" + " topic :" + topic + " ,tags : " + tags + " ,msg : " + msgBody);
                    
                 catch (Exception e) 
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;

                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );

        //启动消息者
        consumer.start();
        //注销Consumer
        //consumer.shutdown();
        System.out.printf("Consumer Started.%n");
    


我们先启动 BroadcastConsumer和BroadcastConsumer2,生产十条消息以后,我们会看到不管是哪个消费者,都会接收到十条消息,这个就是广播消费模式

消息消费的权衡

负载均衡模式: 消费端集群化部署,每条消息只需要被处理一次,由于消费进度在服务端维护,可靠性更高。

集群消费模式下,不能保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。每一条消息都只会被分发到一台机器上处理,如果需要被集群下的每一台机器都处理,只能使用广播模式。

广播模式: 每条消息都需要被相同逻辑的多台机器处理,消费进度在客户端维护,出现重复的概率稍大于集群模式。

广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此需要关注消费失败的情况,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息会被自动跳过,这一点是需要注意的地方

每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。目前仅 Java 客户端支持广播模式,不支持顺序消息且服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

顺序消息

顺序消息指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ 可以严格的保证消息有序,可以分为 分区有序 或者 全局有序。

生产消息时在默认的情况下消息发送会采取 Round Robin 轮询方式把消息发送到不同的 queue ( 分区队列);而消费消息的时候从多个 queue 上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个 queue 中,消费的时候只从这个 queue 上依次拉取,则就保证了顺序。当发送和消费参与的 queue 只有一个,则是全局有序;如果多个 queue 参与,则为分区有序,即相对每个 queue ,消息都是有序的。

全局有序

全局有序主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可

分区有序

在电商业务场景中,订单的流程是:创建、付款、推送、完成。 在加入 RocketMQ 后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到 RocketMQ 中的一个主题中,如何实现针对一个订单的消息顺序性呢!如下图:

要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。

/** @Author 牧小农
 * @Description // 订单消息生产
 * @Date 16:47 2022/8/20
 * @Param 
 * @return 
 **/
public class OrderProducer 

    public static void main(String[] args) throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // 订单列表
        List<Order> orderList = new OrderProducer().buildOrders();
        for (int i = 0; i < orderList.size()用RocketMQ这么久,才知道消息可以这样玩

用RocketMQ这么久,才知道消息可以这样玩

用RocketMQ这么久,才知道消息可以这样玩

原来RocketMQ高可用设计是这么玩的

原来RocketMQ中间件可以这么玩!带你碾压面试官!

原来RocketMQ中间件可以这么玩