RocketMQ(10)——发送延时消息

Posted elim168

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ(10)——发送延时消息相关的知识,希望对你有一定的参考价值。

发送延时消息

正常发送的非事务消息只要到达了Broker就会写入消息队列,消费者就可以进行消费了。RocketMQ支持我们发送延时消费的消息,即现在发送的消息先发送到Broker,但是需要过一会才能进行消费。如果需要发送延时消息,只需要通过Message的setDelayDelevel()指定一个消息的延时等级即可。RocketMQ内部定义了18个等级,分别是1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h,分别对应于1-18,所以如果一条消息需要延时1分钟,需要指定delayLevel为5,延时10分钟,则指定delayLevel为14。下面的代码中一共发送了10条延时消息,分别应用了1-10共10个delayLevel。

@Test
public void testScheduledMessageSend() throws Exception 
  DefaultMQProducer producer = new DefaultMQProducer("group1");
  producer.setNamesrvAddr(this.nameServer);
  producer.start();
  for (int i=0; i<10; i++) 
    Message message = new Message("topic1", "tag5", String.valueOf(i+1).getBytes());
    message.setDelayTimeLevel(i+1);
    producer.send(message);
  
  producer.shutdown();

下面代码可以用来验证一下这10条消息是不是真的延时消费了,通过实际输出的结果会看到从消费到产生的时间间隔与上面delayLevel上描述的基本一致,差的一点毫秒数在于消息传递上的耗时。

@Test
public void testScheduledMessageConsume() throws Exception 
  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
  consumer.setNamesrvAddr(this.nameServer);
  consumer.subscribe("topic1", "tag5");
  consumer.registerMessageListener(new MessageListenerConcurrently() 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
      MessageExt msg = msgs.get(0);
      System.out.println(String.format("收到消息%s,延时%dms,内容:%s", msg.getMsgId(), System.currentTimeMillis()-msg.getBornTimestamp(), new String(msg.getBody())));
      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
  );
  consumer.start();
  TimeUnit.SECONDS.sleep(1200);
  consumer.shutdown();

以下是笔者用上述代码跑出来的结果。

收到消息0AC030AA204418B4AAC28F8A0B700000,延时1076ms,内容:1
收到消息0AC030AA204418B4AAC28F8A0B790001,延时5006ms,内容:2
收到消息0AC030AA204418B4AAC28F8A0B7C0002,延时10005ms,内容:3
收到消息0AC030AA204418B4AAC28F8A0B810003,延时30005ms,内容:4
收到消息0AC030AA204418B4AAC28F8A0B830004,延时60005ms,内容:5
收到消息0AC030AA204418B4AAC28F8A0B870005,延时120007ms,内容:6
收到消息0AC030AA204418B4AAC28F8A0B8A0006,延时180004ms,内容:7
收到消息0AC030AA204418B4AAC28F8A0B8C0007,延时240003ms,内容:8
收到消息0AC030AA204418B4AAC28F8A0B8E0008,延时300005ms,内容:9
收到消息0AC030AA204418B4AAC28F8A0B900009,延时360002ms,内容:10

(注:本文是基于RocketMQ4.5.0所写)

以上是关于RocketMQ(10)——发送延时消息的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ(10)——发送延时消息

rocketmq实现延时队列

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

Spring boot实战项目整合阿里云RocketMQ 消息队列实现发送普通消息,延时消息

rocketmq源码分析:事务消息延时消息消息查询