RocketMQ实现延迟队列精确到秒级实现

Posted BasicLab基础架构实验室

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ实现延迟队列精确到秒级实现相关的知识,希望对你有一定的参考价值。

前言篇:

为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,

其实对于大部分的功能是够用了的,但是以前的项目,全部都是使用了阿里云的rocketmq,原因是不同的供应商的订单的延时时间是不同的

(部分供应商的订单未支付30分钟取消,有些1个半小时取消,各种时间都有),

所以使用了大量的延时队列,但是开源版本不支持任意时间延时(希望官方支持这个功能)

为了实现这个功能,网上查询了不少资料,查询到不少相关的文章,主要实现都是基于时间轮来实现的,

但是比较少开源的代码实现(也许大家都没有这个需求吧)

debug实践篇:

1. 撸起袖子加油干,首先,下载源代码 https://github.com/apache/rocketmq.git,导入ide

运行mvn package 生成jar包,如果成功的话,会生成到distribution目录下面

2. 查看文档,发现要运行namesvr 和 broker

找到 src\\main\\java\\org\\apache\\rocketmq\\namesrv\\NamesrvStartup.java ,开心的执行main方法,

哦哦哦哦哦,果然报错了,提示 rocketmq.home.dir 目录不存在,查看源码, 原来是从system.propeties读取的,

为了调试,我毫不犹豫的加上了配置文件,

再次运行,不报错了,控制台显示,成功啦( 生活是多么美好,空气是多么清晰! )

3.运行 broker ,打开 src\\main\\java\\org\\apache\\rocketmq\\broker\\BrokerStartup.java,执行main方法,

添加 配置文件 ( D:\\\\mq\\\\rocketmq-rocketmq-all-4.9.2是我本地的路径,你要修改成自己的 )

1 System.setProperty("rocketmq.home.dir", "D:\\\\mq\\\\rocketmq-rocketmq-all-4.9.2\\\\rb");2 System.setProperty("user.home", "D:\\\\mq\\\\rocketmq-rocketmq-all-4.9.2\\\\rb\\\\home\\\\");

运行一下,成功了,开心的发一条消息,试试,哦哦哦哦哦。 发不出去哦( 人生最痛苦的事情是,快要成功了,却没有成功 ) 。

原来还要配置namesvr地址,在启动命令,添加 -n localhost:9876 ( 上面的namesvr 启动的ip和端口)

4.漫长的改造之路 (我们是勇敢的斯巴达勇士,一直勇往直前)

用了阿里云的延时队列,发现它的message 可以传一个时间过来(任意的延时时间)

来来来,我们复制一下( 不要告诉别人,我们一直是复制,粘贴的,没有原创, 嘘 ...... )

1/** 2      * 该类预定义一些系统键. 3      */4     static public class SystemPropKey 5         public static final String TAG = "__TAG";6         public static final String KEY = "__KEY";7         public static final String MSGID = "__MSGID";8         public static final String SHARDINGKEY = "__SHARDINGKEY";9         public static final String RECONSUMETIMES = "__RECONSUMETIMES";10         public static final String BORNTIMESTAMP = "__BORNTIMESTAMP";11         public static final String BORNHOST = "__BORNHOST";12/**13          * 设置消息的定时投递时间(绝对时间). <p>例1: 延迟投递, 延迟3s投递, 设置为: System.currentTimeMillis() + 3000; <p>例2: 定时投递,14          * 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-0115          * 11:30:00").getTime()16          */17         public static final String STARTDELIVERTIME = "__STARTDELIVERTIME";18     
/**     * <p> 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. </p> <ol> <li>延迟投递: 延迟3s投递, 设置为: System.currentTimeMillis() + 3000;</li>     * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01     * 11:30:00").getTime()</li> </ol>     */    public void setStartDeliverTime(final long value)         putSystemProperties(SystemPropKey.STARTDELIVERTIME, String.valueOf(value));    

5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不负有心人,找到啦,

找到 \\src\\main\\java\\org\\apache\\rocketmq\\broker\\processor\\SendMessageProcessor.java, 发现

public RemotingCommand processRequest(ChannelHandlerContext ctx,                                          RemotingCommand request) throws RemotingCommandException         SendMessageContext mqtraceContext;switch (request.getCode()) case RequestCode.CONSUMER_SEND_MSG_BACK:return this.consumerSendMsgBack(ctx, request);default:                SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) return null;                                mqtraceContext = buildMsgContext(ctx, requestHeader);                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);                RemotingCommand response;if (requestHeader.isBatch())                     response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);                 else                     response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);                                this.executeSendMessageHookAfter(response, mqtraceContext);return response;            

