RocketMQ-消息消费模式 顺序消费

Posted 杨宸杨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ-消息消费模式 顺序消费相关的知识,希望对你有一定的参考价值。

RocketMQ-消息消费模式 顺序消费


RocketMQ-消息消费模式

集群模式

在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到

集群模式的演示(本身就默认)

假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条

Rocketmq存储队列

在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。

为什么是四个队列?

因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作

广播模式

在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到

public class Consumer 
    public static void main(String[] args) throws Exception 
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消费模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) 
                for(MessageExt msg:list)
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        //启动消费者
        consumer.start();
    

运行结果:

生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据


顺序消费

实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程

假设我们没有实现顺序消费的时候

创建生产者

1.创建实体类

@Setter
@Getter
public class OrderStep 
    private long orderId;
    private String desc;

    @Override
    public String toString() 
        return "OrderStep" +
                "orderId=" + orderId +
                ", desc='" + desc + '\\'' +
                '';
    

2.创建测试类

public class OrderUtil 
    public static List<OrderStep> buildOrders()
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);
        return orderList;
    

3.创建生产者类

public class Producer 
        public static void main(String[] args) throws Exception 
            DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
            producer.setNamesrvAddr("43.143.161.59:9876");
            producer.start();
            String topic = "orderTopic";
            List<OrderStep> orderSteps = OrderUtil.buildOrders();
            for(OrderStep step:orderSteps)
                Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
                producer.sendOneway(msg);
            
            producer.shutdown();
        

创建消费者类

public class Consumer 
    public static void main(String[] args) throws Exception 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) 
                for(MessageExt msg:list)
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        //启动消费者
        consumer.start();
    

运行结果:

可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了

如何改实现顺序消费

生产者类

public class Producer 
    public static void main(String[] args) throws Exception 
        DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "orderTopic";
        List<OrderStep> orderSteps = OrderUtil.buildOrders();
        //设置队列选择器
        MessageQueueSelector selector = new MessageQueueSelector() 
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object o) 
                System.out.println("队列个数"+list.size());
                Long orderId = (Long) o;
                int index = (int)(orderId % list.size());
                return list.get(index);
            
        ;
        for(OrderStep step:orderSteps)
            Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
            //指定消息选择器,换入的参数
            producer.send(msg,selector,step.getOrderId());
        
        producer.shutdown();
    

消费者类

public class Consumer 
    public static void main(String[] args) throws Exception 
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("orderTopic","*");
        //从什么地方开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //一个队列对应一个线程
        consumer.setMessageListener(new MessageListenerOrderly() 
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) 
                for(MessageExt msg:list)
                    System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));
                
                return ConsumeOrderlyStatus.SUCCESS;
            
        );
        //启动消费者
        consumer.start();
    

以上是关于RocketMQ-消息消费模式 顺序消费的主要内容,如果未能解决你的问题,请参考以下文章

详解RocketMQ 顺序消费机制

RocketMQ 消息详解

RocketMQ 顺序消费

RocketMQ学习笔记:消息发送模式

RocketMQ(消息重发重复消费事务消息模式)

RocketMQ的顺序消息(顺序消费)