rocketMQ之延时处理消息

Posted 一只猪的思考

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMQ之延时处理消息相关的知识,希望对你有一定的参考价值。

1 启动消费者等待传入的订阅消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class ScheduledMessageConsumer 

    public static void main(String[] args) throws Exception 
        // Instantiate message consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
        // Subscribe topics
        consumer.subscribe("TestTopic", "*");
        // Register message listener
        consumer.registerMessageListener(new MessageListenerConcurrently() 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) 
                for (MessageExt message : messages) 
                    // Print approximate delay time period
                    System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
                                       + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        );
        // Launch consumer
        consumer.start();
    

2 发送延迟消息

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class ScheduledMessageProducer 

    public static void main(String[] args) throws Exception 
        // Instantiate a producer to send scheduled messages
        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        // Launch producer
        producer.start();
        int totalMessagesToSend = 100;
        for (int i = 0; i < totalMessagesToSend; i++) 
            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
            // This message will be delivered to consumer 10 seconds later.
            message.setDelayTimeLevel(3);
            // Send the message
            producer.send(message);
        

        // Shutdown producer after use.
        producer.shutdown();
    


3 确认

您应该会看到消息在其存储时间后大约 10 秒被消耗。

4 延迟消息的使用场景

例如在电子商务中,如果提交订单,可以发送延迟消息,1小时后可以查看订单状态。 如果订单仍未付款,则可以取消订单并释放库存。

5 使用延迟消息的限制

// org/apache/rocketmq/store/config/MessageStoreConfig.java

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

当前 RocketMQ 不支持任意时间的延迟。 生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。

以上是关于rocketMQ之延时处理消息的主要内容,如果未能解决你的问题,请参考以下文章

rocketMQ之延时处理消息

RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信

RocketMQ(十五)延时消息

rocketmq延时消息自定义配置;topic下tag使用

rocketmq源码分析:事务消息延时消息消息查询

RocketMQ源码 — 九 RocketMQ延时消息