RocketMQ学习笔记:消息发送模式

Posted 大苏打seven

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ学习笔记:消息发送模式相关的知识,希望对你有一定的参考价值。

这是本人学习的总结,主要学习资料如下

  • 马士兵教育

目录


1、消息发送模式

三种发送方式,同步,异步和单向。

同步机会简单的一问一答,我发过去消息,MQ会立刻给我回复收到与否;没收到回复前会阻塞当前线程直到收到回复。

异步和通不一样也是一问一答,你需要回复收到,但是我不会等,我会做自己的事。

单向则是我发送消息,你收不收到我不关心。

2、消息消费模式

我们有两种消费模式,一种是集群消费,另一种是广播消费。默认情况和大部分情况是使用集群消费

  • 集群消费:消费消息往往比生产消息要复杂,所以一个porducer往往对应多个consumer。一个topic+tag里有多条消息等待着多个consumer消费。比如下面的图例,group 1里有三个consumer,他们各自分别从一个message queue里拿消息。

    一条消息只会被送到某一个message queue中,被某一个consumer消费,绝不会出现同一条消息被多个consumer消费的情况。

  • 广播消费:和集群模式相反,每一条消息都会发送给所有的consumer,也就是说一个消息会被消费多次,被不同的消费者各自消费一次。



3、顺序消息的消费和发送

顺序消息的消费和发送指的是,消费者消费的消息的顺序和生产者生产消息的顺序是一样的。

比如生产者生产消息是1,2,3,4,消费者消费的顺序也是1,2,3,4

顺序消息又分为全局顺序和部分顺序。


3.1、全局顺序

全局顺序出现在topic里只有一个message queue的情况。

因为message queue是队列的结构,消息先进先出,自然保证了消息消费的顺序。


3.2、部分顺序

部分顺序出现在topic有多个message queue的情况。

因为有多个message queue对应多个consumer,而不同的consumer消费的效率有所不同,所以每个消息被消费的顺序不严格与生产者生成的相同。

它只能保证消息每个message queue中的消息顺序消费。

如果业务上需要控制顺序,生产者在生产消息时可以将消息送到指定的message queue。比如下图的M1,M4指定送到queue1,保证M1M4先消费。

同时消费者也只从指定的message queue拿消息即可

3.3、部分顺序代码样例

3.3.1、依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
    <scope>provided</scope>
</dependency>



3.3.2、发送信息

代码的场景是发送订单,有多个消息用来通知订单状态变化。订单生命周期created -> payed -> completed

其中有两个订单,id分别为1和2。订单1的状态消息都发送到message queue 1,订单2 的状态消息都发送到message queue 2

public class ScheduleProducerExample 
    public static void main(String[] args) throws Exception
        DefaultMQProducer producer = getProducer();
        List<Order> orderList = getOrderList();

        for(int i = 0; i < orderList.size(); i++) 
            String body = orderList.get(i).toString();
            Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() 
                @Override
                public MessageQueue select(List<MessageQueue> queueList, Message message, Object arg) 
                    Integer id = (Integer) arg;
                    int index = id % queueList.size();
                    return queueList.get(index);
                
            , orderList.get(i).getId());

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body));
        
        producer.shutdown();
    

    private static List<Order> getOrderList() 
        List<Order> list = new ArrayList<>();
        list.add(new Order(1, "created"));
        list.add(new Order(2, "created"));
        list.add(new Order(1, "payed"));
        list.add(new Order(2, "payed"));
        list.add(new Order(1, "completed"));
        list.add(new Order(2, "completed"));
        return list;
    

    private static DefaultMQProducer getProducer() throws MQClientException 
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        return producer;
    


输出结果,可以看到订单1都发送到了queueId:1,订单而都发送到了queueId:2



3.3.3、接受信息

注意在consumer.registerMessageListener中会定义如何消费信息。返回值用于定义消费的后续处理,比如返回SUSPEND_CURRENT_QUEUE_A_MOMENT则表示这次消息没有消费成功,下次还会继续消费该消息;

SUCCESS则表示消息消费成功,下次会消费下一条消息

