rocketMQ -- offset管理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMQ -- offset管理相关的知识,希望对你有一定的参考价值。
参考技术A 在rocketMQ中,offset用来管理每个消费队列的不同消费组的消费进度。对offset的管理分为本地模式和远程模式,本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。
默认情况下,当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集;当消费模式为集群消费时,则使用远程模式管理offset,消息会被多个消费者消费,不同的是每个消费者只负责消费其中部分消费队列,添加或删除消费者,都会使负载发生变动,容易造成消费进度冲突,因此需要集中管理。同时,RocketMQ也提供接口供用户自己实现offset管理(实现OffsetStore接口)。
生产环境上一般使用集群模式,本文主要记录集群模式下offset的管理,即RemoteBrokerOffsetStore。
rocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为$user.home/store/config/consumerOffset.json。其内容示例如下:
broker端启动后,会调用BrokerController.initialize()方法,方法中会对offset进行加载,consumerOffsetManager.load()。获取文件内容后,序列化为ConsumerOffsetManager对象,实质是其属性ConcurrentMap<String,ConcurrentMap<Integer, Long>> offsetTable, offsetTable的数据结构为ConcurrentMap,是一个线程安全的容器,key的形式为topic@group(每个topic下不同消费组的消费进度),value也是一个ConcurrentMap,key为queueId,value为消费位移(这里不是offset而是位移)。通过对全局ConsumerOffsetManager对象就可以对各个topic下不同消费组的消费位移进行获取与管理。
如下图所示,producer发送消息到broker之后,会将消息具体内容持久化到commitLog文件中,再分发到topic下的消费队列consume Queue,消费者提交消费请求时,broker从该consumer负责的消费队列中根据请求参数起始offset获取待消费的消息索引信息,再从commitLog中获取具体的消息内容返回给consumer。在这个过程中,consumer提交的offset为本次请求的起始消费位置,即beginOffset;consume Queue中的offset定位了commitLog中具体消息的位置。
consume Queue中每个消息索引信息长度为20bytes,包括8位长度的offset,记录commitLog中消息内容的位移;4位长度的size,记录具体消息内容的长度;8位长度的tagHashCode,记录消息的tag的哈希值(订阅时如果指定tag,会根据HashCode快速查找订阅的消息)
对于consumer的消费请求处理(PullMessageProcessor.processRequest()),除了待消费的消息内容,broker在responseHeader(PullMessageResponseHeader)附带上当前消费队列的最小offset(minOffset)、最大offset(maxOffset)、及下次拉取的起始offset(nextBeginOffset)。
其中nextBeginOffset是consumer在下一轮消息拉取时offset的重要依据,无论当次拉取的消息消费是否正常,nextBeginOffset都不会回滚,这是因为rocketMQ对消费异常的消息的处理是将消息重新发回broker端的重试队列(会为每个topic创建一个重试队列,以%RERTY%开头),达到重试时间后将消息投递到重试队列中进行消费重试。对消费异常的处理不是通过offset回滚,这使得客户端简化了offset的管理。
consumer启动过程中(Consumer主函数默认调用DefaultMQPushConsumer.start()方法)根据MessageModel选择对应的offsetStore,然后调用offsetStore.load()对offset进行加载,LocalFileOffsetStore是对本地文件的加载,而RemotebrokerOffsetStore是没有本地文件的,因此load()方法没有实现。在rebalance完成对messageQueue的分配之后会对messageQueue对应的消费位置offset进行更新。
Push模式下,computePullFromWhere()方法的实现类为RebalancePushImpl.class。根据配置信息consumeFromWhere进行不同的操作。ConsumeFromWhere的类型枚举如下,其中有三个已经被标记为Deprecated(基于rocketmq-all 4.6.0版本)
从最新的offset开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;如果lastOffset小于0说明是first start,没有offset信息,topic为重试topic时从0开始消费,否则请求获取该消息队列对应的消费队列consumeQueue的最大offset(maxOffset),从maxOffset开始消费
从第一个offset开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;
否则从0开始消费。
获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset>=0,从lastOffset开始消费;
当lastOffset<0,如果为重试topic,获取consumeQueue的最大offset;否则获取ConsumeTimestamp(consumer启动时间),根据时间戳请求查找offset。
上述三种消费位置的设置流程有一个共同点,都请求获取consumer对当前消息队列messageQueue的消费进度lastOffset,如果lastOffset不小于0,则从lastOffset开始消费。这也是有时候设置了CONSUME_FROM_FIRST_OFFSET却不是从0开始重新消费的原因,rocketMQ减少了由于配置原因造成的重复消费。
对于lastOffset、maxOffset、时间戳查找offset都是通过MQClientAPIImpl提供的接口进行查询的,MQClientAPIImplclient对broker请求的封装类,使用Netty进行异步请求,对应的RequestCode分别为RequestCode. QUERY_CONSUMER_OFFSET、 RequestCode. GET_MAX_OFFSET、 RequestCode. SEARCH_OFFSET_BY_TIMESTAMP。
consumer从broker拉取消息后,会将消息的扩展信息MessageExt存放到ProcessQueue的属性TreeMap<Long, MessageExt> msgTreeMap 中,key值为消息对应的queueOffset,value为扩展信息(包括queueID等)。并发消费模式下(Concurrently),获取的待消费消息会分批提交给消费线程进行消费,默认批次为1,即每个消费线程消费一条消息。消费完成后调用ConsumerMessageConcurrentlyService.processConsumeResult()方法对结果进行处理:消费成功确认ack,消费失败发回broker进行重试。之后便是对offset的更新操作。
首先是调用ProcessQueue.removeMessage()方法,将已经消费完成的消息从msgTreeMap中根据queueOffset移除,然后判断当前msgTreeMap是否为空,不为空则返回当前msgTreeMap第一个元素,即offset最小的元素,否则返回-1。
如果removeMessage()返回的offset大于0,则更新到offsetTable中。offsetTable的结构为ConcurrentMap<MessageQueue, AtomicLong> offsetTable ,是一个线程安全的Map,key为MessageQueue,value为AtomicLong对象,值为offset,记录当前messageQueue的消费位移。
到这里一条消息的消费流程已经结束,offset更新到了本地缓存offsetTable,而将offset上传到broker是由定时任务执行的。MQClientInstance.start()会启动客户端相关的定时任务,包括NameService通信、offset提交等。
LocalFileOffsetStore模式下,将offset信息转化成json保存到本地文件中;RemoteBrokerOffsetStore则offsetTable将需要提交的MessageQueue的offset信息通过MQClientAPIImpl提供的接口updateConsumerOffsetOneway()提交到broker进行持久化存储。
另一种情况,当应用正常关闭时,consumer的shutdown()方法会主动触发一次持久化offset到broker的操作。
client对offset的更新是在消息消费完成后将offset更新到offsetTable,再由定时任务进行持久化。这个过程有需要注意的地方。
问题:consumer从broker拉取的待消费消息时批量的(默认情况下pullBatchSize=32),并发消费时,offset的更新不是按大小顺序的,比如拉取消息m1到m10,m1可能是最后消费完成的,那提交的offset的正确性如何保证?m10 offset的更新不会导致m1会误认为已消费完成。
上一小节提到消费完成后,会将线程消费的批次消息从msgTreeMap中删除,并返回当前msgTreeMap的第一个元素,也就是拉取批次最小的offset,offsetTable更新的offset一直会是拉取批次中未消费的最小的offset值。也就是m1未消费完成,m10消费完成的情况下,更新到offsetTable的当前messageQueue的消费进度为m1对应的offset值。
因此,offsetTable中存放的可能不是messageQueue真正消费的offset的最大值,但是consumer拉取消息时使用的是上一次拉取请求返回的nextBeginOffset,并不是依据offsetTable,正常情况下不会重复拉取数据。当发生宕机等异常时,与offsetTable未提交宕机异常一样,需要通过业务流程来保证幂等性。业务流程的幂等性是rocketMQ一直强调的。
RocketMQ广播消费本地Offset文件丢失问题探秘
文章目录
今天本来在用RocketMQ做一个大的功能改造,中间有个小问题随意跟了下源码,突然还发现一个小BUG。一通跟踪调试,虽然最后还是没有解决问题,但是受益匪浅。记录一下。
一、问题出发点
我们知道RocketMQ的消费者有两种消费模式MessageModel,一种是集群消费,一种是广播消费。这两种消费机制最本质的区别在于,集群消费是在Broker端保存各个ConsumeGroup的消费进度Offset,而广播消费则是在Consumer本地记录消费进度Offset。
集群消费用得比较多,之前也认真跟过源码。但是广播消费因为用得比较少,所以基本也跟你一样,背过一点八股文,也就没有太过关注。但是,今天一个偶然的业务场景用到了广播消费。当我想要去本地看一下Offset的记录时,却发现怎么也找不到。这到底是我哪个业务配置配错了?还是万能的八股文欺骗了我?这背后藏着什么样的秘密?
问题1:Offset的本地存储的目录如何进行定制?
在RocketMQ的原生API中,提供了DefaultLitePullConsumer和DefaultMQPushConsumer(还有一个DefaultMQPullConsumer已经过时了)。在这两个消费实例中,都可以设定一个MessageModel属性,这个属性有两个枚举值,MessageModel.BROADCASTING(广播消费)和MessageModel.CLUSTERING(集群消费)。对于广播消费的消费者,默认会在消费者程序所在的机器本地的$user.home/rocketmq_offset/$clientIp@DEFAULT/$group/offset.json文件中保存消费进度。
其中$部分问变量。
$user.home为系统所属用户目录。在windows下默认是C:\\Users\\$用户名。
$clientIp是消费者端的IP地址。
$group是消费者指定的所属消费者组。
这里就有了第一个问题:这个本地存储的目录如何进行定制?
问题2:rocketmq-spring-boot-starter插件如何配置这个本地存储目录?
在使用RocketMQ进行业务开发时,经常会用到SpringBoott框架来整合RocketMQ的客户端。于是,会使用到rocketmq提供的下面的Maven依赖包。
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
然后,就可以使用配置或注解的方式来声明客户端,而不用再去接触RocketMQ的原生消费者API了。例如
@Component
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",messageModel= MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListener<String>
@Override
public void onMessage(String message)
System.out.println("Received message : "+ message);
工作起来没有问题,但是,当你沿着上面的思路,想要去本地找一找这个消费者的本地缓存时,你会发现一个问题,找不到本地Offset文件(不要怀疑,如果你是用windows开发,一定找不到)。
于是,我开始冒出另外的两个问题:**问题2:rocketmq-spring-boot-starter中有几种声明消费者的方式?**是不是我的使用方式不对?**问题3:rocketmq-spring-boot-starter中是如何指定本地目录地址的?**要怎么找到本地缓存?
什么?你还不会用RocketMQ,不知道我在说什么?试试看看我的RocketMQ教程。顺便支持鼓励一下。
二、问题原因分析
查问题的时候是忙忙碌碌瞎查的,现在回顾问题,就把思路整理一下,也便于你理解。
1、RocketMQ中是如何记录广播消费的本地Offset的?
原生的RocketMQ消费者API是基础,所以这个问题必须先梳理清楚。 以下分析RocketMQ源码用的是4.9.1版本
原生的RocketMQ提供了两个消费者对象,DefaultLitePullConsumer和DefaultMQPushConsumer(还有一个DefaultMQPullConsumer已经过时了)。在这两个消费实例中,都可以设定一个MessageModel属性,这个属性有两个枚举值,MessageModel.BROADCASTING和MessageModel.CLUSTERING。这个属性是怎么跟Offset文件存储对应上的呢?其实从服务的启动过程中就能很清晰的跟踪到。
1.1、看看DefaultMQPushConsumer。
他的start()启动方法中,会启动一个defaultMQPushConsumerImpl实例,而这个defaultMQPushConsumerImpl示例会调用一个同步的start()方法。这个start()是同步的,针对Offset文件存盘问题,是为了保证存盘的Offset文件,在内存中只有一份统一的副本。带着今天的问题,可以在这个start()方法中找到一段很刺眼的代码:
if (this.defaultMQPushConsumer.getOffsetStore() != null)
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
else
//从这里可以看出,广播模式与集群模式的最本质区别就是offset存储的地方不一样。
switch (this.defaultMQPushConsumer.getMessageModel())
//广播模式是在消费者本地存储offset
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
//集群模式是在Broker远端存储offset
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
这里就能看到广播模式和集群模式在Offset存储上的区别。
1.2:再来看看DefaultLitePullConsumer。
在他的启动过程中,类似的会创建一个defaultLitePullConsumerImpl对象。而在defaultLitePullConsumerImpl对象的启动过程中,会调用到一个initOffsetStore()方法。这个方法里的实现也是一样的刺眼:
private void initOffsetStore() throws MQClientException
if (this.defaultLitePullConsumer.getOffsetStore() != null)
this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
else
switch (this.defaultLitePullConsumer.getMessageModel())
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
break;
default:
break;
this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
this.offsetStore.load();
所以,问题自然就集中到了这个LocalFileOffsetStore中了。
1.3:看LocalFileOffsetStore是如何指定存储路径的
RocketMQ的源码中采用了充血模型的实现方式,所有关于LocalFileOffset的业务动作,都集中到了LocalFileOffsetStore.java当中。在他的构造方法当中就直接维护了一个storePath属性来维护本地存储地址。
public class LocalFileOffsetStore implements OffsetStore
//本地存储目录
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
.....
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName)
.....
//本地存储文件
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";
.....
}
先看LOCAL_OFFSET_STORE_DIR,这是本地存储的根目录。跟rocketmq.client.localOffsetStoreDir这个系统属性有关,**所以要定制本地存储的目录,只需要设定rocketmq.client.localOffsetStoreDir系统属性即可。**而这个系统属性还没有支持前端配置,所以,修改的方式,只能是在应用启动时手动进行指定。 例如 System.setProperty(“rocketmq.client.localOffsetStoreDir”,“D:/.rockemtq_offset”)。
然后,再来看offsets.json的具体存储路径storePath。看这个结构,就跟最开始直接指出的结论对应上了。但是这样其中还有一个变量,就是clinetId。这个属性是如何指定的呢?是不是可以定制呢?
这里面的mQClientFactory是一个MQClientInstance的实例对象。而MQClientInstance则是RocketMQ中对所有客户端的抽象。生产者和消费者最终都会交由MQClientInstance进行统一的服务启动。在MQClientInstance的start()方法中给所有的客户端定义了一个统一的启动标准,不同的客户端只要按照标准去注册不同的信息即可。而ClientId就是MQClientInstance在初始化的过程中指定的一个属性(初始化过程见MQClientManager.java#getOrCreateMQClientInstance方法)。ClientId的具体生成逻辑则是在ClientConfig对象中的buildMQClientId方法中。
//指定instanceName
private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
//构建clientId
public String buildMQClientId()
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName))
sb.append("@");
sb.append(this.unitName);
return sb.toString();
这里面又引出了一个变量instanceName,这个变量对于广播消息的本地Offset存储路径起了很重要的作用。
-
这个instanceName,默认值是DEFAULT。
-
可以由系统属性rocketmq.client.name来替代这个默认值,这个系统属性依然是没有配置属性定制的,只能手动修改。
-
这个instanceName在ClientConfig中还有对应的setter方法。可以由消费者客户端自行指定。而常用的RocketMQ客户端都是ClientConfig的子类,所以,他们都可以通过setter方法定制instanceName。
这个unitMode和unitName不太清楚具体干嘛的,unitMode默认是false,然后在源码中搜索了一下,好像也就在ClientConfig配置本地存储目录时用到了,在核心业务中并没有太多的作用。
github仓库中只有一条关于unitMode的issue,回复也是目前并没有做什么实际的工作。参见https://github.com/apache/rocketmq/issues/639
1.4:Offsets.json文件何时写入?
基于充血模型设计的好处在于,系统内的所有核心业务能力都在充血实体中统一整理,不需要去其他地方到处乱搜。对于Offsets.json文件的核心写入能力,也一起体现在了LocalFileOffsetStore类中。在LocalFileOffsetStore类中,有一个persistAll方法,实现了文件的写入。
public void persistAll(Set<MessageQueue> mqs)
if (null == mqs || mqs.isEmpty())
return;
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
if (mqs.contains(entry.getKey()))
AtomicLong offset = entry.getValue();
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null)
try
MixAll.string2File(jsonString, this.storePath);
catch (IOException e)
log.error("persistAll consumer offset Exception, " + this.storePath, e);
在这个方法中可以看到,如果offsets.json文件写入失败,RocketMQ只是记录一条log日志就没事了,甚至连异常都没有往外抛。这意味着如果广播消息本地的offsets.json进度没有更新,RocketMQ不会做任何的补救措施。
不过,RocketMQ的客户端会启动一个线程,不断的尝试将这些offset偏移信息写入到文件当中,这也算是一种处理的方式把。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
MQClientInstance.this.persistAllConsumerOffset();
catch (Exception e)
log.error("ScheduledTask persistAllConsumerOffset exception", e);
, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
1.5: 针对问题的总结
经过一通整理,回到之前提到的广播消息的offsets文件内容定制的问题,可以总结出这样几个结论:
- 消费者端存储广播消费的本地offsets文件的默认缓存目录是 System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets” ,可以通过定制 rocketmq.client.localOffsetStoreDir 系统属性进行修改。
- 本地offsets文件在缓存目录中的具体位置与消费者的clientIp 和 instanceName有关。其中instanceName默认是DEFAULT,可以通过定制系统属性 rocketmq.client.name 进行修改。另外,每个消费者对象也可以单独设定instanceName。
- RocketMQ会通过定时任务不断尝试本地Offsets文件的写入,但是,如果本地Offsets文件写入失败,RocketMQ不会进行任何的不就,也就是说不会对业务有很大的影响。
2、rocketmq-spring-boot-starter如何创建RocketMQ的消费者?
接下来开始梳理rocketmq-spring-boot-starter对消费者端的封装。梳理封装的过程,其实也是在整理如何高效的使用RocketMQ的原生API,这一对于加深对于RocketMQ原生API的理解是非常有帮助的。如果你也尝试跟着这篇文章一起梳理源码,记得带上之前剔除的小问题。问题的本身相对比较简单,但是带上一个具体的问题去看源码,绝对会让你看源码的感觉不一样。
rocketmq-spring-boot-starter中在RocketMQ消费者这一块,封装了封装了两种模式,Push模式和Pull模式。
- Push模式就是通过@RocketMQMessageListener注解声明一个RocketMQListener接口的实现类来声明一个消费者。Broker推过来的消息,会自行进入其中的onMessage方法进行处理。
- Pull模式是通过内置的RocketMQTemplate对象的receive方法以及一系列的sendAndReceive方法,由消费者端主动去拉取消息。消息拉取过来后,可以通过里面的convert对象转换成所需要的结果对象。
- 在Pull模式中,如果一个restTemplate实例不够用,还可以通过@ExtRocketMQTemplateConfiguration和@ExtRocketMQConsumerConfiguration两个注解来注册额外的rocketmqTemplate实例。一个用来初始化rocketmqTemplate中的消息发送者。例如发送事务消息时,一个restTemplate只能对应一个事务监听逻辑。如果你的项目中有多个事务消息逻辑,就需要注册多个restTemplate实例,来对应不同的事务监听器。另一个用来初始化rocketmqTemplate中的消费者。
老规矩,如果使用还不太熟练,可以看下我的RocketMQ教程。支持鼓励一下。
接下来,就按Push模式和Pull模式分开来梳理。其中,Push模式是分析的重点,因为这是平常用得做多的一种模式。Pull模式用得比较少,就当做是补充的彩蛋了。
2.1、Push模式
Push模式对于@RocketMQMessageListener注解的处理方式,入口在rocketmq-spring-boot-2.2.1.jar中的org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration类中。
怎么找到的?评经验猜以及碰运气。
这个ListenerContainerConfiguration类继承了Spring当中的SmartInitializingSingleton接口,当Spring容器当中所有非懒加载的实例加载完成后,就会触发他的afterSingletonsInstantiated方法进行初始化。在这个方法中会去扫描所有带有注解@RocketMQMessageListener注解的类,将他注册到内部一个Container容器当中。
public void afterSingletonsInstantiated()
Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
这里这个Container可以认为是客户端实例的一个容器,通过这个容器来封装RocketMQ的原生API。
registerContainer的方法挺长的,我这里截取出跟今天的主题相关的几行重要的源码:
private void registerContainer(String beanName, Object bean)
.....
//获取Bean上面的注解
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
...
//检查注解的配置情况
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
counter.incrementAndGet());
GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
//将扫描到的注解转化成为Container,并注册到上下文中。
genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
() -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
DefaultRocketMQListenerContainer.class);
//启动容器,这里就相当于是启动了消费者
if (!container.isRunning())
try
container.start();
catch (Exception e)
log.error("Started container failed. ", container, e);
throw new RuntimeException(e);
log.info("Register the listener to container, listenerBeanName:, containerBeanName:", beanName, containerBeanName);
这其中最关注的,当然是创建容器的createRocketMQListenerContainer方法中。而在这个方法中,你基本看不到RocketMQ的原生API,都是在创建并维护一个DefaultRocketMQListenerContainer对象。而这个DefaultRocketMQListenerContainer类,就是我们今天关注的重点。
DefaultRocketMQListenerContainer类实现了InitializingBean接口,自然要先关注他的afterPropertiesSet方法。这是Spring提供的对象初始化的扩展机制。
public void afterPropertiesSet() throws Exception
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: ", messageType);
这个方法就是用来初始化RocketMQ消费者的。在这个方法里就会创建一个RocketMQ原生的DefaultMQPushConsumer消费者。同样,方法很长,抽取出比较关注的重点源码。
private void initRocketMQPushConsumer() throws MQClientException
.....
//检查并创建consumer对象。
if (Objects.nonNull(rpcHook))
consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
enableMsgTrace, this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
consumer.setVipChannelEnabled(false);
else
log.debug("Access-key or secret-key not configure in " + this + ".");
consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
this.applicationContext.getEnvironment().
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
// 定制instanceName,有没有很熟悉!!!
consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
.....
//设定广播消费还是集群消费。
switch (messageModel)
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
break;
case CLUSTERING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
break;
default:
throw new IllegalArgumentException("Property 'messageModel' was wrong.");
//维护消费者的其他属性。
...
//指定Consumer的消费监听 --》在消费以上是关于rocketMQ -- offset管理的主要内容,如果未能解决你的问题,请参考以下文章
rocketMQ rocketMQ 消息队列Offset和CommitLog