万字精华总结RocketMQ的常见用法(案例+图)

Posted 程序猿小亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了万字精华总结RocketMQ的常见用法(案例+图)相关的知识,希望对你有一定的参考价值。

概述

上篇博文,我们介绍了什么是RocketMQ,以及如何安装单机版的RocketMQ。在安装的过程了,我们主要安装了两个服务,NameServer和Broker。在发送和接收消息时,又接触了两个概念,生产者和消费者。

那这些又代表什么含义呢?

对于单机版本的RocketMQ架构,如下图所示:

主要分为四部分:

  • 名字服务(Name Server)

Name Server充当路由消息的提供者。生产者或消费者能够通过Name Server查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。

  • 代理服务器(Broker Server)

Broker Server负责存储消息、转发消息。Broker在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。Broker也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

  • 消息生产者(Producer)

负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

  • 消息消费者(Consumer)

负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。


对于上面的学习,我们知道了RocketMQ的核心模块以及相应的概念。那么,RocketMQ都有哪些发送消息的方式呢,又如何使用,使用的场景是什么,又是如何消费的?

常见用法

在项目中添加MQ客户端依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>x.x.x</version>
</dependency>

1、基本消息

1.1消息发送

  • 在基本消息发送中,我们使用RocketMQ发送三种类型的消息:同步消息、异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。

  • 使用RocketMQ两个不同模式,来消费接收到的消息。

1、同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

public class SyncProducer 
    public static void main(String[] args) throws Exception 
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //3.启动producer
        producer.start();
        for (int i = 0; i < 10; i++) 
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //5.发送同步消息,将消息发送给其中一个broker
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        
        //6.关闭生产者producer
        producer.shutdown();
    

上面的案例中设计到两个陌生的概念,含义如下所示:

生产者组(Producer Group):同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。

标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

2、异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

public class AsyncProducer 
    public static void main(String[] args) throws Exception 
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //3.启动producer
        producer.start();

        int messageCount = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) 
            try 
                final int index = i;
                //4.创建消息对象,指定主题Topic、Tag和消息体
                /**
                 * 参数一:消息主题Topic
                 * 参数二:消息Tag
                 * 参数三:消息内容
                 */
                Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));

                //5.发送异步消息,SendCallback接收异步返回结果的回调
                producer.send(msg, new SendCallback() 
                    @Override
                    public void onSuccess(SendResult sendResult) 
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    

                    @Override
                    public void onException(Throwable e) 
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    
                );
             catch (Exception e) 
                e.printStackTrace();
            
        
        countDownLatch.await(5, TimeUnit.SECONDS);

        //6.关闭生产者producer
        producer.shutdown();
    


keys:Message索引键,多个用空格隔开,RocketMQ可以根据这些key快速检索到消息对消息关键字的提取方便查询。

3、单向消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送。

只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

public class OneWayProducer 

    public static void main(String[] args) throws Exception, MQBrokerException 
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("localhost:9876");
        //3.启动producer
        producer.start();

        for (int i = 0; i < 10; i++) 
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数一:消息主题Topic
             * 参数二:消息Tag
             * 参数三:消息内容
             */
            Message msg = new Message("TopicTest", "TagA", ("Hello World,单向消息" + i).getBytes());
            //5.发送单向消息
            producer.sendOneway(msg);

            //线程睡1秒
            TimeUnit.SECONDS.sleep(5);
        

        //6.关闭生产者producer
        producer.shutdown();
    

1.2消息消费

此时,RocketMQ中已经有我们需要发送的消息了,我们使用RocketMQ来消费队列中的消息。接收消息有两种模式:

  • 负载均衡模式(Clustering)
  • 广播模式(Broadcasting)

启动多个消费者,最直接的区别:模式不同,消费的消息不同。

1、负载均衡模式

默认模式,消费者采用负载均衡方式消费消息,相同消费者组的每个消费者共同消费队列中的消息即每个Consumer实例平均分摊消息,每个消费者处理的消息不同。消费进度存储在服务端

/**
 * @PROJECT_NAME: SpringCloud-Learning
 * @USER: yuliang
 * @DESCRIPTION:
 * @DATE: 2021-04-14 15:22
 *
 * 异步消息,同步消息,单向消息 - 消费者 - 负载均衡模式
 */