public class PartialOrderConsumerExample 
    public static void main(String[] args) throws Exception
        DefaultMQPushConsumer consumer = getConsumer();

        consumer.registerMessageListener(new MessageListenerOrderly() 
            Random random = new Random();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) 
                context.setAutoCommit(true);
                for(MessageExt msg: list) 
                    System.out.println("consumeThread=" + Thread.currentThread().getName()
                            + ", queueId=" + msg.getQueueId() + ", context:" + new String(msg.getBody()));
                
                try
                	// mock business process
                    TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
                 catch (Exception e) 
                    e.printStackTrace();
                    // means that failed to consume this msg. In next time will still consume this msg.
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                
                // means that success to consume this msg. In the next time will consume next msg.
                return ConsumeOrderlyStatus.SUCCESS;
            
        );

        consumer.start();
        while(true)
        

//        consumer.shutdown();
    

    private static DefaultMQPushConsumer getConsumer() throws MQClientException 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer1");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("PartOrder", "*");
        return consumer;
    

返回结果消费成功,保证了消息部分有序。



4、延时消息的消费和发送

顾名思义,就是延时发送消息。下面看代码示例

4.1、代码样例

就普通地发送消息即可,只需要设置消息的属性就可达到延时的效果。比如msg.setDelayTimeLevel(1);

延时有18个等级,用数字1~18表示,分别代表1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

public class DelayProducerExample 
    public static void main(String[] args) throws Exception
        DefaultMQProducer producer = getProducer();
        Message msg = getMessage();

        SendResult sendResult = producer.send(msg);

        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    

    private static DefaultMQProducer getProducer() throws MQClientException 
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        return producer;
    

    private static Message getMessage() 
        Message msg = new Message();
        msg.setTopic("topic1");
        msg.setTags("tag1");
        msg.setBody("hello world".getBytes());
        // delayTimeLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        msg.setDelayTimeLevel(1);
        return msg;
    



5、批量发送消息

就是一次性发送多个消息,大批量消息发送中可提高效率。

需要注意的是,批量发送的消息默认都是发送到同一个message queue

当然,批量发送的消息有上限。一次不能超过4M。超过的话我们需要拆分消息。

5.1、代码样例

其实比较简单,就是普通的send方法,不过参数换成了List,我们只需要往List填信息就可以批量发送了。

public class BatchProducerExample 
    public static void main(String[] args) throws Exception
        DefaultMQProducer producer = getProducer();

        List<Message> messageList = getMessageList();
        SendResult sendResult = producer.send(messageList);

        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    

    private static List<Message> getMessageList() 
        List<Message> messageList = new ArrayList<>();
        messageList.add(new Message("topic1", "tag1", "hello world1".getBytes()));
        messageList.add(new Message("topic1", "tag1", "hello world2".getBytes()));
        messageList.add(new Message("topic1", "tag1", "hello world3".getBytes()));
        messageList.add(new Message("topic1", "tag1", "hello world4".getBytes()));
        messageList.add(new Message("topic1", "tag1", "hello world5".getBytes()));
        messageList.add(new Message("topic1", "tag1", "hello world6".getBytes()));
        return messageList;
    

    private static DefaultMQProducer getProducer() throws MQClientException 
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        return producer;
    

下面是接受的结果,所有消息都在同一个message queue



RocketMq小笔记

RocketMq特点

  • RocketMq天然持久化存储消息
  • RocketMq支持消息的退、拉模式。推模式天然不支持消息的批处理,对消息的处理是一条一条的处理。
  • RocketMq消费端对消息的消费模式有两种:
    1 集群消费 2 广播消费
    默认为集群消费模式,若要设置为广播消费,则:
    consumer.setMessageModel(MessageModel.BROADCASTING);
  • RocketMq启动的时候,要先启动消费端,后启动生产端,这是行业一个不成文的规则。

以上是关于RocketMQ学习笔记:消息发送模式的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 整合SpringBoot发送事务消息

RocketMq实现延时队列

源码分析RocketMQ系列索引

RocketMQ笔记:顺序消息

RocketMq小笔记

阅读rocketmq技术内幕杂记 - 设计