rocketmq的message里面的路由信息是啥
Posted notlate
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq的message里面的路由信息是啥相关的知识,希望对你有一定的参考价值。
对于一个生产者来说,在进行sendmessage的时候,需要知道这个topic应该发给哪个broker。如果没有路由信息的话,需要取注册中心,通过GET_ROUTEINTO_BY_TOPIC去注册中心拿到消息。
介绍下面具体流程之前,还是先介绍注册额中心里面路由管理者RouteInfoManager:
:
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
broker-name:可以有多个broker-id,broker-id为0的就是master,否则是slave
clusterAddrTable:多个broker-name可以放在同一个cluster下面
topicqueueTable:一个topic下面可能有多个broker对应,QueueData里面存放每个broker-name的属性。所以一个topic下面可能有多个broker-name在贡献。
public class QueueData implements Comparable<QueueData> { private String brokerName; private int readQueueNums; private int writeQueueNums; private int perm; private int topicSynFlag;
brokerAddrTable:一个broker-name对应的信息,QueueData是真正描述一个broker-name的属性,BrokerData描述的是这个broker的上下级关系,上级cluster是谁,下级brokerAddrs描述这个broker-name下面所有broker-id对应的ip地址。
public class BrokerData implements Comparable<BrokerData> { private String cluster; private String brokerName; private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
RouteInfoManager主要存储的信息就是这么多。这些消息都是在一个broker启动的时候,都会到注册中心注册broker,在注册的时候把RouteInfoManager里面的信息进行填充。同时如果有变化的时候RouteInfoManager里面的数据也会跟着刷新,QueueData和BrokerData的equal方法都被覆盖了,这里面的属性任何一个有变化都会被认为有变化,然后被更新。
对于备机来说,注册完成以后,还可以从注册中心拿到主机的haServer-addrhaServer-port地址,也就是主机broker的这个地址:
public String getHAServerAddr() { return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort(); }
拿到这个地址以后才能启动HaService里面的Haclient,上报自己的ack-offset,然后拿到同步数据。
对于一个生产者来说,在进行sendmessage的时候,需要知道这个topic应该发给哪个broker。如果没有路由信息的话,需要取注册中心,通过GET_ROUTEINTO_BY_TOPIC去注册中心拿到消息。
所谓的路由信息其实就是这个数据结构:
public class TopicRouteData extends RemotingSerializable { private String orderTopicConf; private List<QueueData> queueDatas; private List<BrokerData> brokerDatas; private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
具体拿的方法在pickupTopicRouteData里面通过topic可以从RouteInfoManager拿到
拿到以后需要更新发布信息和订阅信息,其中发布信息就是针对生产者来说的,具体更新就是:(route,就是TopicRouteData)
List<QueueData> qds = route.getQueueDatas(); Collections.sort(qds); for (QueueData qd : qds) { if (PermName.isWriteable(qd.getPerm())) { BrokerData brokerData = null; for (BrokerData bd : route.getBrokerDatas()) { if (bd.getBrokerName().equals(qd.getBrokerName())) { brokerData = bd; break; } } if (null == brokerData) { continue; } if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) { continue; } for (int i = 0; i < qd.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); info.getMessageQueueList().add(mq); } }
public MessageQueue(String topic, String brokerName, int queueId) { this.topic = topic; this.brokerName = brokerName; this.queueId = queueId; }
也就是一个topic可能对应多个broker-name,同时每个broker-name也有多个QueueId,这个Queueid个数是getWriteQueueNums决定。
这里的info就是publishInfo,会被塞入this.topicPublishInfoTable.put(topic, info)中保存下来。
重点来了,这个有啥用?
后面生产者在发送消息的时候,需要有一个broker-addr,毕竟要知道一个broker地址,为了拿到这个broker-addr,其实是在这里拿到的:
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
也就是broker-addr是MessageQueue给出来的,这个MessageQueue怎么取得?
public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
也就是每次从前面提的info.getMessageQueueList().add(mq)这个里面每次取一个messageQueue(第一次随机,后面每次加1),这个MessageQueue就是针对同一个broker-name多个QueueId。
在实际发送的时候,拿到一个MessageQueue就会直接发出去。对于一个topic有多个broker-name的情况,这种做法只是给一个broker-name的master发了信息,其他broker-name没有发送。这个就是topic层面的分片,不同broker分摊相同topic下的不同内容,而同一个broker通过主备完成信息冗余。而QueueId就是broker层面的再次分片。对于多个消费者的情况,消费相同broker-name的时候,可以根据queue-id并发消费。
以上是关于rocketmq的message里面的路由信息是啥的主要内容,如果未能解决你的问题,请参考以下文章