RocketMQ(10)——发送延时消息
Posted elim168
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(10)——发送延时消息相关的知识,希望对你有一定的参考价值。
发送延时消息
正常发送的非事务消息只要到达了Broker就会写入消息队列,消费者就可以进行消费了。RocketMQ支持我们发送延时消费的消息,即现在发送的消息先发送到Broker,但是需要过一会才能进行消费。如果需要发送延时消息,只需要通过Message的setDelayDelevel()
指定一个消息的延时等级即可。RocketMQ内部定义了18个等级,分别是1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h,分别对应于1-18,所以如果一条消息需要延时1分钟,需要指定delayLevel为5,延时10分钟,则指定delayLevel为14。下面的代码中一共发送了10条延时消息,分别应用了1-10共10个delayLevel。
@Test
public void testScheduledMessageSend() throws Exception
DefaultMQProducer producer = new DefaultMQProducer("group1");
producer.setNamesrvAddr(this.nameServer);
producer.start();
for (int i=0; i<10; i++)
Message message = new Message("topic1", "tag5", String.valueOf(i+1).getBytes());
message.setDelayTimeLevel(i+1);
producer.send(message);
producer.shutdown();
下面代码可以用来验证一下这10条消息是不是真的延时消费了,通过实际输出的结果会看到从消费到产生的时间间隔与上面delayLevel上描述的基本一致,差的一点毫秒数在于消息传递上的耗时。
@Test
public void testScheduledMessageConsume() throws Exception
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
consumer.setNamesrvAddr(this.nameServer);
consumer.subscribe("topic1", "tag5");
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
MessageExt msg = msgs.get(0);
System.out.println(String.format("收到消息%s,延时%dms,内容:%s", msg.getMsgId(), System.currentTimeMillis()-msg.getBornTimestamp(), new String(msg.getBody())));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
consumer.start();
TimeUnit.SECONDS.sleep(1200);
consumer.shutdown();
以下是笔者用上述代码跑出来的结果。
收到消息0AC030AA204418B4AAC28F8A0B700000,延时1076ms,内容:1
收到消息0AC030AA204418B4AAC28F8A0B790001,延时5006ms,内容:2
收到消息0AC030AA204418B4AAC28F8A0B7C0002,延时10005ms,内容:3
收到消息0AC030AA204418B4AAC28F8A0B810003,延时30005ms,内容:4
收到消息0AC030AA204418B4AAC28F8A0B830004,延时60005ms,内容:5
收到消息0AC030AA204418B4AAC28F8A0B870005,延时120007ms,内容:6
收到消息0AC030AA204418B4AAC28F8A0B8A0006,延时180004ms,内容:7
收到消息0AC030AA204418B4AAC28F8A0B8C0007,延时240003ms,内容:8
收到消息0AC030AA204418B4AAC28F8A0B8E0008,延时300005ms,内容:9
收到消息0AC030AA204418B4AAC28F8A0B900009,延时360002ms,内容:10
(注:本文是基于RocketMQ4.5.0所写)
以上是关于RocketMQ(10)——发送延时消息的主要内容,如果未能解决你的问题,请参考以下文章
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息
Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息