RocketMQ 源码阅读 ---- 顺序消息
Posted wenniuwuren
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 源码阅读 ---- 顺序消息相关的知识,希望对你有一定的参考价值。
一、概述
顺序消息的大致原理是发送的时候,比如同一个订单 id 的发送到同一个 queueId 中,如下单、支付、扣库存这个流程需要保证同一个订单 id 消息有序才能正常执行。
在消费的时候,也只能有一个 consumer 并且单线程从这个 queue 中消费,这样才能保证消息消费顺序。
二、源码解析
在顺序消息的 produer 中,需要有一个选择队列的算法,来确定某个规则下的消息一定进入这个队列。 RocketMQ 提供了 MessageQueueSelector 类来辅助实现。
public class Producer
public static void main(String[] args) throws UnsupportedEncodingException
try
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
((DefaultMQProducer) producer).setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] "TagA", "TagB", "TagC", "TagD", "TagE";
for (int i = 0; i < 100; i++)
int orderId = i % 10;
Message msg =
new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector()
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
, orderId);
System.out.printf("%s%n", sendResult);
producer.shutdown();
catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e)
e.printStackTrace();
发送顺序消息与普通消息唯一不同的是,在选择 queue 的时候,普通消息是负载均衡选择 queue,而顺序消息只能固定一个 queue(那么在 RocketMQ 这种主从不能切换的情况下,如果主宕机,消息就写不进来了,如何解决?)。
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
long beginStartTime = System.currentTimeMillis();
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok())
MessageQueue mq = null;
try
// 选出固定的 queue 进行消息投递
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
catch (Throwable e)
throw new MQClientException("select message queue throwed exception.", e);
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime)
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
if (mq != null)
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
else
throw new MQClientException("select message queue return null.", null);
throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
Consumer 顺序消息测试用例:
public class Consumer
public static void main(String[] args) throws MQClientException
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly()
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context)
context.setAutoCommit(false);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0)
return ConsumeOrderlyStatus.SUCCESS;
else if ((this.consumeTimes.get() % 3) == 0)
return ConsumeOrderlyStatus.ROLLBACK;
else if ((this.consumeTimes.get() % 4) == 0)
return ConsumeOrderlyStatus.COMMIT;
else if ((this.consumeTimes.get() % 5) == 0)
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
return ConsumeOrderlyStatus.SUCCESS;
);
consumer.start();
System.out.printf("Consumer Started.%n");
常规消息是使用 ConsumeMessageConcurrentlyService 消费,而顺序消息使用 ConsumeMessageOrderlyService。
// DefaultMQPushConsumerImpl.java
...
this.consumeMessageService.start();
...
ConsumeMessageOrderlyService 会启动一个定时任务,定时锁定 queue(即把 clientId 与 queue 锁定)。
在 consumer 负载均衡分配 queue 之后,需要加锁,client 加锁成功才能消费新的 queue。
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
final boolean isOrder)
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
while (it.hasNext())
Entry<MessageQueue, ProcessQueue> next = it.next();
MessageQueue mq = next.getKey();
ProcessQueue pq = next.getValue();
if (mq.getTopic().equals(topic))
if (!mqSet.contains(mq))
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq))
it.remove();
changed = true;
log.info("doRebalance, , remove unnecessary mq, ", consumerGroup, mq);
else if (pq.isPullExpired())
switch (this.consumeType())
case CONSUME_ACTIVELY:
break;
case CONSUME_PASSIVELY:
pq.setDropped(true);
if (this.removeUnnecessaryMessageQueue(mq, pq))
it.remove();
changed = true;
log.error("[BUG]doRebalance, , remove unnecessary mq, , because pull is pause, so try to fixed it",
consumerGroup, mq);
break;
default:
break;
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet)
if (!this.processQueueTable.containsKey(mq))
if (isOrder && !this.lock(mq)) // client 对 queue 加锁。 锁标记存在 ProcessQueue 中,ProcessQueue 协助实现顺序消息
log.warn("doRebalance, , add a new mq failed, , because lock failed", consumerGroup, mq);
continue; // 没加锁,这个 mq 就不能背该 client 拉数据
this.removeDirtyOffset(mq);
ProcessQueue pq = new ProcessQueue();
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0)
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null)
log.info("doRebalance, , mq already exists, ", consumerGroup, mq);
else
log.info("doRebalance, , add a new mq, ", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
else
log.warn("doRebalance, , add new mq failed, ", consumerGroup, mq);
this.dispatchPullRequest(pullRequestList);
return changed;
分配完 queue 之后,进入拉取消息阶段:
public void pullMessage(final PullRequest pullRequest)
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped())
log.info("the pull request[] is dropped.", pullRequest.toString());
return;
.... 省略部分代码...
// 非顺序消费
if (!this.consumeOrderly)
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan())
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0)
log.warn(
"the queue's messages, span too long, so do flow control, minOffset=, maxOffset=, maxSpan=, pullRequest=, flowControlTimes=",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
return;
else // 顺序消费
if (processQueue.isLocked())
if (!pullRequest.isLockedFirst())
final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
boolean brokerBusy = offset < pullRequest.getNextOffset();
log.info("the first time to pull message, so fix offset from broker. pullRequest: NewOffset: brokerBusy: ",
pullRequest, offset, brokerBusy);
if (brokerBusy)
log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: NewOffset: ",
pullRequest, offset);
pullRequest.setLockedFirst(true);
pullRequest.setNextOffset(offset);
else // 顺序消息必须与 client 锁定后才能拉取
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.info("pull message later because not locked in broker, ", pullRequest);
return;
.. 省略 代码 ..
最终 consumer 消费消息。顺序消息 ConsumeMessageConcurrentlyService 会在 listener 回调前进行一些操作(例如mq锁检查),已经调用后失败的处理与非顺序消息不同。(顺序消息不能像无序消息一样,消费失败再次丢进 broker,这样就乱序了,只能延迟一会再消费。)
// ConsumeMessageOrderlyService.java
class ConsumeRequest implements Runnable
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;
public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue)
this.processQueue = processQueue;
this.messageQueue = messageQueue;
public ProcessQueue getProcessQueue()
return processQueue;
public MessageQueue getMessageQueue()
return messageQueue;
@Override
public void run()
if (this.processQueue.isDropped())
log.warn("run, the message queue not be able to consume, because it's dropped. ", this.messageQueue);
return;
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) // 这里和并发消费不同的是,这里要对 mq 加锁,这样单个 consumer 只有单线程消费这个 queue ,保证消费有序
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) // 广播模式或者 mq加锁并且锁没超时
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; )
if (this.processQueue.isDropped())
log.warn("the message queue not be able to consume, because it's dropped. ", this.messageQueue);
break;
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) // 集群模式,没加锁,日志警告
log.warn("the message queue not locked, so consume later, ", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) // 集群模式,mq 加锁超时, 日志报警
log.warn("the message queue lock expired, so consume later, ", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) // 消费时间太长,则延迟点再拉消息
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty())
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook())
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try
this.processQueue.getLockConsume().lock(); // 对 processQueue 加锁消费
if (this.processQueue.isDropped())
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. ",
this.messageQueue);
break;
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); // 消费消息
catch (Throwable e)
log.warn("consumeMessage exception: Group: Msgs: MQ: ",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
finally
this.processQueue.getLockConsume().unlock();
if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status)
log.warn("consumeMessage Orderly return not OK, Group: Msgs: MQ: ",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
long consumeRT = System.currentTimeMillis() - beginTimestamp;
if (null == status)
if (hasException)
returnType = ConsumeReturnType.EXCEPTION;
else
returnType = ConsumeReturnType.RETURNNULL;
else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000)
returnType = ConsumeReturnType.TIME_OUT;
else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status)
returnType = ConsumeReturnType.FAILED;
else if (ConsumeOrderlyStatus.SUCCESS == status)
returnType = ConsumeReturnType.SUCCESS;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook())
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
if (null == status)
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook())
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
// 处理顺序消息消费结果
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
else
continueConsume = false;
else
if (this.processQueue.isDropped())
log.warn("the message queue not be able to consume, because it's dropped. ", this.messageQueue);
return;
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
// 处理顺序消息消费结果
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
)
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) // 事务自动提交的情况
switch (status)
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message ",
consumeRequest.getMessageQueue());
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit(); // 提交成功消费信息
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs))
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
else
commitOffset = consumeRequest.getProcessQueue().commit();
break;
default:
break;
else // 事务非自动提交
switch (status)
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
commitOffset = consumeRequest.getProcessQueue().commit(); // 提交处理消息信息
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback(); // 回滚处理消息信息
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis()); // 过一会重新消费
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs))
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
default:
break;
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped())
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); // 更新消费 offset
return continueConsume;
以上是关于RocketMQ 源码阅读 ---- 顺序消息的主要内容,如果未能解决你的问题,请参考以下文章