public class ClusteringConsumer 

    public static void main(String[] args) throws InterruptedException, MQClientException 

        //创建一个消息消费者,并设置一个消息消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        //指定 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");

        // Subscribe one more more topics to consume.
        //订阅指定 Topic 下的所有消息
        consumer.subscribe("TopicTest", "*");

        //负载均衡模式,默认
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // Register callback to execute on arrival of messages fetched from brokers.
        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() 

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) 
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );

        // 启动消费者
        consumer.start();

        System.out.println("消息消费者已启动");
    

2、广播模式

消费者采用广播的方式消费消息,相同Consumer Group的每个消费者消费的消息都是相同的。消费进度存储在消费者本地

/**
 * @PROJECT_NAME: SpringCloud-Learning
 * @USER: yuliang
 * @DESCRIPTION:
 * @DATE: 2021-04-19 19:02
 *
 *
 *  异步消息,同步消息,单向消息 - 消费者 - 广播模式
 */
public class BroadcastConsumer 

    public static void main(String[] args) throws Exception 

        //创建一个消息消费者,并设置一个消息消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        //指定 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        //设置广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

        //订阅指定 Topic 下的所有消息
        consumer.subscribe("TopicTest", "*");

        // 注册回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() 

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) 
//                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");

                //默认 list 里只有一条消息,可以通过设置参数来批量接收消息
                if (msgs != null) 
                    for (MessageExt ext : msgs) 
                        try 
                            System.out.println(new Date() + ext.toString() + new String(ext.getBody(), "UTF-8"));
                         catch (UnsupportedEncodingException e) 
                            e.printStackTrace();
                        
                    
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );

        consumer.start();
        System.out.println("消息消费者已启动");
    

消费者组(Consumer Group):同一类Consumer的集合,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。

2、顺序消息

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序

    对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。

    实现方式:

    当发送和消费参与的queue只有一个

    适用场景:

    性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景

  • 分区顺序

    对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。

    实现方式:

    如果多个queue参与,按照Sharding key选择队列,则为分区有序,即相对每个queue,消息都是有序的。

    适用场景:

    性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

2.1顺序消息生产

/**
 * @PROJECT_NAME: SpringCloud-Learning
 * @USER: yuliang
 * @DESCRIPTION:
 * @DATE: 2021-04-19 11:17
 *
 * 顺序消息-生产者
 */
public class ProducerInOrder 
    public static void main(String[] args) throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("localhost:9876");

        producer.start();

        String[] tags = new String[]"TagA", "TagC", "TagD";

        // 订单列表
        List<OrderStep> orderList = new ProducerInOrder().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < orderList.size(); i++) 
            // 加个时间前缀
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            Message msg = new Message("OrderTopic", tags[i % tags.length], "KEY" + i, body.getBytes());

            //自定义消息队列选取规则
//            SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
//                @Override
//                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) 
//                    Long id = (Long) arg;  //根据订单id选择发送queue
//                    long index = id % mqs.size();
//                    return mqs.get((int) index);
//                
//            , orderList.get(i).getOrderId());//订单id

            //SelectMessageQueueByHash,官方提供的选取规则,还有其他实现,大家自行发现
            SendResult sendResult = producer.send(msg, new SelectMessageQueueByHash(), orderList.get(i).getOrderId());

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));

        
        producer.shutdown();
    

    /**
     * 订单的步骤
     */
    private static class OrderStep 
        private long orderId;
        private String desc;

        public long getOrderId() 
            return orderId;
        

        public void setOrderId(long orderId) 
            this.orderId = orderId;
        

        public String getDesc() 
            return desc;
        

        public void setDesc(String desc) 
            this.desc = desc;
        

        @Override
        public String toString() 
            return "OrderStep" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\\'' +
                    '';
        
    

    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() 
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo)万字总结webpack实战案例配置

MySQL万字精华总结!java注解解决了什么问题

50家大厂面试万字精华总结,2021Java大厂面试真题

rocketMq架构原理精华分析

50家大厂面试万字精华总结,java电子书教材下载

一文讲透Apache RocketMQ技术精华