RocketMQ:消息ACK机制源码解析
Posted 又蠢又笨的懒羊羊程序猿
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ:消息ACK机制源码解析相关的知识,希望对你有一定的参考价值。
消息消费进度
概述
消费者消费消息过程中,为了避免消息的重复消费,应将消息消费进度保存起来,当其他消费者再对消息进行消费时,读取已消费的消息偏移量,对之后的消息进行消费即可。
消息模式分为两种:
- 集群模式:一条消息只能被一个消费者消费
- 广播模式:一条消息被所有消费者都消费一次
广播模式下,消息被所有消费者消费,因此消息消费的进度可以跟消费端保存在一起,即本地保存。
集群模式下,消息只能被集群内的一个消费者消费,进度不能保存在消费端,否则会导致消息重复消费,因此集群模式下消息进度集中保存在Broker中。
消息进度存储接口
OffsetStore
public interface OffsetStore
//加载消息消费进度
void load() throws MQClientException;
//更新消费进度并保存在内存中
void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);
//从本地存储中获取消息消费进度
long readOffset(final MessageQueue mq, final ReadOffsetType type);
//保存所有消息消费进度-本地/远程
void persistAll(final Set<MessageQueue> mqs);
void persist(final MessageQueue mq);
//移除偏移量
void removeOffset(MessageQueue mq);
//根据Topic克隆一份消息队列消费进度缓存表
Map<MessageQueue, Long> cloneOffsetTable(String topic);
//更新消费进度到Broker
void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
DefaultMQPushConsumerImpl#start
switch (this.defaultMQPushConsumer.getMessageModel())
case BROADCASTING:
//广播模式下 将消息消费进度存储到本地
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
case CLUSTERING:
//集群模式下 将消息消费的进度存储到远端Broker中
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
如上所示,根据消息消费模式的不同,会创建不同的OffsetStore
对象。
广播模式消费进度存储(LocalFileOffsetStore)
public class LocalFileOffsetStore implements OffsetStore
//存储目录
//消费者启动时-可以通过"-D rocketmq.client.localOffsetStoreDir=路径"来指定
public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
"rocketmq.client.localOffsetStoreDir",
System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
private final static InternalLogger log = ClientLogger.getLog();
//MQ客户端
private final MQClientInstance mQClientFactory;
//消费组名
private final String groupName;
//存储路径
private final String storePath;
//以MessageQueue为键-消费偏移量为值的缓存表
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
构造函数
public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName)
this.mQClientFactory = mQClientFactory;
this.groupName = groupName;
//json格式存储
this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
this.mQClientFactory.getClientId() + File.separator +
this.groupName + File.separator +
"offsets.json";
LocalFileOffsetStore#load
public void load() throws MQClientException
//从本地磁盘中进行读取json文件-并进行序列化封装转化为map
OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null)
//存入缓存表
offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
for (MessageQueue mq : offsetSerializeWrapper.getOffsetTable().keySet())
AtomicLong offset = offsetSerializeWrapper.getOffsetTable().get(mq);
log.info("load consumer's offset, ",
this.groupName,
mq,
offset.get());
public class OffsetSerializeWrapper extends RemotingSerializable
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
LocalFileOffsetStore#persistAll
public void persistAll(Set<MessageQueue> mqs)
if (null == mqs || mqs.isEmpty())
return;
OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
if (mqs.contains(entry.getKey()))
AtomicLong offset = entry.getValue();
//填充<消息队列-消费偏移量>缓存表
offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
//转为json格式
String jsonString = offsetSerializeWrapper.toJson(true);
if (jsonString != null)
try
//jsonString->file->保存到storePath
MixAll.string2File(jsonString, this.storePath);
catch (IOException e)
log.error("persistAll consumer offset Exception, " + this.storePath, e);
persistAll()
的入口是MQClientInstance#startScheduledTask
MQClientInstance#startScheduledTask
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable()
@Override
public void run()
try
MQClientInstance.this.persistAllConsumerOffset();
catch (Exception e)
log.error("ScheduledTask persistAllConsumerOffset exception", e);
, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
消费端启动后延迟10s开启该定时任务,每隔5s进行一次持久化。
MQClientInstance#persistAllConsumerOffset
Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext())
Entry<String, MQConsumerInner> entry = it.next();
MQConsumerInner impl = entry.getValue();
impl.persistConsumerOffset();
DefaultMQPushConsumerImpl#persistConsumerOffset
try
this.makeSureStateOK();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
//获取重负载分配好的消息队列
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
//当前是LocalFileOffsetStore.persistAll
this.offsetStore.persistAll(mqs);
catch (Exception e)
log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
集群模式消费进度存储(RemoteBrokerOffsetStore)
RemoteBrokerOffsetStore
private final static InternalLogger log = ClientLogger.getLog();
//MQ客户端实例-该实例被同一个JVM下的消费者和生产者共用
private final MQClientInstance mQClientFactory;
//消费组名
private final String groupName;
//以消息队列为键-消费偏移量为值的缓存表
private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
new ConcurrentHashMap<MessageQueue, AtomicLong>();
RemoteBrokerOffsetStore#persistAll
for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet())
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null)
if (mqs.contains(mq))
try
//更新消费偏移量到Broker
this.updateConsumeOffsetToBroker(mq, offset.get());
log.info("[persistAll] Group: ClientId: updateConsumeOffsetToBroker ",
this.groupName,
this.mQClientFactory.getClientId(),
mq,
offset.get());
catch (Exception e)
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
RemoteBrokerOffsetStore#updateConsumeOffsetToBroker
同步更新消息消费偏移量,如Master关闭,则更新到Slave。
//从MQ客户端中根据BrokerName获取消息队列对应的Broker
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult)
//根据Topic从NameServer更新路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
//重新查找
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (findBrokerResult != null)
//封装消息消费队列更新请求头
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic()); //主题信息
requestHeader.setConsumerGroup(this.groupName); //消费者组
requestHeader.setQueueId(mq.getQueueId()); //队列ID
requestHeader.setCommitOffset(offset); //消费偏移量
if (isOneway)
//Oneway->根据Broker地址->发送请求将消费偏移量保存到Broker-超时时间默认为5s
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
else
this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
else
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
RemoteBrokerOffsetStore#updateOffset
if (mq != null)
//从缓存中获取消息队列对应的偏移量
AtomicLong offsetOld = this.offsetTable.get(mq);
//为空
if (null == offsetOld)
//将存入的offset存入内存中
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
//不为空->根据increaseOnly更新原先的offsetOld
if (null != offsetOld)
if (increaseOnly)
MixAll.compareAndIncreaseOnly(offsetOld, offset);
else
offsetOld.set(offset);
RemoteBrokerOffsetStore#readOffset
public long readOffset(final MessageQueue mq, //消息队列
final ReadOffsetType type) //读取偏移量类型
if (mq != null)
switch (type)
case MEMORY_FIRST_THEN_STORE:
//从内存中读取
case READ_FROM_MEMORY:
AtomicLong offset = this.offsetTable.get(mq);
if (offset != null)
return offset.get();
else if (ReadOffsetType.READ_FROM_MEMORY == type)
return -1;
//从Broker中读取
case READ_FROM_STORE:
try
//从Broker中获取消费偏移量
long brokerOffset = this.fetchConsumeOffsetFromBroker(mq);
AtomicLong offset = new AtomicLong(brokerOffset);
//更新至内存中(map)
this.updateOffset(mq, offset.get(), false);
return brokerOffset;
// No offset in broker
catch (MQBrokerException e)
return -1;
//Other exceptions
catch (Exception e)
log.warn("fetchConsumeOffsetFromBroker exception, " + mq, e);
return -2;
default:
break;
RemoteBrokerOffsetStore#fetchConsumeOffsetFromBroker
//从MQ客户端实例中获取Boker信息
FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult)
//从NameServer中更新Topic的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
//重新获取Broker
findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (findBrokerResult != null)
//封装查询消费进度请求头
QueryConsumerOffsetRequestHeader requestHeader = new QueryConsumerOffsetRequestHeader();
requestHeader.setTopic(mq.getTopic());
requestHeader.setConsumerGroup(this.groupName);
requestHeader.setQueueId(mq.getQueueId());
//带上请求头调用MQClientAPI到Broker中获取
return this.mQClientFactory.getMQClientAPIImpl()以上是关于RocketMQ:消息ACK机制源码解析的主要内容,如果未能解决你的问题,请参考以下文章