RocketMQ Source Code Reading ---- Ordered Messages

Posted wenniuwuren

1. Overview

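The basic idea behind ordered messages is that, on the sending side, messages sharing a key (for example, the same order id) are all sent to the same queueId. A flow such as create order, pay, deduct inventory only works correctly if the messages for one order id stay in order.

On the consuming side, the queue must be consumed by a single consumer using a single thread. Only then is the consumption order guaranteed.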
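The routing idea can be tried out without a broker. The following is a minimal sketch in plain Java, not RocketMQ code: `OrderedRoutingSketch` and its `route` method are hypothetical names, and each "queue" is just an in-memory list. It shows that when every message of an order is routed by `orderId % queueCount`, the steps of one order sit in a single queue in the order they were sent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a topic with several FIFO queues; not a RocketMQ class.
class OrderedRoutingSketch {

    /** Routes each (orderId, step) pair to the queue at index orderId % queueCount. */
    static List<List<String>> route(int[] orderIds, String[] steps, int queueCount) {
        List<List<String>> queues = new ArrayList<>();
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayList<>());
        }
        for (int i = 0; i < orderIds.length; i++) {
            int index = Math.floorMod(orderIds[i], queueCount);
            queues.get(index).add(orderIds[i] + ":" + steps[i]);
        }
        return queues;
    }

    public static void main(String[] args) {
        // Two orders whose steps are interleaved on the sending side.
        int[] orderIds = {1, 2, 1, 2, 1, 2};
        String[] steps = {"create", "create", "pay", "pay", "deduct", "deduct"};
        List<List<String>> queues = route(orderIds, steps, 4);
        for (int i = 0; i < queues.size(); i++) {
            System.out.println("queue " + i + " -> " + queues.get(i));
        }
    }
}
```

Ordering is only preserved within one queue. Messages of different orders that land in different queues have no defined order relative to each other, which is usually acceptable for this kind of workflow.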

 

2. Source Code Walkthrough

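An ordered-message producer needs a queue selection algorithm that guarantees messages matching a given rule always land in the same queue. RocketMQ provides the MessageQueueSelector interface to help with this.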

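import java.io.UnsupportedEncodingException;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            ((DefaultMQProducer) producer).setNamesrvAddr("127.0.0.1:9876");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                int orderId = i % 10;
                // Note: the consumer example in this article subscribes to "TopicTest";
                // use the same topic name on both sides for an end-to-end run.
                Message msg =
                    new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // The last argument (orderId) is handed to select() as arg.
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size(); // same orderId -> same queue
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}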

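The only difference between sending an ordered message and sending a normal message is queue selection: a normal message picks a queue by load balancing, while an ordered message must stick to one fixed queue. (In a RocketMQ setup where master and slave cannot switch roles, messages for that queue can no longer be written once the master goes down. How should that be handled?)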

private SendResult sendSelectImpl(
    Message msg,
    MessageQueueSelector selector,
    Object arg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    long beginStartTime = System.currentTimeMillis();
    this.makeSureStateOK();
    Validators.checkMessage(msg, this.defaultMQProducer);

    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        try {
            // Let the user-supplied selector pick the fixed queue for this message.
            mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        } catch (Throwable e) {
            throw new MQClientException("select message queue throwed exception.", e);
        }

        // Time spent on route lookup and selection counts against the send timeout.
        long costTime = System.currentTimeMillis() - beginStartTime;
        if (timeout < costTime) {
            throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
        }
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
        } else {
            throw new MQClientException("select message queue return null.", null);
        }
    }

    throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
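One consequence of the fixed-queue rule is worth checking by hand. The selector in the producer example computes `id % mqs.size()`, so the chosen index depends on the length of the list passed to `select`. The sketch below is plain arithmetic with a hypothetical helper name (`QueueIndexSketch.indexFor`). It makes no claim about when RocketMQ actually changes that list; it only shows what would happen to the mapping if the list size changed.

```java
// Hypothetical helper; mirrors the "id % mqs.size()" rule from the producer example.
class QueueIndexSketch {

    /** Returns the queue index chosen for an order id when queueCount queues are visible. */
    static int indexFor(int orderId, int queueCount) {
        return orderId % queueCount;
    }

    public static void main(String[] args) {
        int orderId = 7;
        System.out.println(indexFor(orderId, 4)); // 4 queues visible: index 3
        System.out.println(indexFor(orderId, 3)); // 3 queues visible: index 1
    }
}
```

If the queue count changes while an order is still in flight, later messages of that order may go to a different queue than earlier ones, so a modulo selector gives ordering only as long as the visible queue list is stable.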

Consumer test case for ordered messages:

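import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        // Registering a MessageListenerOrderly is what switches the consumer to ordered consumption.
        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(false);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                this.consumeTimes.incrementAndGet();
                // Cycle through the possible return values to exercise every branch.
                if ((this.consumeTimes.get() % 2) == 0) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else if ((this.consumeTimes.get() % 3) == 0) {
                    return ConsumeOrderlyStatus.ROLLBACK;
                } else if ((this.consumeTimes.get() % 4) == 0) {
                    return ConsumeOrderlyStatus.COMMIT;
                } else if ((this.consumeTimes.get() % 5) == 0) {
                    context.setSuspendCurrentQueueTimeMillis(3000);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}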


Normal messages are consumed by ConsumeMessageConcurrentlyService, while ordered messages are consumed by ConsumeMessageOrderlyService.

// DefaultMQPushConsumerImpl.java

...
this.consumeMessageService.start();
...

ConsumeMessageOrderlyService starts a scheduled task that periodically locks the queues (that is, it binds each queue to a clientId).
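Binding a queue to a clientId and refreshing that binding on a timer is essentially a lease. The following is a minimal sketch of such a lease table, under stated assumptions: `QueueLockTableSketch`, its `tryLock` signature, string queue keys, and the lease length passed to the constructor are all hypothetical and only illustrate the idea, not RocketMQ's lock manager.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a queue -> clientId lock table with expiring leases.
class QueueLockTableSketch {

    private static final class Lease {
        final String clientId;
        long lastRenewMillis;

        Lease(String clientId, long now) {
            this.clientId = clientId;
            this.lastRenewMillis = now;
        }
    }

    private final Map<String, Lease> leases = new HashMap<>();
    private final long leaseMillis; // illustrative value chosen by the caller

    QueueLockTableSketch(long leaseMillis) {
        this.leaseMillis = leaseMillis;
    }

    /** Grants the lock if the queue is free or expired, renews it for the current holder, refuses others. */
    synchronized boolean tryLock(String queue, String clientId, long now) {
        Lease lease = leases.get(queue);
        boolean expired = lease != null && now - lease.lastRenewMillis > leaseMillis;
        if (lease == null || expired) {
            leases.put(queue, new Lease(clientId, now));
            return true;
        }
        if (lease.clientId.equals(clientId)) {
            lease.lastRenewMillis = now; // periodic re-lock keeps the lease alive
            return true;
        }
        return false;
    }
}
```

With this model, the periodic task is what keeps the holder's lease fresh. If the holder stops renewing (for example, because it crashed), another client can take over the queue once the lease expires.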

 

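After consumer load balancing has assigned the queues, each new queue must be locked. The client may only start consuming a new queue once the lock succeeds.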

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    // Step 1: drop queues this client no longer owns, or whose pull has stalled.
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            } else if (pq.isPullExpired()) {
                switch (this.consumeType()) {
                    case CONSUME_ACTIVELY:
                        break;
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
                                consumerGroup, mq);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    // Step 2: create pull requests for newly assigned queues.
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            // The client locks the queue. The lock flag lives in ProcessQueue,
            // which helps implement ordered consumption.
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;  // without the lock, this client must not pull from this mq
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }

    this.dispatchPullRequest(pullRequestList);

    return changed;
}
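Stripped of logging and bookkeeping, the method above is a set difference in two directions, with a lock gate on the "add" side in ordered mode. The sketch below restates that with plain `String` queue names. `RebalanceDiffSketch`, `toRemove`, `toAdd`, and the `lock` predicate are hypothetical names introduced only for illustration.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified view of the rebalance diff; not RocketMQ code.
class RebalanceDiffSketch {

    /** Queues currently held but missing from the new assignment: these get dropped. */
    static Set<String> toRemove(Set<String> held, Set<String> assigned) {
        Set<String> out = new LinkedHashSet<>(held);
        out.removeAll(assigned);
        return out;
    }

    /** Newly assigned queues that may start pulling; in ordered mode the lock must succeed first. */
    static Set<String> toAdd(Set<String> held, Set<String> assigned, boolean isOrder, Predicate<String> lock) {
        Set<String> out = new LinkedHashSet<>();
        for (String mq : assigned) {
            if (held.contains(mq)) {
                continue; // already being consumed
            }
            if (isOrder && !lock.test(mq)) {
                continue; // lock failed: skip for now, a later rebalance can try again
            }
            out.add(mq);
        }
        return out;
    }
}
```

A queue whose lock fails is simply left out of this round. It is not an error, because the previous owner may still hold the lock for a short while after the assignment changes.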

Once the queues have been assigned, the consumer moves on to pulling messages:

public void pullMessage(final PullRequest pullRequest) {
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }

    // ... some code omitted ...

        // Non-ordered consumption
        if (!this.consumeOrderly) {
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        } else {  // Ordered consumption
            if (processQueue.isLocked()) {
                if (!pullRequest.isLockedFirst()) {
                    // First pull after the lock was obtained: re-read the offset from the broker.
                    final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
                    boolean brokerBusy = offset < pullRequest.getNextOffset();
                    log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}",
                        pullRequest, offset, brokerBusy);
                    if (brokerBusy) {
                        log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}",
                            pullRequest, offset);
                    }

                    pullRequest.setLockedFirst(true);
                    pullRequest.setNextOffset(offset);
                }
            } else {  // An ordered queue must be locked by this client before it can be pulled.
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
                log.info("pull message later because not locked in broker, {}", pullRequest);
                return;
            }
        }

        // ... code omitted ...
    }

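Finally, the consumer consumes the messages. For ordered messages, ConsumeMessageOrderlyService performs some checks before invoking the listener (for example, checking the mq lock), and the handling of a failed call also differs from the non-ordered case. (Unlike an unordered message, an ordered message that fails cannot be sent back to the broker for retry, because that would break the order. It can only be consumed again after a short delay.)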
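The point about retries can be made concrete with a toy simulation. The sketch below is not RocketMQ code: `RetryOrderSketch.consume` is a hypothetical helper that processes an in-memory queue in which some messages fail on their first attempt. With `requeue = true` a failed message is moved to the tail (roughly what sending it back for later redelivery amounts to); with `requeue = false` the consumer stays on the head message and retries it in place.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of two retry strategies; not RocketMQ code.
class RetryOrderSketch {

    /**
     * Returns the order in which messages are successfully consumed.
     * Messages listed in failOnce fail on their first attempt and succeed on the second.
     */
    static List<String> consume(List<String> messages, Set<String> failOnce, boolean requeue) {
        Deque<String> queue = new ArrayDeque<>(messages);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> done = new ArrayList<>();
        while (!queue.isEmpty()) {
            String msg = queue.peekFirst();
            // add() returns true only the first time, so each message fails at most once.
            boolean fails = failOnce.contains(msg) && alreadyFailed.add(msg);
            if (!fails) {
                done.add(queue.pollFirst());
            } else if (requeue) {
                queue.addLast(queue.pollFirst()); // retried only after the later messages
            }
            // Otherwise: keep the failed message at the head and retry it on the next iteration.
        }
        return done;
    }
}
```

For the input `create, pay, deduct` with `pay` failing once, retrying in place yields `create, pay, deduct`, while requeueing yields `create, deduct, pay`. The price of retrying in place is that one stuck message blocks everything behind it in the same queue.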

// ConsumeMessageOrderlyService.java
class ConsumeRequest implements Runnable {
    private final ProcessQueue processQueue;
    private final MessageQueue messageQueue;

    public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
        this.processQueue = processQueue;
        this.messageQueue = messageQueue;
    }

    public ProcessQueue getProcessQueue() {
        return processQueue;
    }

    public MessageQueue getMessageQueue() {
        return messageQueue;
    }

    @Override
    public void run() {
        if (this.processQueue.isDropped()) {
            log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
            return;
        }

        final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
        // Unlike concurrent consumption, the mq is locked here so that within one consumer
        // only a single thread consumes this queue, which keeps consumption in order.
        synchronized (objLock) {
            // Broadcasting mode, or the mq is locked and the lock has not expired.
            if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                final long beginTime = System.currentTimeMillis();
                for (boolean continueConsume = true; continueConsume; ) {
                    if (this.processQueue.isDropped()) {
                        log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                        break;
                    }

                    // Clustering mode and not locked: log a warning, then lock and consume later.
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && !this.processQueue.isLocked()) {
                        log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    // Clustering mode and the mq lock has expired: log a warning, then lock and consume later.
                    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                        && this.processQueue.isLockExpired()) {
                        log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
                        ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
                        break;
                    }

                    // This run has been consuming for too long: yield and resubmit the consume request shortly.
                    long interval = System.currentTimeMillis() - beginTime;
                    if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
                        ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
                        break;
                    }

                    final int consumeBatchSize =
                        ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();

                    List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
                    if (!msgs.isEmpty()) {
                        final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

                        ConsumeOrderlyStatus status = null;

                        ConsumeMessageContext consumeMessageContext = null;
                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext = new ConsumeMessageContext();
                            consumeMessageContext
                                .setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
                            consumeMessageContext.setMq(messageQueue);
                            consumeMessageContext.setMsgList(msgs);
                            consumeMessageContext.setSuccess(false);
                            // init the consume context type
                            consumeMessageContext.setProps(new HashMap<String, String>());
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                        }

                        long beginTimestamp = System.currentTimeMillis();
                        ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                        boolean hasException = false;
                        try {
                            this.processQueue.getLockConsume().lock(); // hold the processQueue consume lock while consuming
                            if (this.processQueue.isDropped()) {
                                log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                    this.messageQueue);
                                break;
                            }

                            status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context); // invoke the user listener
                        } catch (Throwable e) {
                            log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                                RemotingHelper.exceptionSimpleDesc(e),
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                            hasException = true;
                        } finally {
                            this.processQueue.getLockConsume().unlock();
                        }

                        if (null == status
                            || ConsumeOrderlyStatus.ROLLBACK == status
                            || ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
                                ConsumeMessageOrderlyService.this.consumerGroup,
                                msgs,
                                messageQueue);
                        }

                        long consumeRT = System.currentTimeMillis() - beginTimestamp;
                        if (null == status) {
                            if (hasException) {
                                returnType = ConsumeReturnType.EXCEPTION;
                            } else {
                                returnType = ConsumeReturnType.RETURNNULL;
                            }
                        } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
                            returnType = ConsumeReturnType.TIME_OUT;
                        } else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
                            returnType = ConsumeReturnType.FAILED;
                        } else if (ConsumeOrderlyStatus.SUCCESS == status) {
                            returnType = ConsumeReturnType.SUCCESS;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
                        }

                        // A null status (exception or null return) is treated as "suspend and retry".
                        if (null == status) {
                            status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                        }

                        if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                            consumeMessageContext.setStatus(status.toString());
                            consumeMessageContext
                                .setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
                            ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                        }

                        ConsumeMessageOrderlyService.this.getConsumerStatsManager()
                            .incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

                        // Handle the result of ordered consumption
                        continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                    } else {
                        continueConsume = false;
                    }
                }
            } else {
                if (this.processQueue.isDropped()) {
                    log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
                    return;
                }

                ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
            }
        }
    }
}
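The `messageQueueLock.fetchLockObject(...)` call in `run()` hands back one monitor object per queue, so consume requests for the same queue serialize on `synchronized (objLock)` while different queues proceed in parallel. A minimal sketch of that per-key lock pattern follows. It is an assumption about the shape of such a helper, not RocketMQ's class: `PerQueueLockSketch` is a hypothetical name and the queue is keyed by a plain `String`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical per-key lock table: one lock object per queue key.
class PerQueueLockSketch {

    private final ConcurrentMap<String, Object> lockTable = new ConcurrentHashMap<>();

    /** Returns the lock object for a queue, creating it atomically on first use. */
    Object fetchLockObject(String queueKey) {
        return lockTable.computeIfAbsent(queueKey, k -> new Object());
    }

    public static void main(String[] args) {
        PerQueueLockSketch locks = new PerQueueLockSketch();
        Object lock = locks.fetchLockObject("TopicTest@broker-a@0");
        synchronized (lock) {
            // Only one thread at a time runs this section for this queue key.
            System.out.println("consuming under the per-queue lock");
        }
    }
}
```

This local lock only orders threads inside one consumer process. Keeping other consumer instances away from the queue is what the lock checked by `isLocked()` and `isLockExpired()` is for.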




// Handle the result of ordered consumption

public boolean processConsumeResult(
    final List<MessageExt> msgs,
    final ConsumeOrderlyStatus status,
    final ConsumeOrderlyContext context,
    final ConsumeRequest consumeRequest
) {
    boolean continueConsume = true;
    long commitOffset = -1L;
    if (context.isAutoCommit()) {  // auto-commit enabled
        switch (status) {
            case COMMIT:
            case ROLLBACK:
                // Not meaningful with auto-commit: warn, then fall through and treat as SUCCESS.
                log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                    consumeRequest.getMessageQueue());
            case SUCCESS:
                commitOffset = consumeRequest.getProcessQueue().commit(); // commit the successfully consumed messages
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                } else {
                    commitOffset = consumeRequest.getProcessQueue().commit();
                }
                break;
            default:
                break;
        }
    } else {  // auto-commit disabled
        switch (status) {
            case SUCCESS:
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                break;
            case COMMIT:
                commitOffset = consumeRequest.getProcessQueue().commit(); // commit the messages handled so far
                break;
            case ROLLBACK:
                consumeRequest.getProcessQueue().rollback();  // roll back the messages handled so far
                this.submitConsumeRequestLater(
                    consumeRequest.getProcessQueue(),
                    consumeRequest.getMessageQueue(),
                    context.getSuspendCurrentQueueTimeMillis()); // consume again after a delay
                continueConsume = false;
                break;
            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                if (checkReconsumeTimes(msgs)) {
                    consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                    this.submitConsumeRequestLater(
                        consumeRequest.getProcessQueue(),
                        consumeRequest.getMessageQueue(),
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                }
                break;
            default:
                break;
        }
    }

    if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false); // update the consume offset
    }

    return continueConsume;
}
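The `commit()` and `rollback()` calls on ProcessQueue are used above but not shown. One way to picture them, as a simplified model rather than the real implementation, is a buffer with two sorted maps: messages that were pulled but not yet handed out, and messages currently handed to the listener. In the sketch below the class name `OrderlyBufferSketch`, the field names, and the "last offset + 1" return value of `commit()` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of an ordered consume buffer keyed by queue offset.
class OrderlyBufferSketch {

    private final TreeMap<Long, String> pending = new TreeMap<>();   // pulled, not yet handed out
    private final TreeMap<Long, String> consuming = new TreeMap<>(); // handed to the listener

    synchronized void put(long offset, String body) {
        pending.put(offset, body);
    }

    /** Hands out up to batchSize messages in offset order and marks them as in flight. */
    synchronized List<String> take(int batchSize) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < batchSize && !pending.isEmpty(); i++) {
            Map.Entry<Long, String> e = pending.pollFirstEntry();
            consuming.put(e.getKey(), e.getValue());
            out.add(e.getValue());
        }
        return out;
    }

    /** Forgets the in-flight messages and returns the next offset to persist, or -1 if none. */
    synchronized long commit() {
        if (consuming.isEmpty()) {
            return -1L;
        }
        long next = consuming.lastKey() + 1;
        consuming.clear();
        return next;
    }

    /** Moves the in-flight messages back so the next take() returns them again, in order. */
    synchronized void rollback() {
        pending.putAll(consuming);
        consuming.clear();
    }
}
```

Under this model, returning SUCCESS with auto-commit disabled leaves the messages in the in-flight map and does not advance the offset until a later COMMIT, which matches the branch above where only COMMIT assigns `commitOffset`.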

 