继续debug,发现 sendMessage 就是处理发送消息的,

如果我们在这里判断是否延时消息就写入文件,然后返回成功到客户端,等到了时间就发送延迟消息,不就搞定了吗?

oh,yes,就是这么干的

//处理延迟消息 delay message        String startTime = msgInner.getProperty(Message.SystemPropKey.STARTDELIVERTIME);        boolean isDelayMsg = false;        long nextStartTime = 0;if (startTime != null && msgInner.getDelayTimeLevel() <= 0)             nextStartTime = Long.parseLong(startTime);if (nextStartTime >= System.currentTimeMillis())                 isDelayMsg = true;                    if (isDelayMsg) return delayProcessor.handlePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime);         else if (traFlag != null && Boolean.parseBoolean(traFlag)) if (this.brokerController.getBrokerConfig().isRejectTransactionMessage())                     response.setCode(ResponseCode.NO_PERMISSION);                    response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                                    + "] sending transaction message is forbidden");return response;                                putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);             else                 putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);            return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);            

其中 delayProcessor.handlePutMessageResultFuture 是我们用来处理延迟消息的地方

我们按照每个时间一个文件夹来保存延时消息,等延时消息到达后,定时的写入延时队列里面。

详细原理,请查考 rocketmq 原理实现篇 https://www.cnblogs.com/tomj2ee/p/15815186.html

