Zookeeper深入剖析(上篇)
Posted 传智教育博学谷
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper深入剖析(上篇)相关的知识,希望对你有一定的参考价值。
1.1 ZK特性与集群架构设计
ZK特性:
Zookeeper是分布式协调框架,ZK从设计上有哪些特性?
- 顺序一致性
- 实时性
- 原子性
ZK集群架构设计:
ZK主要分为三种角色:
- Leader(领导者)
- Follower(跟随者)
- Observer(观察者)
1.2 ZK的应用场景
1.2.1 服务注册发现
分布式服务架构中,服务的注册与发现是最核心的基础服务之一,注册中心可以看做是分布式服务架构的通信中心。
Zookeeper是采用ZAB协议保证了数据的强一致性。ZAB协议的实现原理是怎样?ZK是如何实现选举, Paxos算法又是如何运用的?
1.2.2 分布式锁
分布式锁的实现需要注意的:
- 锁的可重入性。
- 锁的超时。
- 锁的阻塞。
- 锁的特性支持。
分布式锁的使用需要注意:
- 分布式锁的开销
- 加锁的粒度
- 加锁的方式
基于ZK的分布式锁实现:
整体思想:每个客户端发起加锁时,生成一个唯一的临时有序节点。 如何判断锁是否创建呢?只需要判断是否存在临时节点(若存在, 则根据序号判断)。
ZK的分布式锁的优缺点
优点: 可以有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。
缺点: 性能上不如基于缓存实现的分布式锁。
ZK是如何实现分布式锁?
- 排它锁
排它锁实现流程:
- 定义锁:通过Zookeeper上的数据节点来表示一个锁
- 获取锁:客户端通过调用
create
方法创建表示锁的临时节点,可以认为创建成功的客户端获得了锁,同时可以让没有获得锁的节点在该节点上注册Watcher监听,以便实时监听到lock节点的变更情况 - 释放锁
- 符合以下两种情况都可以让锁释放
- 当前获得锁的客户端发生宕机或异常,那么Zookeeper上这个临时节点就会被删除
- 正常执行完业务逻辑,客户端主动删除自己创建的临时节点
2.共享锁
基于ZK的共享锁实现流程:
1)定义锁
2) 获取锁
3) 读写顺序判断处理
4) 释放锁
3.共享锁产生的羊群效应解决方案
羊群效应该如何解决呢?
其实只需要改进Watch的监听处理流程:
1.3 ZK数据结构与存储
1.3.1 ZK数据结构模型
ZNode节点类型:
Znode的分为四类:
持久节点(persistent node):节点会被持久化处理,执行命令:create /apps/app1 "order"
。
临时节点(ephemeral node):客户端断开连接后,ZooKeeper会自动删除临时节点, 执行命令:create -e /apps/app1 "order"
。
顺序节点(sequential node):每次创建顺序节点时,ZooKeeper都会在路径后面自动添加上10位的数字,从1开始,最大值为2147483647 (2^32-1),每个顺序节点都有一个单独的计数器,并且单调递增的,由leader
实例维护,执行命令:create -s /apps/app1 "order"
。
临时顺序节点(EPHEMERAL_SEQUENTIAL):基本特性与临时节点一致,创建节点的过程中,zookeeper会在其名字后自动追加一个单调增长的数字后缀,作为新的节点名。
ZNode节点属性:
[zk: localhost:2181(CONNECTED) 1] get /testNode
test
cZxid = 0x2
ctime = Fri Aug 06 22:28:23 CST 2020
mZxid = 0x2
mtime = Fri Aug 06 22:28:23 CST 2020
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0
1.3.2 ZK数据存储方式
数据存储方式分为三类:
- 内存数据
- 内存数据结构分为三类:
DataTree
DataNode
ZKDatabase
2.事务日志
事务日志处理流程:
事务日志文件内容示例:
查看日志命令:
java -classpath .:./lib/slf4j-api-1.7.25.jar:./zookeeper-3.4.14.jar org.apache.zookeeper.server.LogFormatter /data/zookeeper/version-2/log.100000001 > log1.log
产生的日志内容:
3/23/20 8:55:19 PM EDT session 0x20003a778cc0012 cxid 0xa9 zxid 0x1000001bc delete '/lock-namespace/shared_lock/order/W-0000000016
3/23/20 8:55:29 PM EDT session 0x20003a778cc0012 cxid 0xb0 zxid 0x1000001bd delete '/lock-namespace/shared_lock/order/W-0000000017
3/23/20 9:46:18 PM EDT session 0x20003a778cc0012 cxid 0xb1 zxid 0x1000001be create '/lock-namespace/shared_lock/order/W-0000000018,#3139322e3136382e3132332e313033,vs31,s'world,'anyone,T,19
3/23/20 9:46:38 PM EDT session 0x20003a778cc0012 cxid 0xb4 zxid 0x1000001bf delete '/lock-namespace/shared_lock/order/W-0000000018
3.数据快照(snapshot)
快照查看命令:
java -classpath .:./lib/slf4j-api-1.7.25.jar:./zookeeper-3.4.14.jar org.apache.zookeeper.server.SnapshotFormatter /data/zookeeper/version-2/snapshot.100000000 > snap1.log
其处理步骤如下:
1) 检查是否需要进行数据快照,每进行一次事务日志记录之后,Zookeeper都会检测当前是否需要进行数据快照,考虑到数据快照对于Zookeeper机器的影响,需要尽量避免ZK集群中的所有机器在同一时刻进行数据快照。采用过半随机策略进行数据快照操作。
2) 切换事务日志文件,表示当前的事务日志已经写满,需要重新创建一个新的事务日志。
3) 创建数据快照的异步线程,创建单独的异步线程来进行数据快照以避免影响Zookeeper主线程的运行状态。
4) 获取全量数据和会话信息,从ZKDatabase数据库中获取到DataTree和会话信息。
5) 生成快照数据文件名,ZK根据当前已经提交的最大ZXID来生成数据快照文件名。
6) 数据序列化,首先序列化文件头信息,然后再对会话信息和DataTree分别进行序列化,同时生成一个Checksum校验文件,一并写入快照文件中。
精华推荐 | 深入浅出 RocketMQ原理及实战「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)
精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程
- 上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析
- 下篇:分析基于上篇的总体流程的底层的消息通讯以及拉去处理数据传输流程分析
RocketMQ的消息模型
RocketMQ的基础消息模型是发布-订阅(Pub/Sub)是一种消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer),如下图所示。
消息通过生产者发送到某一个Topic,如果需要订阅该Topic并消费里面的消息的话,就要创建对应的消费者进行消费,而本文主要会进行介绍对应的消息队列的消费者。
本文主旨
本文主要会针对于RocketMQ的消费者Consumer的功能原理进行分析和介绍,消费者主要会通过以推(push),拉(pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制,可以满足大多数用户的需求。
RocketMQ提供Push模式也提供了Pull模式
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push模式处理消费消费
Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
DefaultMQPushConsumer的使用和初始化
Push模式主要通过初始化DefaultMQPushConsumer对象进行消费数据信息,案例代码如下所示。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
// 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
// 启动Consumer
consumer.start();
消费的位点配置
消费端的消费的位点计算值,可以在启动前进行配置,主要方法可以通过下面代码进行配置。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费。
注意:第一次启动是指从来没有消费过消息的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始。
消费的模式的配置
消费模式主要分为:集群消费(Clustering)和广播消费(Broadcasting)这两种。
- 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
- 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
consumer.setMessageModel(MessageModel.BROADCASTING);
- CLUSTERING:默认模式,同一个ConsumerGroup,每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所订阅topic整体,从而达到负载均衡的目的。
- BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。
DefaultMQPushConsumer的运行原理和流程
DefaultMQPushConsumerImpl中各个对象的主要功能如下:
平衡和分配队列组件实现类-RebalancePushImpl
RebalancePushImpl:主要负责进行分配对应当前服务实例的消费者会从当前消费的topic中的那个Queue中进行消费消息;此外当消费者宕机或者下线的时候,还会执行rebalance再次平衡和分配给其他消费者对应的队列控制。
长连接进行拉去消息组件实现类-PullAPIWrapper
PullAPIWrapper:主要与broker服务端建立长连接,一直进行定时从broker服务端处拉取消息数据,默认为:32条消息,之后还会调用ConsumeMessageService实现类,进行用户注册的Listener执行消息消费逻辑。
看一下consumer.registerMessageListener的源码,如下所示。
/**
* Register a callback to execute on message arrival for concurrent consuming.
* @param messageListener message handling callback.
*/
@Override
public void registerMessageListener(MessageListenerConcurrently messageListener)
this.messageListener = messageListener;
this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
回调用户的注册的MessageListener组件实现类-ConsumeMessageService
ConsumeMessageService:实现所谓的"Push-被动"消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息。
存储Offset的消费记录的位移组件实现类--OffsetStore
OffsetStore:维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;
综合门面功能接口供各个Service组件实现类--MQClientFactory
MQClientFactory:负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成,总体流程架构如下图所示。
DefaultMQPushConsumerImpl的start方法的源码
我们先来看一下对应的DefaultMQPushConsumerImpl类的start方法源码,源码可以看出主要实现过程在consumer.start后调用DefaultMQPushConsumerImpl的同步start方法,如下所示。
public synchronized void start() throws MQClientException
switch (this.serviceState)
case CREATE_JUST:
log.info("the consumer [] start beginning. messageModel=, isUnitMode=", this.defaultMQPushConsumer.getConsumerGroup(),
this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
this.serviceState = ServiceState.START_FAILED;
this.checkConfig();
this.copySubscription();
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING)
this.defaultMQPushConsumer.changeInstanceNameToPID();
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
if (this.defaultMQPushConsumer.getOffsetStore() != null)
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
else
switch (this.defaultMQPushConsumer.getMessageModel())
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly)
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently)
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
if (!registerOK)
this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
mQClientFactory.start();
log.info("the consumer [] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
default:
break;
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
this.mQClientFactory.checkClientInBroker();
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
this.mQClientFactory.rebalanceImmediately();
DefaultMQPushConsumerImpl的start方法的源码
主要我们能关注重点的代码和组件的start,通过mQClientFactory.start();发我们发现他调用了很多组件的start方法:
- this.mQClientAPIImpl.start():主要用于开启请求-响应的网络通道对象。
- this.startScheduledTask():主要开启多个定时任务的功能
- this.pullMessageService.start():主要开启拉取数据的业务组件。
- this.rebalanceService.start():主要开启rebalance业务服务组件。
- this.defaultMQProducer.getDefaultMQProducerImpl().start(false):开启push服务的对象组件作为门面。
public void start() throws MQClientException
synchronized (this)
switch (this.serviceState)
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr())
this.mQClientAPIImpl.fetchNameServerAddr();
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
重点我们先来主要看pullMessageService.start(),通过这里我们发现RocketMQ的Push模式底层其实也是通过pull实现的,接下来我们先来分析一下pullMessageService中的pullMessage方法的源码。
private void pullMessage(final PullRequest pullRequest)
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null)
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
else
log.warn("No matched consumer for the PullRequest , drop it", pullRequest);
DefaultMQPushConsumerImpl的pullMessage方法的源码
源码中需要进行根据消费组进行筛选对应的消费组,以方便选对应的消费组件DefaultMQPushConsumerImpl,如下图所示。
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
最后还是通过DefaultMQPushConsumerImpl类的pullMessage方法来进行消息的逻辑处理,
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
DefaultMQPushConsumerImpl的逻辑限流控制流程
总结一下针对于限流的总体流程控制:
- 首先拉去消息数据的时候会先去判断对应的ProcessQueue的对象元素是否还存在订阅关系或者被删除了,从而进行跳过那些不应该被消费的数据。
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped())
log.info("the pull request[] is dropped.", pullRequest.toString());
return;
上面的逻辑是先会判断和校验PullRequest对象中的ProcessQueue对象的dropped是否为true(在RebalanceService线程中为topic下的MessageQueue创建拉取消息请求时要维护对应的ProcessQueue对象,若Consumer不再订阅该topic则会将该对象的dropped置为true);若是则认为该请求是已经取消的,则直接跳出该方法。
- 更新PullRequest对象中的ProcessQueue对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳。此外会判断当前的Consumer消费者组件是否运行中,主要是通过DefaultMQPushConsumerImpl.serviceState是否为RUNNING。
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try
this.makeSureStateOK();
catch (MQClientException e)
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
if (this.isPause())
log.warn("consumer was paused, execute pull request later. instanceName=, group=", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
如果运行状态或者是暂停状态this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),则会进行执行PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)
方法延迟再拉取消息,其中timeDelay=3000;
该方法的目的是在3秒之后再次将该PullRequest对象放入PullMessageService. pullRequestQueue队列中;并跳出该方法
- 主要进行消费者端进行速度和控制消费速度的流控。若ProcessQueue对象的msgCount大于了消费端的流控阈值,默认值为1000,主要通过
DefaultMQPushConsumer.pullThresholdForQueue
的执行进行判断。当调用的processQueue.getMsgCount().get()
的数值大于DefaultMQPushConsumer.pullThresholdForQueue
的值时候会进行 PullMessageService.executePullRequestLater方法。
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue())
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
log.warn(
"the cached message count exceeds the threshold , so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
return;
- 主要进行消费者端进行速度和控制消费速度的流控。主要会通过
this.defaultMQPushConsumer.getPullThresholdSizeForQueue()
与进行计算消息的内存空间的总大小进行对比,单位是M,当大于系统定义的 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()
的阈值大小的时候,则会进行限流处理。
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue())
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueFlowControlTimes++ % 1000) == 0)
log.warn(
"the cached message size exceeds the threshold MiB, so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
return;
以上3和4步骤中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50毫秒之后,才会将该PullRequest请求放入PullMessageService.pullRequestQueue队列中。从而实现看限流的能力。
- 当上面的直接限流之后,还会有跨度限流的控制,首先系统还会判断当前的消费方式是否顺序消费(即DefaultMQPushConsumerImpl.consumeOrderly等于false)。
if (!this.consumeOrderly)
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan())
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0)
log.warn(
"the queues messages, span too long, so do flow control, minOffset=, maxOffset=, maxSpan=, pullRequest=, flowControlTimes=",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
return;
则检查ProcessQueue对象的msgTreeMap:TreeMap<Long,MessageExt>
变量的第一个key值与最后一个key值之间的的差值,该key值表示查询的队列偏移量queueoffset;
若queueoffset差值大于阈值(DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默认是2000),则调用PullMessageService.executePullRequestLater方法,在50毫秒之后再该PullRequest请求放入PullMessageService.pullRequestQueue队列中。
- PullRequest.messageQueue对象的topic值为参数RebalanceImpl.subscriptionInner,ConcurrentHashMap, SubscriptionData>中获取对应的SubscriptionData对象,若该对象为null,考虑到并发的关系,调用executePullRequestLater方法。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData)
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumers subscription failed, ", pullRequest);
return;
DefaultMQPushConsumerImpl的消费模式分类
如果当前的消费者属于集群模式(RebalanceImpl.messageModel等于CLUSTERING)。
- PullRequest对象的MessageQueue变量值,type =READ_FROM_MEMORY(从内存中获取消费进度offset值)为参数调用DefaultMQPushConsumerImpl的offsetStore对象。
- 实际代表着RemoteBrokerOffsetStore对象的readOffset(MessageQueue mq, ReadOffsetType type)方法从本地内存中获取消费进度offset值。
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel())
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0)
commitOffsetEnable = true;
若该offset值大于0,则置临时变量commitOffsetEnable等于true否则为false;该offset值作为pullKernelImpl方法中的commitOffset参数,在Broker端拉取消息之后根据commitOffsetEnable参数值决定是否用该offset更新消息进度。
该readOffset方法的逻辑是:以入参MessageQueue对象从RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap <MessageQueue,AtomicLong>变量中获取消费进度偏移量;若该偏移量不为null则返回该值,否则返回-1;
DefaultMQPushConsumerImpl的订阅模型
当每次拉取消息之后需要更新订阅关系(由DefaultMQPushConsumer. postSubscriptionWhenPull参数表示,默认为false)并且以topic值参数从RebalanceImpl.subscriptionInner获取的SubscriptionData对象的classFilterMode等于false(默认为false),则将sysFlag标记的第3个字节置为1,否则该字节置为0;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null)
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode())
subExpression = sd.getSubString();
classFilter = sd.isClassFilterMode();
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
该sysFlag标记的第1个字节置为commitOffsetEnable的值;第2个字节(suspend标记)置为1;第4个字节置为classFilterMode的值。
DefaultMQPushConsumerImpl的底层客户端如何拉取消息的通信方法
调用底层的拉取消息API接口,方法进行消息拉取操作。
PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)
内部会存在将回调类PullCallback传入该方法中,当采用异步方式拉取消息时,在收到响应之后会回调该回调类的方法。
try
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
// 消息的通信方式为异步
CommunicationMode.ASYNC,
pullCallback
);
catch (Exception e)
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
初始化匿名内部类PullCallback,实现了onSuccess/onException方法; 该方法只有在异步请求的情况下才会回调;
PullCallback pullCallback = new PullCallback()
@Override
public void onSuccess(PullResult pullResult)
if (pullResult != null)
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus())
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty())
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
else
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0)
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
else
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset)
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: firstMsgOffset: prevRequestOffset: ",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, ",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable()
@Override
public void run()
try
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, ", pullRequest);
catch (Throwable e)
log.error("executeTaskLater Exception", e);
, 10000);
break;
default:
break;
@Override
public void onException(Throwable e)
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))
log.warn("execute the pull request exception", e);
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
;
主要针对于拉去底层消息的状态进行分状态处理,针对于this.pullAPIWrapper.pullKernelImpl的方法,我会在【下篇】进行介绍,此处不做讲述分析。
此外针对于PullStatus状态的分析-FOUND状态的处理,主要更新本地的offset值,以及流程控制等。如下所示。
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty())
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
else
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0)
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
else
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset)
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: firstMsgOffset: prevRequestOffset: ",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
break;
- 此外针对于PullStatus状态的分析-NO_NEW_MSG状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- 此外针对于PullStatus状态的分析-NO_MATCHED_MSG:状态的处理,主要进行更新topic标签的offset的数据值、以及下一次拉去消息的offset值。
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
- 此外针对于PullStatus状态的分析-OFFSET_ILLEGAL:状态的处理,重新同步远程的offset数据值。
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable()
@Override
public void run()
try
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, ", pullRequest);
catch (Throwable e)
log.error("executeTaskLater Exception", e);
, 10000);
以上是关于Zookeeper深入剖析(上篇)的主要内容,如果未能解决你的问题,请参考以下文章