RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了DefaultMQPushConsumer延迟消息源码。
文章目录
并发消息消费失败引发消费重试时,默认情况下重试16次,从延迟等级level3(10s)开始,每次延迟时间递增,时间到了又会发送到重试topic去消费,这其中就涉及到RocketMQ的延迟消息,可以说RocketMQ并发消息消费失败引发消费重试就是基于topic替换和延迟消息这两个技术实现的。
此前我们学习了RocketMQ的消费重试,我们知道在判断消息为延迟消息的时候,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
实际上普通的延迟消息也会进行topic替换,那么,发送到SCHEDULE_TOPIC_XXXX对应的消息队列里面的延迟消息,到底是做到能够在给定的延迟时间之后取出来重新投递的呢?下面我们来看看RocketMQ延迟消息的源码。
实际上RocketMQ通过ScheduleMessageService调度消息服务实现延迟(定时)消息。ScheduleMessageService继承了ConfigManager,在DefaultMessageStore实例化的时候被实例化。
1 load加载延迟消息数据
在broker启动执行DefaultMessageStore#load方法加载Commit Log、Consume Queue、index file等文件,将数据加载到内存中,并完成数据的恢复的时候,同样会执行ScheduleMessageService#load方法,加载延迟消息数据,初始化delayLevelTable和offsetTable。
首先调用父类的ConfigManager#load方法(在broker启动部分就讲过源码了),将延迟消息文件$user.home/store/config/delayOffset.json加载到内存的offsetTable集合中,delayOffset.json中保存着延迟topic每个队列的消费进度(消费偏移量)。
/**
* ScheduleMessageService的方法
* <p>
* 加载延迟消息数据,初始化delayLevelTable和offsetTable
*/
@Override
public boolean load()
//调用父类ConfigManager#load方法,将延迟消息文件$user.home/store/config/delayOffset.json加载到内存的offsetTable集合中
//delayOffset.json中保存着延迟topic每个队列的消费进度(消费偏移量)
boolean result = super.load();
//解析延迟级别到delayLevelTable集合中
result = result && this.parseDelayLevel();
//矫正每个延迟队列的偏移量
result = result && this.correctDelayOffset();
return result;
/**
* ScheduleMessageService的方法
* <p>
* 获取延迟消息文件路径$user.home/store/config/delayOffset.json
*/
@Override
public String configFilePath()
//$user.home/store/config/delayOffset.json
return StorePathConfigHelper.getDelayOffsetStorePath(this.defaultMessageStore.getMessageStoreConfig()
.getStorePathRootDir());
/**
* ScheduleMessageService的方法
* <p>
* json字符串转换为offsetTable对象
*/
@Override
public void decode(String jsonString)
if (jsonString != null)
DelayOffsetSerializeWrapper delayOffsetSerializeWrapper =
DelayOffsetSerializeWrapper.fromJson(jsonString, DelayOffsetSerializeWrapper.class);
if (delayOffsetSerializeWrapper != null)
this.offsetTable.putAll(delayOffsetSerializeWrapper.getOffsetTable());
延迟消息文件$user.home/store/config/delayOffset.json的内容,它保存着延迟队列对应的消费偏移量。
"offsetTable":3:2,4:1
1.1 parseDelayLevel解析延迟等级
延迟等级字符串存储在MessageStoreConfig的messageDelayLevel属性中,默认值为"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h",即18个等级,因此也是可以配置的,但是单位仅支持s、m、h、d,分别表示秒、分、时、天。
该方法解析延迟等级以及对应的延迟时间到delayLevelTable中,单位统一转换为毫秒,注意延迟等级从1开始。
/**
* ScheduleMessageService的方法
* 解析延迟等级到delayLevelTable中
* @return
*/
public boolean parseDelayLevel()
//时间单位表
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
//从MessageStoreConfig中获取延迟等级字符串messageDelayLevel
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try
//通过空格拆分
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++)
//获取每个等级的延迟时间
String value = levelArray[i];
//获取延迟单位
String ch = value.substring(value.length() - 1);
//获取对应的延迟单位的时间毫秒
Long tu = timeUnitTable.get(ch);
//延迟等级,从1开始
int level = i + 1;
//如果当前等级已经大于最大等级,则赋值为最大等级
if (level > this.maxDelayLevel)
this.maxDelayLevel = level;
//延迟时间
long num = Long.parseLong(value.substring(0, value.length() - 1));
//计算该等级的延迟时间毫秒
long delayTimeMillis = tu * num;
//存入delayLevelTable中
this.delayLevelTable.put(level, delayTimeMillis);
if (this.enableAsyncDeliver)
this.deliverPendingTable.put(level, new LinkedBlockingQueue<>());
catch (Exception e)
log.error("parseDelayLevel exception", e);
log.info("levelString String = ", levelString);
return false;
return true;
2 start启动调度消息服务
ScheduleMessageService依靠内部的定时任务实现延迟消息,ScheduleMessageService通过start方法完成启动。
在broker的启动过程中,会执行DefaultMessageStore的start方法中,该方法内部通过handleScheduleMessageService方法执行ScheduleMessageService的start方法。
该方法的大概逻辑为:
- 初始化延迟消息投递线程池deliverExecutorService,该线程池是一个调度任务线程池ScheduledThreadPoolExecutor,核心线程数就是最大的延迟等级,默认18。
- 遍历所有的延迟等级,为每一个延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务放到deliverExecutorService中,默认延迟1000ms后执行。
- 构建一个延迟队列消费偏移量持久化的定时调度任务,首次延迟1000ms之后执行,后续每次执行间隔flushDelayOffsetInterval时间,默认10s。
/**
* ScheduleMessageService的方法
* <p>
* 启动调度消息服务
*/
public void start()
//将启动标志CAS的从false改为true,该服务只能启动一次
if (started.compareAndSet(false, true))
//调用父类的load方法,将延迟消息文件$user.home/store/config/delayOffset.json加载到内存的offsetTable集合中
//fix(dledger): reload the delay offset when master changed (#2518)
super.load();
/*
* 1 初始化延迟消息投递线程池,核心线程数就是最大的延迟等级,默认18
*/
this.deliverExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageTimerThread_"));
//异步投递,默认不支持
if (this.enableAsyncDeliver)
this.handleExecutorService = new ScheduledThreadPoolExecutor(this.maxDelayLevel, new ThreadFactoryImpl("ScheduleMessageExecutorHandleThread_"));
/*
* 2 对所有的延迟等级构建一个对应的DeliverDelayedMessageTimerTask调度任务,默认延迟1000ms后执行
*/
for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet())
//延迟等级
Integer level = entry.getKey();
//延迟时间,毫秒
Long timeDelay = entry.getValue();
//根据延迟等级获取对应的延迟队列的消费偏移量,如果没有则设置为0
Long offset = this.offsetTable.get(level);
if (null == offset)
offset = 0L;
//延迟时间不为null,那么为该等级的延迟队列构建一个DeliverDelayedMessageTimerTask调度任务,默认延迟1000ms后执行
if (timeDelay != null)
if (this.enableAsyncDeliver)
this.handleExecutorService.schedule(new HandlePutResultTask(level), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
//DeliverDelayedMessageTimerTask构造参数包括对应的延迟等级,以及最新消费偏移量
this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS);
/*
* 3 构建一个延迟队列消费偏移量持久化的定时调度任务,首次延迟1000ms之后执行,后续每次执行间隔flushDelayOffsetInterval时间,默认10s
*/
this.deliverExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
if (started.get())
ScheduleMessageService.this.persist();
catch (Throwable e)
log.error("scheduleAtFixedRate flush exception", e);
, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
3 DeliverDelayedMessageTimerTask投递延迟消息任务
在start方法中,ScheduleMessageService会为每一个延迟等级创建一个DeliverDelayedMessageTimerTask投递延迟消息任务,不同延迟等级的消息放到不同的延迟队列里面,被不同的Task处理。
采用不同的队列处理同一个延迟等级的消息的方式,不再需要进行消息排序,避免了消息排序的复杂逻辑,能比较简单的实现有限等级的延迟消息,RocketMQ的开源版本不支持任意时间的延迟消息,这也是它的一个限制吧!
DeliverDelayedMessageTimerTask是一个线程任务,下面来看看它的run方法,主要是调用executeOnTimeup执行消息投递。
@Override
public void run()
try
//如果服务已启动,那么继续执行
if (isStarted())
//执行消息投递
this.executeOnTimeup();
catch (Exception e)
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
//抛出异常,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,10000ms后执行,本次任务结束
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_PERIOD);
3.1 executeOnTimeup执行延迟消息投递
延迟消息的核心逻辑实现,执行延迟消息消息投递。
- 调用findConsumeQueue方法,根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue,如果没找到就新建,即ConsumeQueue文件是延迟创建的。该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了。
- 如果没找到对应的消息队列,调用scheduleNextTimerTask方法,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束。
- 调用getIndexBuffer方法,根据逻辑offset定位到物理偏移量,然后截取该偏移量之后的一段Buffer,其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据,即这里截取的Buffer可能包含多条索引数据。该方法的源码我们在broker处理拉取消息请求部分已经讲过了。
- 遍历缓存buffer中的消息,根据tagsCode投递时间判断消息是否到期,如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中。
- 获取该条目对应的消息的tagsCode,对于延迟消息,tagsCode被替换为延迟消息的发送时间(在CommitLog#checkMessageAndReturnSize方法中,源码此前讲过了)。
- 如果投递时间小于当前时间,那么可以投递该延迟消息。如果投递时间大于当前时间,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束。
- 根据消息物理偏移量从commitLog中找到该条消息,调用messageTimeup方法构建内部消息对象,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息。
- 最后调用syncDeliver方法投递该消息,消息将会被投递到原始topic和队列中,这样就可以被消费了。
- 遍历结束,更新下一个offset,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束。保证线程任务的活性。
/**
* DeliverDelayedMessageTimerTask的方法
* <p>
* 执行延迟消息消息投递
*/
public void executeOnTimeup()
/*
* 1 根据topic和延迟队列id从consumeQueueTable查找需要写入的ConsumeQueue,如果没找到就新建,即ConsumeQueue文件是延迟创建的。
* 该方法的源码我们在ReputMessageService异步构建ConsumeQueue和IndexFile部分已经讲过了
*/
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
/*
* 2 如果没找到对应的消息队列,新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,100ms后执行,本次任务结束
*/
if (cq == null)
this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
return;
/*
* 3 根据逻辑offset定位到物理偏移量,然后截取该偏移量之后的一段Buffer,其包含要拉取的消息的索引数据及对应consumeQueue文件之后的全部索引数据。
* 这里截取的Buffer可能包含多条索引数据,因为需要批量拉取多条消息,以及进行消息过滤。
* 该方法的源码我们在broker处理拉取消息请求部分已经讲过了
*/
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
//没获取到缓存buffer
if (bufferCQ == null)
long resetOffset;
//如果当前消息队列的最小偏移量 大于 当前偏移量,那么当前偏移量无效,设置新的offset为最小偏移量
if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset)
log.error("schedule CQ offset invalid. offset=, cqMinOffset=, queueId=",
this.offset, resetOffset, cq.getQueueId());
//如果当前消息队列的最大偏移量 小于 当前偏移量,那么当前偏移量无效,设置新的offset为最大偏移量
else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset)
log.error("schedule CQ offset invalid. offset=, cqMaxOffset=, queueId=",
this.offset, resetOffset, cq.getQueueId());
else
resetOffset = this.offset;
//新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束
this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
return;
/*
* 3 遍历缓存buffer中的消息,根据tagsCode投递时间判断消息是否到期,如果到期则回复真实消息并且投递安到真实topic以及对应的queueId中
*/
//下一个消费的offset
long nextOffset = this.offset;
try
//i表示consumeQueue消息索引大小
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
//遍历截取的Buffer中的consumeQueue消息索引,固定长度20b
for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE)
//获取该条目对应的消息在commitlog文件中的物理偏移量
long offsetPy = bufferCQ.getByteBuffer().getLong();
//获取该条目对应的消息在commitlog文件中的总长度
int sizePy = bufferCQ.getByteBuffer().getInt();
//获取该条目对应的消息的tagsCode,对于延迟消息,tagsCode被替换为延迟消息的发送时间(CommitLog#checkMessageAndReturnSize方法中)
long tagsCode = bufferCQ.getByteBuffer().getLong();
//如果tagsCode是扩展文件地址
if (cq.isExtAddr(tagsCode))
if (cq.getExt(tagsCode, cqExtUnit))
tagsCode = cqExtUnit.getTagsCode();
else
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr=, offsetPy=, sizePy=",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
//当前时间戳
long now = System.currentTimeMillis();
//校验投递时间,必须小于等于当前时间 + 延迟时间
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
//计算下一个offset
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
//如果投递时间大于当前时间,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束
long countdown = deliverTimestamp - now;
if (countdown > 0)
this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
//根据消息物理偏移量从commitLog中找到该条消息。
MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
if (msgExt == null)
continue;
//构建内部消息对象,设置topic为REAL_TOPIC属性值,即原始topic,设置queueId为REAL_QID属性值,即原始queueId。即恢复为正常消息
MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic()))
log.error("[BUG] the real topic of schedule msg is , discard the msg. msg=",
msgInner.getTopic(), msgInner);
continue;
boolean deliverSuc;
/*
* 消息投递
*/
if (ScheduleMessageService.this.enableAsyncDeliver)
//异步投递,默认不支持
deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
else
//默认同步投递
deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
//如果投递失败,那么新建一个DeliverDelayedMessageTimerTask任务存入deliverExecutorService,指定新的消费偏移量,100ms后执行,本次任务结束
if (!deliverSuc)
this.RocketMQ源码—RocketMQ源码调试环境准备
详细介绍了RocketMQ 4.9.3 分支的源码调试环境搭建等一系列准备工作。
RocketMQ的源码这么多,我们肯定不会全部看完的,我们的源码分析主要会涉及到namesrv、broker、client、remoting、store等模块,也就是生产者、消费者、nameServr、Broker这几个角色的核心功能点。
在看RocketMQ源码之前,一定一定一定要先学会如何使用RocketMQ,建议通看官方文档:
首先我们尝试搭建RocketMQ的源码调试环境,这里不考虑任何集群环境,全部都是单机启动,首先是namesrv,然后是broker,最后启动生产者和消费者测试。
本源码基于RocketMQ 4.9.3版本。
文章目录
1 下载源码包
进入rocketmq官网https://github.com/apache/rocketmq,先fork源码到自己的仓库,然后,下载自己的仓库中fork的源码clone下来,这样就可以把自己写的源码注释上传到github了。
idea打开源码,会下载依赖,切换到4.9.3分支:
![](https://image.cha138.com/20220503/4bd822cf26e04311b80fc43988968e9f.jpg)
到此源码就下载下来了,看下结构:
![](https://image.cha138.com/20220503/9e7e1092994543c196789647ebbd0865.jpg)
可以看到很多的子module,简单介绍下:
- acl:用户权限、安全、验证相关模块。
- broker:RocketMQ 的 Broker的实现代码,实现消息存储、投递、查询等功能,重点。
- client:RocketMQ 的 Producer、Consumer、管理后台等客户端的实现代码,实现生产消息、消费消息、后台监控等,重点。
- common:公共的类和方法的模块。
- dev:开发相关的一些信息。
- distribution:部署 RocketMQ 的一些脚本和配置文件,比如 bin,conf目录。
- example:简单使用 RocketMQ 的案例。
- filter:RocketMQ 的消息过滤器。
- logging:RocketMQ 的日志模块。
- namesvr:RocketMQ 的NameServer 的实现代码,实现注册中心和名字服务,重点。
- openmessaging:开放消息标准模块。
- remoting:RocketMQ 的远程网络通信模块,基于netty实现,重点。
- srvutil:namesrc 模块的相关工具类。
- store: Broker的消息存储的模块,重点。
- style: 代码检查相关的。
- test:测试相关模块。
- tools:命令行监控工具相关工具类。
2 配置目录
我们首先配置ROCKETMQ_HOME的目录地址,这个目录就是专门用于存放配置文件的地方。我们直接在rocketmq的项目主目录下面新建一个config目录,作为ROCKETMQ_HOME的目录地址:
![](https://image.cha138.com/20220503/2c13685329e64b22a30c884533b7b36d.jpg)
然后在config下面创建三个目录conf、logs、store,config用于存放配置文件,logd用于存放日志文件,store则用于消息存储。
![](https://image.cha138.com/20220503/13e6ca8c17654377a103594a9f419552.jpg)
然后我们将distribution模块下面的broker.conf、logback_namesrv.xml、logback_broker.xml这三个配置文件拷贝到此前创建的conf目录下:
![](https://image.cha138.com/20220503/b4250e07e8b74541baaa34f268587ebb.jpg)
拷贝到conf目录下:
![](https://image.cha138.com/20220503/11662a737b2b40a388ffb876a7bb19c3.jpg)
然后我们修改broker.conf文件,添加如下配置:
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
#删除文件的时间点,一天的固定时间执行一次删除过期文件操作,默认为凌晨4点。
deleteWhen = 04
#文件保留时间,也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以被删除,单位小时
fileReservedTime = 48
#broker的角色,默认是异步master,即生产者发送的每一条消息只要写入master就返回告诉生产者成功。然后再“异步复制”到slave。
#同步master:Sync Broker:生产者发送的每一条消息都至少同步复制到一个slave后才返回告诉生产者成功,即“同步双写”。
brokerRole = ASYNC_MASTER
#消息刷盘策略,默认是异步刷盘。
#异步刷盘ASYNC_FLUSH:生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:
#1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行自动刷盘。异步刷盘有较低概率导致消息丢失,比如在还未来得及同步到磁盘的时候宕机,但是性能更好。
#同步刷盘SYNC_FLUSH:生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问题,但是有很大的磁盘IO开销,性能有一定影响。
flushDiskType = ASYNC_FLUSH
#nameserver的地址,也可以指定真实ip
namesrvAddr=127.0.0.1:9876
#brokerIp,也可以指定真实ip
brokerIP1=127.0.0.1
#消息存储根路径
storePathRootDir=/Volumes/Samsung/Idea/rocketmq/config/store
#commitLog文件的存储路径
storePathCommitLog=/Volumes/Samsung/Idea/rocketmq/config/store/commitlog
#consume queue文件的存储路径
storePathConsumeQueue=/Volumes/Samsung/Idea/rocketmq/config/store/consumequeue
#消息索引文件的存储路径
storePathIndex=/Volumes/Samsung/Idea/rocketmq/config/store/index
#checkpoint文件的存储路径
storeCheckpoint=/Volumes/Samsung/Idea/rocketmq/config/store/checkpoint
#abort文件的存储路径
abortFile=/Volumes/Samsung/Idea/rocketmq/config/store/abort
注意上面的配置中的前缀路径填写自己创建的ROCKETMQ_HOME的目录,我的就是/Volumes/Samsung/Idea/rocketmq/config,我是mac系统,文件路径和windows的有些区别。
然后我们来修改日志文件的配置:
-
首先将logback_namesrv.xml和logback_broker.xml配置文件中的所有$user.home都替换为自己的ROCKETMQ_HOME的目录,比如$user.home/logs/rocketmqlogs/namesrv_default.log,我会替换为/Volumes/Samsung/Idea/rocketmq/config/logs/rocketmqlogs/namesrv_default.log。
-
然后将所有的$brokerLogDir都替换为broker,这一级路径下主要存放broker的日志。
例如,替换前:
![](https://image.cha138.com/20220503/08ecfc40d03243e4b2e57acd061c965f.jpg)
替换后:
![](https://image.cha138.com/20220503/6134621cc86c4edbb2995a149e41ef8d.jpg)
3 启动namesrv
nameserver是rocketmq的注册中心,提供名字服务,因此需要最先启动。找到namesrv模块的NamesrvStartup类,尝试直接运行main方法:
![](https://image.cha138.com/20220503/2e1e08d669ce490a831e49d2dad9144d.jpg)
当然是无法启动的,控制台打印需要设置ROCKETMQ_HOME环境变量的提示信息:
Please set the ROCKETMQ_HOME variable in your environment to match the location of the RocketMQ installation
这个ROCKETMQ_HOME变量是rocketmq的配置文件目录,实际上就是指的distribution模块里面的conf文件夹所在的目录,而我们之前已经单独配置了ROCKETMQ_HOME目录,所以此时我们配置那一个路径即可。
我们在idea中配置ROCKETMQ_HOME=/Volumes/Samsung/Idea/rocketmq/config的环境变量即可。
![](https://image.cha138.com/20220503/8830f2be91884ed8a42476d9480dd75c.jpg)
再次运行,可以在控制台看到启动的日志输出:
The Name Server boot success. serializeType=JSON
此时namesrv启动成功!同时在我们配置的logs目录下可以看到详细的启动日志。
![](https://image.cha138.com/20220503/81e9e0853693400ca006b4671c7ac8ae.jpg)
4 启动broker
namesrv启动完毕后,下面我们启动rocketmq的broker服务。
broker通过broker模块里面的BrokerStartup类启动,同样,我们需要配置启动环境变量ROCKETMQ_HOME=/Volumes/Samsung/Idea/rocketmq/config。
同时,相比于namesrv的启动,多了一个启动参数的配置:-c /Volumes/Samsung/Idea/rocketmq/config/conf/broker.conf,后面的路径是我们配置的broker.conf配置文件的地址。
![](https://image.cha138.com/20220503/ac8f836d13074e4b93d3a325007ff31f.jpg)
启动broker,可以看到启动成功的日志:
The broker[broker-a, 127.0.0.1:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
注意有时候broker并没有连上nameserver,但是控制台没有报错,因此需要在我们配置的logs目录下查看详细的启动日志来确定是否真的成功了。
![](https://image.cha138.com/20220503/52c11aa65d994166a6f75b355f599f6f.jpg)
查看broker.log日志:
![](https://image.cha138.com/20220503/d529e9fd0c2d4827974ff0c8235f7ccc.jpg)
5 启动管理后台
管理后台项目不在主项目中,需要单独的下载,此前的Console项目已被重新命名为dashboard:https://github.com/apache/rocketmq-dashboard。
通过git clone下载下来后使用idea打开,可以看出来是一个spring boot项目:
![](https://image.cha138.com/20220503/81c3197c2fdd405d8a576ee9da03f603.jpg)
修改application.yml配置文件中的namesrvAddrs配置,改为本地namesrv地址:
![](https://image.cha138.com/20220503/56be14f74e684035bc37aafa291276b7.jpg)
启动项目,在浏览器输入localhost:8080,即可看到管理后台界面:
![](https://image.cha138.com/20220503/626980c179304ee2b2d788454e8c93d4.jpg)
6 快速案例
rocketmq的源码中就已经提供了非常多的案例,在example模块中可以找到!
我们找到下面的quickstart包,里面有两个简单的测试类:
![](https://image.cha138.com/20220503/fdbb2bb66f5b43ce9d05fd66ba9a1aed.jpg)
打开Producer类,可以看到里面是一个循环发送1000条消息的代码,非常简单,我们添加一行代码producer.setNamesrvAddr(“127.0.0.1:9876”);指定namesrv的地址:
![](https://image.cha138.com/20220503/9f6640e80cad4c21aa304e3e8305d029.jpg)
运行生产者,可以看到控制台成功输出日志:
![](https://image.cha138.com/20220503/55078c0eb9dd4a628727816dfbda03bb.jpg)
下面启动消费者,同样增加一行代码设置namesrv地址:
![](https://image.cha138.com/20220503/1a26f9af464d4c69b41e384accdaab1c.jpg)
启动之后可以看到成功的消费了数据:
![](https://image.cha138.com/20220503/ca019ddcc4524521bb49da4225a6660a.jpg)
进入localhost:8080控制台,可以看到各种统计信息:
![](https://image.cha138.com/20220503/89d1713e800146c4bd94bb7520b20216.jpg)
到此,我们的案例运行完毕,这证明我们的最小化rocketmq源码搭建并运行成功,此时,万事俱备只欠东风,后面我们将会开始真正的进入源码分析,期待着一起学习吧!
如有需要交流,或者文章有误,请直接留言。另外希望点赞、收藏、关注,我将不间断更新各种Java学习博客!
以上是关于RocketMQ源码(24)—DefaultMQPushConsumer延迟消息源码的主要内容,如果未能解决你的问题,请参考以下文章