RocketMQ学习笔记:消息发送模式
Posted 大苏打seven
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ学习笔记:消息发送模式相关的知识,希望对你有一定的参考价值。
这是本人学习的总结,主要学习资料如下
- 马士兵教育
目录
1、消息发送模式
三种发送方式,同步,异步和单向。
同步机会简单的一问一答,我发过去消息,MQ会立刻给我回复收到与否;没收到回复前会阻塞当前线程直到收到回复。
异步和通不一样也是一问一答,你需要回复收到,但是我不会等,我会做自己的事。
单向则是我发送消息,你收不收到我不关心。
2、消息消费模式
我们有两种消费模式,一种是集群消费
,另一种是广播消费
。默认情况和大部分情况是使用集群消费
-
集群消费:消费消息往往比生产消息要复杂,所以一个
porducer
往往对应多个consumer
。一个topic+tag
里有多条消息等待着多个consumer
消费。比如下面的图例,group 1
里有三个consumer
,他们各自分别从一个message queue
里拿消息。一条消息只会被送到某一个
message queue
中,被某一个consumer
消费,绝不会出现同一条消息被多个consumer
消费的情况。 -
广播消费:和集群模式相反,每一条消息都会发送给所有的
consumer
,也就是说一个消息会被消费多次,被不同的消费者各自消费一次。
3、顺序消息的消费和发送
顺序消息的消费和发送指的是,消费者消费的消息的顺序和生产者生产消息的顺序是一样的。
比如生产者生产消息是1,2,3,4
,消费者消费的顺序也是1,2,3,4
。
顺序消息又分为全局顺序和部分顺序。
3.1、全局顺序
全局顺序出现在topic
里只有一个message queue
的情况。
因为message queue
是队列的结构,消息先进先出,自然保证了消息消费的顺序。
3.2、部分顺序
部分顺序出现在topic
有多个message queue
的情况。
因为有多个message queue
对应多个consumer
,而不同的consumer
消费的效率有所不同,所以每个消息被消费的顺序不严格与生产者生成的相同。
它只能保证消息每个message queue
中的消息顺序消费。
如果业务上需要控制顺序,生产者在生产消息时可以将消息送到指定的message queue
。比如下图的M1,M4
指定送到queue1
,保证M1
比M4
先消费。
同时消费者也只从指定的message queue
拿消息即可
3.3、部分顺序代码样例
3.3.1、依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
3.3.2、发送信息
代码的场景是发送订单,有多个消息用来通知订单状态变化。订单生命周期created -> payed -> completed
。
其中有两个订单,id分别为1和2。订单1的状态消息都发送到message queue 1
,订单2 的状态消息都发送到message queue 2
。
public class ScheduleProducerExample
public static void main(String[] args) throws Exception
DefaultMQProducer producer = getProducer();
List<Order> orderList = getOrderList();
for(int i = 0; i < orderList.size(); i++)
String body = orderList.get(i).toString();
Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> queueList, Message message, Object arg)
Integer id = (Integer) arg;
int index = id % queueList.size();
return queueList.get(index);
, orderList.get(i).getId());
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
producer.shutdown();
private static List<Order> getOrderList()
List<Order> list = new ArrayList<>();
list.add(new Order(1, "created"));
list.add(new Order(2, "created"));
list.add(new Order(1, "payed"));
list.add(new Order(2, "payed"));
list.add(new Order(1, "completed"));
list.add(new Order(2, "completed"));
return list;
private static DefaultMQProducer getProducer() throws MQClientException
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
return producer;
输出结果,可以看到订单1都发送到了queueId:1
,订单而都发送到了queueId:2
。
3.3.3、接受信息
注意在consumer.registerMessageListener
中会定义如何消费信息。返回值用于定义消费的后续处理,比如返回SUSPEND_CURRENT_QUEUE_A_MOMENT
则表示这次消息没有消费成功,下次还会继续消费该消息;
而SUCCESS
则表示消息消费成功,下次会消费下一条消息
public class PartialOrderConsumerExample
public static void main(String[] args) throws Exception
DefaultMQPushConsumer consumer = getConsumer();
consumer.registerMessageListener(new MessageListenerOrderly()
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context)
context.setAutoCommit(true);
for(MessageExt msg: list)
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId() + ", context:" + new String(msg.getBody()));
try
// mock business process
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
catch (Exception e)
e.printStackTrace();
// means that failed to consume this msg. In next time will still consume this msg.
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
// means that success to consume this msg. In the next time will consume next msg.
return ConsumeOrderlyStatus.SUCCESS;
);
consumer.start();
while(true)
// consumer.shutdown();
private static DefaultMQPushConsumer getConsumer() throws MQClientException
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer1");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("PartOrder", "*");
return consumer;
返回结果消费成功,保证了消息部分有序。
4、延时消息的消费和发送
顾名思义,就是延时发送消息。下面看代码示例
4.1、代码样例
就普通地发送消息即可,只需要设置消息的属性就可达到延时的效果。比如msg.setDelayTimeLevel(1);
延时有18个等级,用数字1~18表示,分别代表1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
。
public class DelayProducerExample
public static void main(String[] args) throws Exception
DefaultMQProducer producer = getProducer();
Message msg = getMessage();
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
private static DefaultMQProducer getProducer() throws MQClientException
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
return producer;
private static Message getMessage()
Message msg = new Message();
msg.setTopic("topic1");
msg.setTags("tag1");
msg.setBody("hello world".getBytes());
// delayTimeLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(1);
return msg;
5、批量发送消息
就是一次性发送多个消息,大批量消息发送中可提高效率。
需要注意的是,批量发送的消息默认都是发送到同一个message queue
。
当然,批量发送的消息有上限。一次不能超过4M。超过的话我们需要拆分消息。
5.1、代码样例
其实比较简单,就是普通的send
方法,不过参数换成了List
,我们只需要往List
填信息就可以批量发送了。
public class BatchProducerExample
public static void main(String[] args) throws Exception
DefaultMQProducer producer = getProducer();
List<Message> messageList = getMessageList();
SendResult sendResult = producer.send(messageList);
System.out.printf("%s%n", sendResult);
producer.shutdown();
private static List<Message> getMessageList()
List<Message> messageList = new ArrayList<>();
messageList.add(new Message("topic1", "tag1", "hello world1".getBytes()));
messageList.add(new Message("topic1", "tag1", "hello world2".getBytes()));
messageList.add(new Message("topic1", "tag1", "hello world3".getBytes()));
messageList.add(new Message("topic1", "tag1", "hello world4".getBytes()));
messageList.add(new Message("topic1", "tag1", "hello world5".getBytes()));
messageList.add(new Message("topic1", "tag1", "hello world6".getBytes()));
return messageList;
private static DefaultMQProducer getProducer() throws MQClientException
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
return producer;
下面是接受的结果,所有消息都在同一个message queue
。
RocketMq小笔记
RocketMq特点
- RocketMq天然持久化存储消息
- RocketMq支持消息的退、拉模式。推模式天然不支持消息的批处理,对消息的处理是一条一条的处理。
- RocketMq消费端对消息的消费模式有两种:
1 集群消费 2 广播消费
默认为集群消费模式,若要设置为广播消费,则:consumer.setMessageModel(MessageModel.BROADCASTING);
- RocketMq启动的时候,要先启动消费端,后启动生产端,这是行业一个不成文的规则。
以上是关于RocketMQ学习笔记:消息发送模式的主要内容,如果未能解决你的问题,请参考以下文章