Broker selection while RocketMQ sends messages
Posted notlate
Foreword: This article was compiled by the editors of 小常识网 (cha138.com) and covers how RocketMQ selects a broker while sending messages. We hope it is of some reference value.
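DefaultMQProducerImpl has a method called sendDefaultImpl, and every message send goes through it. How the route information is obtained is not covered here.
Inside this method, in synchronous mode, if an attempt fails to send the message, the producer retries up to the configured retry count, running the selectOneMessageQueue logic again for each new attempt.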
```java
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
            beginTimestampPrev = System.currentTimeMillis();
            long costTime = beginTimestampPrev - beginTimestampFirst;
            if (timeout < costTime) {
                callTimeout = true;
                break;
            }

            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    return sendResult;
                default:
                    break;
            }
        } catch (RemotingException e) {
            endTimestamp = System.currentTimeMillis();
            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
            log.warn(msg.toString());
            exception = e;
            continue;
        }
        // ... remaining catch branches omitted in this excerpt
    }
}
```
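The control flow of this loop is easier to see in a stripped-down model. The following is a minimal sketch under simplifying assumptions, not RocketMQ code: RetrySketch, Sender and FaultReporter are hypothetical names, a RuntimeException stands in for RemotingException, and the SEND_OK status check is left out. It keeps three behaviors of the loop: the broker used in the previous attempt is handed to the selector, the remaining timeout shrinks with each attempt, and every attempt reports its latency, with isolation set to true only when the send threw.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified model of the synchronous retry loop in sendDefaultImpl.
final class RetrySketch {

    /** Stand-in for sendKernelImpl: returns a result or throws on a transport failure. */
    interface Sender {
        String send(String brokerName, long remainingTimeoutMs);
    }

    /** Stand-in for updateFaultItem(brokerName, latency, isolation). */
    interface FaultReporter {
        void report(String brokerName, long latencyMs, boolean isolation);
    }

    /**
     * Makes up to 1 + retryTimes attempts. The selector receives the broker used in the
     * previous attempt (null on the first one) and returns the broker to try next.
     * Returns the first successful result, or null if every attempt failed or time ran out.
     */
    static String send(Function<String, String> selector, Sender sender, FaultReporter reporter,
                       int retryTimes, long timeoutMs, List<String> brokersSent) {
        long beginFirst = System.currentTimeMillis();
        String lastBroker = null;
        for (int times = 0; times < 1 + retryTimes; times++) {
            String broker = selector.apply(lastBroker);
            if (broker == null) {
                break; // nothing left to try
            }
            lastBroker = broker;
            brokersSent.add(broker);
            long beginPrev = System.currentTimeMillis();
            long cost = beginPrev - beginFirst;
            if (timeoutMs < cost) {
                break; // the overall timeout budget is used up
            }
            try {
                String result = sender.send(broker, timeoutMs - cost);
                reporter.report(broker, System.currentTimeMillis() - beginPrev, false);
                return result;
            } catch (RuntimeException e) {
                // Transport failure: penalize the broker (isolation = true) and retry at once.
                reporter.report(broker, System.currentTimeMillis() - beginPrev, true);
            }
        }
        return null;
    }
}
```

With a selector that avoids the previous broker, a failure on broker-a is reported with isolation set and the next attempt goes to broker-b.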
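The selectOneMessageQueue called here actually delegates to the selectOneMessageQueue method of the producer's internal MQFaultStrategy object.
Personally, I don't think the broker-latency estimation it can perform (controlled by sendLatencyFaultEnable) is especially important, and RocketMQ leaves this logic disabled by default, but that doesn't stop us from studying it. Below is MQFaultStrategy's selectOneMessageQueue: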
```java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }

            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```
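If sendLatencyFaultEnable is false (the default), the method falls through to tpInfo.selectOneMessageQueue(lastBrokerName): each call increments the shared sendWhichQueue counter by one and uses it to pick the next queue from the topic's message queue list, skipping queues that belong to the broker that failed last time. (Put simply, the message queue list holds each broker's queues for the topic, and how many queues a broker contributes is set by the topic's configuration on that broker.)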
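A minimal sketch of this default path, assuming a simplified model in which a queue is just a broker name plus a queue id (QueueRef and RoundRobinSelector are hypothetical names, not RocketMQ classes). It uses Math.floorMod instead of Math.abs plus a pos < 0 guard, which keeps the index non-negative even after the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified queue reference: which broker, which queue id on it.
final class QueueRef {
    final String brokerName;
    final int queueId;

    QueueRef(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public String toString() {
        return brokerName + "#" + queueId;
    }
}

// Sketch of the default selection path: round robin that avoids the last failed broker.
final class RoundRobinSelector {
    private final List<QueueRef> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    RoundRobinSelector(List<QueueRef> queues) {
        this.queues = queues;
    }

    /** Plain round robin: every call advances the shared counter by one. */
    QueueRef select() {
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }

    /** Round robin that skips queues on lastBrokerName; falls back to plain round robin. */
    QueueRef select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return select();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            QueueRef q = queues.get(Math.floorMod(index++, queues.size()));
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return select(); // every queue lives on lastBrokerName
    }
}
```

Note that only the single most recent failed broker is excluded; any other broker, healthy or not, is still eligible.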
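The problem is that this only avoids the one broker that failed most recently. With just two brokers that solves most problems, but with many brokers we would like RocketMQ to reason along a time dimension and estimate when a given broker will be available again. This matters especially for RocketMQ, because when the set of brokers changes the producer is not notified right away; it picks up the change through asynchronous polling. Likewise, liveness detection between the NameServer and the brokers is also asynchronous and periodic.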
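Turning sendLatencyFaultEnable on means the producer estimates, before sending, whether a broker is available, and returns that broker's queue right away if it is. In the code above, the check is: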
```java
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
```
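I think this is a mistake. The intent seems to be to skip the broker that just failed, so the condition should presumably read !mq.getBrokerName().equals(lastBrokerName). As written, once lastBrokerName is set, only queues on the very broker that just failed can pass the check.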
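If the condition is read the way suggested above, the first pass of the latency-aware selection looks like the sketch below. LatencyAwareFirstPass is a hypothetical class, queues are represented only by the broker that hosts them, and the isAvailable predicate stands in for latencyFaultTolerance.isAvailable. It returns -1 when no queue qualifies, which is the point where the real method moves on to pickOneAtLeast.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

// Hypothetical sketch of the first pass of latency-aware selection, written with the
// negated condition: accept an available broker other than the last failed one.
final class LatencyAwareFirstPass {
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    /**
     * brokerOfQueue.get(i) is the broker hosting queue i.
     * Returns the index of the chosen queue, or -1 if no queue qualifies.
     */
    int select(List<String> brokerOfQueue, String lastBrokerName, Predicate<String> isAvailable) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < brokerOfQueue.size(); i++) {
            int pos = Math.floorMod(index++, brokerOfQueue.size());
            String broker = brokerOfQueue.get(pos);
            boolean notLastFailed = lastBrokerName == null || !broker.equals(lastBrokerName);
            if (isAvailable.test(broker) && notLastFailed) {
                return pos;
            }
        }
        return -1; // the real method would now fall back to pickOneAtLeast()
    }
}
```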
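The code calls latencyFaultTolerance.isAvailable to decide whether a broker is available. Where does that information come from?
In sendDefaultImpl, whether the send succeeds or not, the producer calls updateFaultItem on its internal MQFaultStrategy, and that is where latencyFaultTolerance gets updated.
Here are some of MQFaultStrategy's key fields and methods: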
```java
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }

    return 0;
}
```
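During sendDefaultImpl, isolation is false only when sendKernelImpl returns normally, that is, when the send went through. In that case the duration returned by computeNotAvailableDuration is usually 0, because any latency below 550 ms maps to 0. The slower the send, however, the higher the matching index in latencyMax, and the longer the duration taken from notAvailableDuration.
When the send fails, isolation is true (as in the RemotingException branch), and updateFaultItem passes 30000 instead of the measured latency. Since 30000 is at least 15000, the last entry of latencyMax, computeNotAvailableDuration returns the last entry of notAvailableDuration, 600000L, so the broker is treated as unavailable for 10 minutes.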
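To make the table lookup concrete, here is a standalone copy of the two arrays and the reverse scan, wrapped in a hypothetical NotAvailableDuration class. forSend mirrors how updateFaultItem substitutes 30000 ms for the measured latency when isolation is true.

```java
// Standalone copy of MQFaultStrategy's latency-to-penalty lookup (hypothetical class name).
final class NotAvailableDuration {
    private static final long[] LATENCY_MAX =
            {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION =
            {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    /** Scans from the largest threshold down; the first threshold <= latency decides. */
    static long compute(long currentLatencyMs) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatencyMs >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    /** Mirrors updateFaultItem: a failed send (isolation) is charged as 30000 ms. */
    static long forSend(long latencyMs, boolean isolation) {
        return compute(isolation ? 30000L : latencyMs);
    }
}
```

For example, a send that took 2.5 s benches the broker for 120000 ms (2 minutes), while any failed send lands on the last threshold and benches it for 600000 ms (10 minutes).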
Next, step into LatencyFaultToleranceImpl's updateFaultItem:
```java
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
```
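This creates a FaultItem, which, as the name suggests, is an entry for a faulty or problematic broker: name is the broker name, currentLatency is how long the most recent send took from start to finish, and startTimestamp is the estimated timestamp at which the broker becomes available again. If an entry already exists, it is updated in place with the same two values.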
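A minimal sketch of the same bookkeeping, assuming a hypothetical FaultTableSketch class with an injectable clock so the timing can be tested. Instead of the get / putIfAbsent sequence above, it uses ConcurrentHashMap.compute, which performs the insert-or-update atomically per key. Treating a broker that has no entry as available is an assumption of this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, simplified fault table keyed by broker name.
final class FaultTableSketch {
    static final class Item {
        volatile long currentLatency;  // latency of the most recent send
        volatile long startTimestamp;  // estimated time at which the broker is usable again

        Item(long currentLatency, long startTimestamp) {
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }
    }

    private final ConcurrentHashMap<String, Item> table = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injectable so the timing can be tested

    FaultTableSketch(LongSupplier clock) {
        this.clock = clock;
    }

    /** Inserts or updates the entry in one atomic step per key. */
    void update(String broker, long currentLatency, long notAvailableDuration) {
        long until = clock.getAsLong() + notAvailableDuration;
        table.compute(broker, (name, old) -> {
            if (old == null) {
                return new Item(currentLatency, until);
            }
            old.currentLatency = currentLatency;
            old.startTimestamp = until;
            return old;
        });
    }

    /** A broker with no entry is treated as available (an assumption of this sketch). */
    boolean isAvailable(String broker) {
        Item item = table.get(broker);
        return item == null || clock.getAsLong() - item.startTimestamp >= 0;
    }
}
```

Because every send overwrites startTimestamp, a later fast send (duration 0) immediately makes a previously benched broker available again.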
Now look at FaultItem's key methods:
```java
@Override
public int compareTo(final FaultItem other) {
    if (this.isAvailable() != other.isAvailable()) {
        if (this.isAvailable())
            return -1;

        if (other.isAvailable())
            return 1;
    }

    if (this.currentLatency < other.currentLatency)
        return -1;
    else if (this.currentLatency > other.currentLatency) {
        return 1;
    }

    if (this.startTimestamp < other.startTimestamp)
        return -1;
    else if (this.startTimestamp > other.startTimestamp) {
        return 1;
    }

    return 0;
}

public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}
```
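The ordering defined by compareTo can be reproduced with a Comparator: available brokers first, then lower latency, then earlier startTimestamp. In this hypothetical RankedItem sketch the current time is passed in explicitly instead of being read inside isAvailable, so every comparison within one sort sees the same clock; in the original, time can advance between comparisons, which could in principle make the ordering inconsistent while a sort is running.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for FaultItem with the same three-level ordering.
final class RankedItem {
    final String name;
    final long currentLatency;
    final long startTimestamp;

    RankedItem(String name, long currentLatency, long startTimestamp) {
        this.name = name;
        this.currentLatency = currentLatency;
        this.startTimestamp = startTimestamp;
    }

    boolean isAvailable(long now) {
        return now - startTimestamp >= 0;
    }

    /** Available first, then lower latency, then earlier startTimestamp. */
    static Comparator<RankedItem> order(long now) {
        return (a, b) -> {
            boolean aOk = a.isAvailable(now);
            boolean bOk = b.isAvailable(now);
            if (aOk != bOk) {
                return aOk ? -1 : 1;
            }
            int byLatency = Long.compare(a.currentLatency, b.currentLatency);
            if (byLatency != 0) {
                return byLatency;
            }
            return Long.compare(a.startTimestamp, b.startTimestamp);
        };
    }

    /** Returns the names sorted best-first, using one fixed "now" for the whole sort. */
    static List<String> rank(List<RankedItem> items, long now) {
        List<RankedItem> sorted = new ArrayList<>(items);
        sorted.sort(order(now));
        List<String> names = new ArrayList<>();
        for (RankedItem item : sorted) {
            names.add(item.name);
        }
        return names;
    }
}
```

With now = 10000, an available broker with 300 ms latency still ranks ahead of a benched broker with 50 ms latency.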
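Back in MQFaultStrategy's selectOneMessageQueue, putting the pieces above together: if an available broker is found, its queue is returned directly. If none is found, pickOneAtLeast is called to choose a broker that is good enough, and a round-robin queue is retargeted to that broker (its queue id taken modulo that broker's write-queue count). If that broker has no write queues, it is removed from the fault table and plain round robin is used instead.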
```java
public String pickOneAtLeast() {
    final Enumeration<FaultItem> elements = this.faultItemTable.elements();
    List<FaultItem> tmpList = new LinkedList<FaultItem>();
    while (elements.hasMoreElements()) {
        final FaultItem faultItem = elements.nextElement();
        tmpList.add(faultItem);
    }

    if (!tmpList.isEmpty()) {
        Collections.shuffle(tmpList);

        Collections.sort(tmpList);

        final int half = tmpList.size() / 2;
        if (half <= 0) {
            return tmpList.get(0).getName();
        } else {
            final int i = this.whichItemWorst.getAndIncrement() % half;
            return tmpList.get(i).getName();
        }
    }

    return null;
}
```
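Since FaultItem already supports ordering from best to worst, pickOneAtLeast sorts the entries and then picks a broker name from the better half. Strictly speaking, the pick is not uniformly random: the shuffle before the stable Collections.sort only randomizes the order of equally ranked items, and the whichItemWorst counter then cycles through the better half.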
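A minimal sketch of the pick step, simplified to rank brokers by latency only (the real method sorts with FaultItem.compareTo, which also considers availability and startTimestamp). PickOneAtLeastSketch is a hypothetical name. It uses Math.floorMod so the index stays non-negative if the counter ever overflows, whereas the plain % in the original would yield a negative index in that case.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of pickOneAtLeast, ranking brokers by latency only.
final class PickOneAtLeastSketch {
    private final AtomicInteger whichItemWorst = new AtomicInteger(0);

    /** Returns a broker from the better half of the ranking, or null if there are none. */
    String pick(Map<String, Long> latencyByBroker) {
        List<String> names = new ArrayList<>(latencyByBroker.keySet());
        if (names.isEmpty()) {
            return null;
        }
        Collections.shuffle(names); // randomize the order of equally ranked brokers
        // List.sort is stable, so ties keep their shuffled order.
        names.sort(Comparator.comparingLong(name -> latencyByBroker.get(name)));
        int half = names.size() / 2;
        if (half <= 0) {
            return names.get(0); // only one candidate
        }
        // Cycle through the better half; floorMod stays non-negative after overflow.
        return names.get(Math.floorMod(whichItemWorst.getAndIncrement(), half));
    }
}
```

With four brokers at 10, 20, 30 and 40 ms, successive calls return the 10 ms broker, then the 20 ms broker, then the 10 ms broker again; the two slower brokers are never chosen while the ranking stays the same.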