RocketMQ-消息消费模式 顺序消费
Posted 杨宸杨
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ-消息消费模式 顺序消费相关的知识,希望对你有一定的参考价值。
RocketMQ-消息消费模式 顺序消费
RocketMQ-消息消费模式
集群模式
在消费模式为集群的情况下,如果机器是集群的,消息只会给集群中的其中一台机器消费到
集群模式的演示(本身就默认)
假设我们生产者生产了十条信息 ,当我们集群了两台消费者服务器的时候,就会每个服务器执行五条
Rocketmq存储队列
在消息中间件每个topic是有4个写和读队列,主要是解决并发性能的问题的
如果只有一个队列,保证线程安全,必须得给队列进行写操作的时候上锁。
多几个队列,降低并发度,等待时间就短一些。
为什么是四个队列?
因为大多数服务器只有四核,意味着同时最多只能有CPU同时工作
广播模式
在消费模式为集群的情况下,如果机器是集群的,消费是会给集群中的所有机器所消费到
public class Consumer
public static void main(String[] args) throws Exception
//定义消息消费者(在同一个JVM中,消费者的组名不能重复)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
//设置nameServer地址
consumer.setNamesrvAddr("43.143.161.59:9876");
//设置订阅的主题
consumer.subscribe("helloTopic","*");
//设置消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
//设置消息的监听器
consumer.setMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)
for(MessageExt msg:list)
String s = new String(msg.getBody(), Charset.defaultCharset());
System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
//启动消费者
consumer.start();
运行结果:
生产者发送了十条消息之后,消费者集群的每个服务器均收到十条数据
顺序消费
实现生产顺序:12345消费顺序12345
哪些消息要实现顺序消费,就要让那些消息进入到同一个队列当中,对于消费者来说,一个队列对于一个线程
假设我们没有实现顺序消费的时候
创建生产者
1.创建实体类
@Setter
@Getter
public class OrderStep
private long orderId;
private String desc;
@Override
public String toString()
return "OrderStep" +
"orderId=" + orderId +
", desc='" + desc + '\\'' +
'';
2.创建测试类
public class OrderUtil
public static List<OrderStep> buildOrders()
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
3.创建生产者类
public class Producer
public static void main(String[] args) throws Exception
DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
producer.setNamesrvAddr("43.143.161.59:9876");
producer.start();
String topic = "orderTopic";
List<OrderStep> orderSteps = OrderUtil.buildOrders();
for(OrderStep step:orderSteps)
Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
producer.sendOneway(msg);
producer.shutdown();
创建消费者类
public class Consumer
public static void main(String[] args) throws Exception
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
consumer.setNamesrvAddr("43.143.161.59:9876");
consumer.subscribe("orderTopic","*");
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext)
for(MessageExt msg:list)
String s = new String(msg.getBody(), Charset.defaultCharset());
System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
//启动消费者
consumer.start();
运行结果:
可以看出和我们生产数据的顺序完全不同,整个订单的顺序都反了
如何改实现顺序消费
生产者类
public class Producer
public static void main(String[] args) throws Exception
DefaultMQProducer producer = new DefaultMQProducer("orderlyProducerGroup");
producer.setNamesrvAddr("43.143.161.59:9876");
producer.start();
String topic = "orderTopic";
List<OrderStep> orderSteps = OrderUtil.buildOrders();
//设置队列选择器
MessageQueueSelector selector = new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o)
System.out.println("队列个数"+list.size());
Long orderId = (Long) o;
int index = (int)(orderId % list.size());
return list.get(index);
;
for(OrderStep step:orderSteps)
Message msg = new Message(topic,step.toString().getBytes(Charset.defaultCharset()));
//指定消息选择器,换入的参数
producer.send(msg,selector,step.getOrderId());
producer.shutdown();
消费者类
public class Consumer
public static void main(String[] args) throws Exception
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderlyConsumerGroup");
consumer.setNamesrvAddr("43.143.161.59:9876");
consumer.subscribe("orderTopic","*");
//从什么地方开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//一个队列对应一个线程
consumer.setMessageListener(new MessageListenerOrderly()
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext)
for(MessageExt msg:list)
System.out.println("当前线程:"+Thread.currentThread()+":,队列ID"+msg.getQueueId()+",消息内容:"+new String(msg.getBody(),Charset.defaultCharset()));
return ConsumeOrderlyStatus.SUCCESS;
);
//启动消费者
consumer.start();
以上是关于RocketMQ-消息消费模式 顺序消费的主要内容,如果未能解决你的问题,请参考以下文章