<em> </em>
package org.apache.rocketmq.broker.delay;import io.netty.channel.ChannelHandlerContext;import org.apache.commons.lang3.time.DateFormatUtils;import org.apache.rocketmq.broker.BrokerController;import org.apache.rocketmq.common.protocol.ResponseCode;import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;import org.apache.rocketmq.logging.InternalLogger;import org.apache.rocketmq.logging.InternalLoggerFactory;import org.apache.rocketmq.remoting.protocol.RemotingCommand;import org.apache.rocketmq.store.MessageExtBrokerInner;import java.io.*;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.util.Date;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.ThreadLocalRandom;public class DelayProcessor implements  Runnable     protected static final InternalLogger log = InternalLoggerFactory.getLogger(DelayProcessor.class.getCanonicalName());    protected final BrokerController brokerController;    protected final SocketAddress storeHost;    private ExecutorService jobTaskExecute = Executors.newFixedThreadPool(16);    public DelayProcessor(final BrokerController brokerController)         this.brokerController = brokerController;        this.storeHost = new InetSocketAddress(brokerController.getBrokerConfig().getBrokerIP1(), brokerController                .getNettyServerConfig().getListenPort());        Thread thread = new Thread(this);        thread.setName("delayProcessor-run---thread");        thread.setDaemon(true);new File(getDelayPath()).mkdirs();        thread.start();        Thread missCallThread = new Thread(() ->             try for(;;)                     Thread.sleep(10 * 1000);                    sendMissCallMsg();                             catch (InterruptedException e)                 e.printStackTrace();                    );        missCallThread.setName("delayProcessor-callback-thread");        missCallThread.start();        System.out.println("init delay success " +getDelayPath());        public RemotingCommand handlePutMessageResultFuture(RemotingCommand response,                                                        RemotingCommand request,                                                        MessageExtBrokerInner msgInner,                                                        ChannelHandlerContext ctx,int queueIdInt, long nextStartTime) return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime);        private RemotingCommand handlePutMessageResult(RemotingCommand response,                                                   RemotingCommand request, MessageExtBrokerInner msg,                                                   ChannelHandlerContext ctx,int queueIdInt, long nextStartTime)         boolean svOk = saveMsgFile(nextStartTime, msg);        SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader();        sendMessageResponseHeader.setQueueId(1);        sendMessageResponseHeader.setMsgId("0");        sendMessageResponseHeader.setQueueOffset(0l);        sendMessageResponseHeader.setTransactionId("");        RemotingCommand newCommand = RemotingCommand.createRequestCommand(ResponseCode.SUCCESS, sendMessageResponseHeader);if (svOk)             newCommand.setCode(ResponseCode.SUCCESS);         else             newCommand.setCode(ResponseCode.SYSTEM_ERROR);            newCommand.setRemark("发送消息延迟失败!");                newCommand.setExtFields(request.getExtFields());        newCommand.setVersion(response.getVersion());        newCommand.setOpaque(response.getOpaque());        newCommand.setLanguage(response.getLanguage());        newCommand.setBody(request.getBody());if (!request.isOnewayRPC())             try                 ctx.writeAndFlush(newCommand);             catch (Throwable e)                 log.error("DelayProcessor process request over, but response failed", e);                log.error(request.toString());                log.error(response.toString());                    return newCommand;        public void putMessage(MessageExtBrokerInner msgInner)         this.brokerController.getMessageStore().putMessage(msgInner);        @Override    public void run() for (; ; )             long curTime = System.currentTimeMillis() / 1000;            jobTaskExecute.submit(() -> sendMsg(curTime));            try                 Thread.sleep(1000);             catch (InterruptedException e)                             private String getDelayPath()         String delayPath = "./delay-store"+ File.separator + "delay";return delayPath;        private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner)         ObjectOutputStream objectOutputStream = null;        try             String msgId =(startTime/1000 )+"-"+ System.currentTimeMillis() + "-" + ThreadLocalRandom.current().nextInt(99999999);            System.out.println( getCurrentTime()+"写入延迟消息 >>" + msgId);            String parentDir = getDelayPath() + File.separator + startTime / 1000;            File parentFile = new File(parentDir);if (!parentFile.exists())                 parentFile.mkdirs();                        String fileName = parentDir + File.separator + msgId;            FileOutputStream fos = new FileOutputStream(fileName);            BufferedOutputStream bos = new BufferedOutputStream(fos);            objectOutputStream = new ObjectOutputStream(bos);            objectOutputStream.writeObject(msgInner);returntrue;         catch (Exception ex)             log.error("saveMsgFile ex:", ex);returnfalse;         finally             try if (objectOutputStream != null)                     objectOutputStream.close();                             catch (Exception ex)                 log.error("saveMsgFile ex:", ex);                            private MessageExtBrokerInner readFile(File f)         ObjectInputStream ois = null;        try             ois = new ObjectInputStream(new FileInputStream(f));return (MessageExtBrokerInner) ois.readObject();         catch (Exception ex) return null;         finally if (ois != null)                 try                     ois.close();                 catch (IOException e)                     e.printStackTrace();                                            private  void sendMissCallMsg()         File lst = new File(getDelayPath());        File[] files = lst.listFiles();        long startTime = System.currentTimeMillis() / 1000 - 10 * 1000;for (File f : files)             String name = f.getName();if (f.isDirectory() && !name.equals(".") && !name.equals(".."))                 try                     Long fileTime = Long.parseLong(name);if (fileTime <= startTime)                         sendMsg(fileTime);                                     catch (Exception ex)                                             private String  getCurrentTime()return  Thread.currentThread().getName()+ ">>["+DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")+"] ";        private void sendMsg(long startTime)         File lst = new File(getDelayPath() + File.separator + startTime);        File[] files = lst.listFiles();if (files != null) for (File f : files)                 System.out.println( getCurrentTime()+"时间到发送>> "+ startTime+" to commitLog " + f.getName());                MessageExtBrokerInner msgInner = readFile(f);if (msgInner != null)                     putMessage(msgInner);                    System.out.println( getCurrentTime()+"写入log >> "+ startTime+" to commitLog " + f.getName()+" success");                    f.delete();                                        lst.delete();            

总结:rocketmq延迟队列实现主要是通过时间轮和文件来保存延时消息,等到了时间后,再写入延时队列,来达到延时的目的。

总共有4种方式来实现延时队列,可以参考延时队列的实现原理篇

https://www.cnblogs.com/tomj2ee/p/15815157.html

开源rocketmq延迟队列实现:

https://gitee.com/venus-suite/rocketmq-with-delivery-time.git

以上是关于RocketMQ实现延迟队列精确到秒级实现的主要内容,如果未能解决你的问题,请参考以下文章

基于supervisor秒级Laravel定时任务

rocketmq实现延时队列

在Java中如何实现较为精确的定时器

分布式技术专题分布式消息队列-RocketMQ延迟消息实现原理和源码分析

APScheduler的cron触发器支持到秒级的cron表达式

RocketMq实现延时队列