干货分享RocketMQ命名服务和路由组件——namesrv解析

Posted 苏研大云人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了干货分享RocketMQ命名服务和路由组件——namesrv解析相关的知识,希望对你有一定的参考价值。

写在前面的话


【干货分享】RocketMQ命名服务和路由组件——namesrv解析

本文的代码分析基于RocketMQ的4.3.0版本。


【干货分享】RocketMQ命名服务和路由组件——namesrv解析

1

namesrv在RocketMQ集群中扮演的角色

先允许我从Apache RocketMQ官网盗一张图。RocketMQ集群部署架构图(多Master-多Slave集群)。

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

RocketMQ集群分为四部分:生产者、消费者、broker、namesrv。生产者和消费者很好理解,就是消息的生产方和使用方。broker负责消息的接收、存储、投递等。


1.namesrv一般部署为集群模式,每台broker服务器启动时都会向所有namesrv节点注册自己,随后会定时(默认30s)向namesrv集群的每一个节点上报自己。这使得namesrv集群是一个热备份模式。注册和上报的数据包括

  • broker中topic的信息,封装在请求体中:每个topic的名字,以及该topic下读写队列的个数,该topic在此broker下的读写权限等。

2.客户端(producer/consumer)随机的和namesrv集群中的其中一个节点建立长连接,定时(默认30s)拉取最新的topic路由信息。

3.namesrv集群各节点无状态,具体体现在所有broker上报的数据都存储在内存中,不进行任何的持久化。

4.namesrv集群各节点之间没有通信(其实这么说是不太严谨的,如果namesrv采用ClusterTestRequestProcessor作为请求的处理器,是可能会和其他节点发生通信的),集群内部没有机制保证信息的一致

2

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

namesrv 代码结构

先认识下namesrv模块的代码结构和各个类的作用,能使后面源码的阅读分析事半功倍。

1.代码结构

namesrv只有8个java文件,9个类。整个工程代码不超过1000行。

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

2.各个类的介绍

1.KVConfigManager: key-value配置信息的操作类。提供以下操作方法:

  • load: 从指定文件(json格式)加载k-v配置到本地缓存configTable。

  • putKVConfig: 将指定的k-v配置加入到configTable中。

  • deleteKVConfig:将指定key值、指定namespace的配置从configTable中移除。

  • getKVConfig:获取指定key值、指定namespace的配置。

  • getKVListByNamespace:获取指定namespace的所有k-v配置信息。

  • persist: 持久化configTable到文件。

  • printAllPeriodically:打印configTable中所有的配置信息。

2.KVConfigSerializeWrapper:是为了方便序列化/反序列化的包装类。它和文件中的json格式数据对应。json字符串和对象间的转化使用了fastjson。

3.DefaultRequestProcessor:默认请求处理器类。用来处理客户端或者broker发来的RPC请求。其中就包括处理broker发来的注册请求。以下为几个常用的处理请求:

  • 注册Broker

  • 注销Broker

  • 根据指定topic获取路由信息

  • 获取集群的全量信息(包括每个集群内所有broker,每个broker的master/slave信息)。

  • 去除broker的写入权限。

  • 获取所有topic名字列表。

  • 从namesrv删除指定topic。

  • 获取指定cluster的所有topic信息。

  • 更新namesrv配置。

  • 获取namesrv配置。

4.ClusterTestRequestProcessor:请求处理类。继承自DefaultRequestProcessor。只重写了一个请求处理方法:根据指定topic获取路由信息。里面只增加了一行关键代码。在本namesrv中找不到请求的topic时,向其他namesrv节点发送请求获取。这里就发生了namesrv节点间通信。只是从其他namesrv拿来的topic路由信息直接返回给原始请求方,并不会更新到本地namesrv。

5.BrokerHousekeepingService:连接的监听器。实现了ChannelEventListener接口。监听与Broker建立的通道的状态(空闲、关闭、异常三个状态),并调用相应onChannel****方法。通道的空闲、关闭、异常状态均调用RouteInfoManager.onChannelDestory方法,来更新RouteInfoManager类中的5个HashMap对象。在构建NettyRemotingServer时作为参数传入。

6.RouteInfoManager:namesrv中最重要的类。维护着生产者、消费者活动所需要的所有信息。这些信息存储在5个HashMap里,不会进行持久化处理。

private final HashMap<string >topicQueueTable;

private final HashMap brokerAddrTable;

private final HashMap> lusterAddrTable;

private final HashMap brokerLiveTable;

private final HashMap/* Filter Server */> filterServerTable;

8.NamesrvController:namesrv控制类,控制着namesrv的启动、初始化、停止等生命周期。

9.NamesrvStart:namesrv启动类,也即main函数所在类。用于构建NamesrvController,随后初始化和启动namesrv。


【干货分享】RocketMQ命名服务和路由组件——namesrv解析

3

namesrv源码分析

1.启动流程

namesrv的启动流程并不复杂,参考下图:

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

根据该时序图重点关注几个关键的步骤:

1.构建NamesrvController。时序图表述比较简单,构建NamesrvController详细逻辑为:

(1)根据启动namesrv时的系统参数,构建命令行。

(2)构建NamesrvConfig和NettyServerConfig。主要是将命令行中的参数解析到NamesrvConfig/ NettyServerConfig中。如果命令行中指定了参数文件,也会将文件中的所有配置都解析进去。

(3)最后用上一步中构建的NamesrvConfig/NettyServerConfig 创建NamesrvContorller。

2.NamesrvController的初始化。

(1)调用this.kvConfigManager.load(); 将文件中的配置load到内存中。继续看代码可以看到: 拿到配置文件的路径,通常为xxx/xxx/kvConfig.json,然后读到本地缓存configTable中。

     
       
       
     
  1. public void load() {

  2.    String content = null;

  3.    try {

  4.        content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());

  5.    } catch (IOException e) {

  6.        log.warn("Load KV config table exception", e);

  7.    }

  8.    if (content != null) {

  9.        KVConfigSerializeWrapper kvConfigSerializeWrapper =

  10.            KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);

  11.        if (null != kvConfigSerializeWrapper) {

  12.            this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());

  13.            log.info("load KV config table OK");

  14.        }

  15.    }

  16. }

(2) 构建NettyRemotingServer。NettyRemotingServer是RocektMQ对Netty的再一次封装。它负责和客户端进行RPC通信。

(3) 构建一个默认8个线程的线程池 remotingExecutor。这个线程池就是最终处理业务的线程所在的线程池。 也就是运行DefaultRequestProcessor的processRequest方法的线程池。暂且称它为M2。

(4) 注册处理器。namesrv定义了2种处理器DefaultRequestProcessor和ClusterTestRequestProcessor,后者继承了前者,重写了一个方法getRouteInfoByTopic。即在当前namesrv中拿不到指定topic的路由信息时会到别的namesrv去拿。注册处理器就是在NettyRemotingServer中构建一个 protected PairdefaultRequestProcessor;其中NettyRequestProcessor就是DefaultRequestProcessor,ExecutorService就是remotingExecutor。

     
       
       
     
  1. private void registerProcessor() {

  2.    if (namesrvConfig.isClusterTest()) {

  3.        this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),

  4.        this.remotingExecutor);

  5.    } else {

  6.        this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);

  7.    }

  8. }  

(5)下面是启动2个定时任务,分别来扫描notActive的broker并移除,打印所有的configTable(前面构建NamesrvController时从内存 加载的)。 需要注意的是这两个定时任务共用一个单线程的线程池scheduledExecutorService。

3.NamesrvController的启动(start方法)

其实就是调用RemotingService的start方法,逻辑如下:

(1) 构建一个线程数为8的线程池defaultEventExecutorGroup(内有8个执行器EventExecutor,每个执行器都只有一个线程)。 这个线程池组暂且称它为M1。后面会详细讲到M1是做什么的。

(2) 使用构建器模式构建netty server端引导类。首先是指明要用的Reactor线程模型,这里的eventLoopGroupBoss为只有一个线程的线程池(暂且叫它1),eventLoopGroupSelector是有 3个线程的线程池(暂且叫它n)。然后指定了Channel类型。下面的几个参数是对端口、TCP机制的设定,这里就不作分析了。然后是指定感兴趣的ip(对连接来的客户端做限制)以及监听端口。

childHandler(new ChannelInitializer() 是重点。在新的连接被接受时,新的Channel被创建,这里的ChannelInitializer就是对这个Channel进行初始化。可以看到在这个Channel的pipeline里添加了几个ChannelHandler,并且这些ChannelHandler共用一个线程池defaultEventExecutorGroup来执行逻辑,也就是在(1)中创建的M1。这些ChannelHandler分别关注不同的I/O事件,当对应的I/O事件被触发后会调用执行里面的方法。对于最后一个NettyServerHandler,最终会被defaultEventExecutorGroup线程池再委托给remotingExecutor来执行(还记得在做NamesrvController的初始化时构建了一个8个线程的线程池M2吗),最终会执行DefaultRequestProcessor的processRequest方法,对请求做真正的处理。

(3)netty server端启动:this.serverBootstrap.bind().sync()。

以上逻辑分析对应的代码如下:

     
       
       
     
  1. @Override

  2. public void start() {

  3.    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(

  4.    nettyServerConfig.getServerWorkerThreads(),

  5.    new ThreadFactory() {

  6.        private AtomicInteger threadIndex = new AtomicInteger(0);

  7.        @Override

  8.        public Thread newThread(Runnable r) {

  9.            return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());

  10.        }

  11.    });

  12. ServerBootstrap childHandler =

  13.    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)

  14.        .channel(useEpoll() ? EpollServerSocketChannel.class : NioserverSocketChannel.class)

  15.        .option(ChannelOption.SO_BACKLOG, 1024)

  16.        .option(ChannelOption.SO_REUSEADDR, true)

  17.        .option(ChannelOption.SO_KEEPALIVE, false)

  18.        .childOption(ChannelOption.TCP_NODELAY, true)

  19.        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())

  20.        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())

  21.        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))

  22.        .childHandler(new ChannelInitializer<SocketChannel>() {

  23.            @Override

  24.            public void initChannel(SocketChannel ch) throws Exception {

  25.                ch.pipeline()

  26.                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,

  27.                        new HandshakeHandler(TlsSystemConfig.tlsMode))

  28.                    .addLast(defaultEventExecutorGroup,

  29.                        new NettyEncoder(),

  30.                        new NettyDecoder(),

  31.                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),

  32.                        new NettyConnectManageHandler(),

  33.                        new NettyServerHandler()

  34.                    );

  35.            }

  36.        });

  37. if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {

  38.    childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

  39. }

  40. try {

  41.    ChannelFuture sync = this.serverBootstrap.bind().sync();

  42.    InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();

  43.    this.port = addr.getPort();

  44. } catch (InterruptedException e1) {

  45.    throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);

  46. }

  47. if (this.channelEventListener != null) {

  48. this.nettyEventExecutor.start();

  49. }

  50. this.timer.scheduleAtFixedRate(new TimerTask() {

  51.    @Override

  52.    public void run() {

  53.        try {

  54.            NettyRemotingServer.this.scanResponseTable();

  55.        } catch (Throwable e) {

  56.            log.error("scanResponseTable exception", e);

  57.        }

  58.    }

  59.    }, 1000 * 3, 1000);

  60. }

2.namesrv使用的线程模型:

Reactor多线程模型(1+n+M1+M2)

通过namesrv的启动流程,可以看到其中创建了许多的线程池(1,n,M1,M2),来执行客户端请求或者是执行定时任务。这就是我们下面要讨论的namesrv的RPC通信+业务处理的线程模型:1+n+M1+M2线程模型。

namesrv(RocektMQ的其他组件也是)的PRC通信采用Netty, Netty的设计就是Reactor线程模型(Reactor的三种线程模型感兴趣的同学可以学习下),但Netty不是采用的传统的Reactor多线程模型(1+n,即一个线程接收连接、一组线程处理IO事件),而是在此基础上加以扩展,发展成了1+n+M1+M2。

下面的图描述了线程之间的关系以及分工。

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

1.其中的1是eventLoopGroupBoss,他的任务是负责监听并接受来自客户端的TCP连接请求。连接建立好以后马上扔给eventLoopGroupSelector,也就是n。

2.eventLoopGroupSelector工作就是在前一步的基础上将建立好的Channel注冊到EventLoop上(这里是NioEventLoop),对应到java底层就是调用NIO接口将SocketChannel注册到selector上。然后监听网络数据,拿到网络数据后,丢给defaultEventExecutorGroup(M1)线程池。

3.defaultEventExecutorGroup主要负责处理网络通信相关的,具体是编码/解码、空闲链接管理、网络连接管理以及网络请求处理,分别对应如下几个ChannelHandler:NettyEncoder/NettyDecoder、IdleStateHandler、NettyConnectManageHandler、NettyServerHandler。M1在处理好这些任务后,将最终处理业务逻辑的工作交给M2,remotingExecutor。

     
       
       
     
  1. ServerBootstrap childHandler =

  2.    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)

  3.        .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

  4.        .option(ChannelOption.SO_BACKLOG, 1024)

  5.        .option(ChannelOption.SO_REUSEADDR, true)

  6.        .option(ChannelOption.SO_KEEPALIVE, false)

  7.        .childOption(ChannelOption.TCP_NODELAY, true)

  8.        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())

  9.        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())

  10.        .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))

  11.        .childHandler(new ChannelInitializer<SocketChannel>() {

  12.            @Override

  13.            public void initChannel(SocketChannel ch) throws Exception {

  14.                ch.pipeline()

  15.                    .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,

  16.                        new HandshakeHandler(TlsSystemConfig.tlsMode))

  17.                    .addLast(defaultEventExecutorGroup,

  18.                        new NettyEncoder(), //编码

  19.                        new NettyDecoder(), //解码

  20.                        new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), //空闲连接管理

  21.                        new NettyConnectManageHandler(), //网络连接管理

  22.                        new NettyServerHandler() //网络请求处理

  23.                    );

  24.            }

  25.        });

其中最后一个NettyServerHandler是进行网络请求处理的ChannelHandler。代码如下:

     
       
       
     
  1. class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

  2.    @Override

  3.    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

  4.        processMessageReceived(ctx, msg);

  5.    }

  6. }

再追进去:

     
       
       
     
  1. public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {

  2.    final RemotingCommand cmd = msg;

  3.    if (cmd != null) {

  4.        switch (cmd.getType()) {

  5.            case REQUEST_COMMAND:

  6.                processRequestCommand(ctx, cmd);

  7.                break;

  8.            case RESPONSE_COMMAND:

  9.                processResponseCommand(ctx, cmd);

  10.                break;

  11.            default:

  12.                break;

  13.        }

  14.    }

  15. }

可以看到根据当前是Client端(接收到的是回复)还是Server端(接收到的是请求),而进行不同处理。以Server端为例:

     
       
       
     
  1. public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {

  2.    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

  3.    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

  4.    final int opaque = cmd.getOpaque();

  5.    if (pair != null) {

  6.        //将请求封装成可执行任务

  7.        Runnable run = new Runnable() {

  8.            @Override

  9.            public void run() {

  10.                try {

  11.                    //调用DefaultRequestProcessorprocessRequest方法执行业务逻辑

  12.                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

  13.                    //省略代码

  14.                } catch (Throwable e) {

  15.                    //省略代码

  16.                }

  17.            }

  18.        };

  19.        //省略代码

  20.        try {

  21.            //将上面的可执行任务再封装为RequestTask,放到线程池remotingExecutor中。

  22.            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);

  23.            pair.getObject2().submit(requestTask);

  24.        } catch (RejectedExecutionException e) {

  25.            //省略代码

  26.        }

  27.    }

  28. }

上面的代码中getObject2就是remotingExecutor(M2),也就是此时将后续逻辑交给了它处理。

4.remotingExecutor处理业务逻辑。根据上一步的分析,最终执行的其实就是DefaultRequestProcessor的processRequest方法。


3.namesrv中服务线程总结

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

4

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

1.编程式设定

     
       
       
     
  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

  2. producer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

  3. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

  4. consumer.setNamesrvAddr("name-server1-ip:port;name-server2-ip:port");

     
       
       
     
  1. sh mqadmin command-name -n name-server-ip1:port;name-server-ip2:port -X OTHER-OPTION

2.JVM/Java启动参数中指定

     
       
       
     
  1. -Drocketmq.namesrv.addr=name-server1-ip:port;name-server2-ip:port

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

3.环境变量中指定

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

前面2和3主要用于在本地通过源码启动RocketMQ时采用。

4.http方式获取

     
       
       
     
  1. http://jmenv.tbsite.net:8080/rocketmq/nsaddr

producer/consumer客户端启动时会从配置的namesrv列表中随机选择一个建立TCP连接。broker端则会和所有的namesrv建立长连接。 注意,以上4种方式的优先级是:

1.编程式设定 > 2.JVM/Java启动参数中指定 > 3.环境变量中指定 > 4.http方式获取

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

5

客户端/broker端如何保持与nameserver集群稳定连接

我们知道broker与所有nameserver都保持一个长连接,broker侧topic信息如有变化,则向所有nameserver都发送消息。而客户端(生产者和消费者)只是跟某一台nameserver节点保持联系。

假定一个场景,如果某个broker的topic配置发生了变化,它向所有nameserver发布通知,但是此时如果某一台nameserver推送失败(超时或者挂掉了),则nameserver集群之间的信息是不一致的,这种情况下RocketMQ如何保证数据的一致性?

再假定一个场景,如果客户端从namesrv拉取topic路由信息时,namesrv挂掉导致原先一直保持的长连接断开,客户端如何保证能获取到最新的路由信息?

让我们来看看RocektMQ是如何应对这些场景的。首先RocketMQ在进行任何RPC请求之前都会检查长连接的状态,如果channel的状态不OK,会尝试进行重连。

对以上2种场景分来来讲。第一个场景,broker与namesrv。如果namesrv没有真的挂掉,只是瞬间网络不稳定没有响应,经过重连后长连接恢复,则不会产生数据不一致。如果网络不稳定时间比较长,导致本次topic上报没有成功,在下一次上报(broker会定期上报给namesrv)之前长连接恢复,namesrv会接受到上次没接收到的信息。这期间会出现短暂的数据不一致。所以namesrv之间确实是弱一致性,这是namesrv提供高性能所牺牲的地方。即使namesrv真的挂掉,在恢复后broker与namesrv的连接也会自动重连。

第二个场景,客户端与namesrv。如果与该客户端一直保持长连接状态的namesrv挂掉。客户端会有failover(故障转移)机制。当检查到长连接无效后则会自动连接其他namesrv(轮询机制)。

6

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

为何弃zookeeper重新开发namesrv

rocketmq设计的早期阶段是很大程度参考借鉴了kafka的,比如早期的rocketmq也是依赖zookeeper来进行集群管理,提供服务发现和服务治理能力。但现在的版本去掉了zookeeper,重新设计开发了更加轻量的namesrv。rocketmq为什么要重新造轮子呢?要理解这一点,我们还要对zookeeper和kafka都有一定的认识。

1.Kafka分区原理

首先我们通过一张图来了解一下kafka。 

【干货分享】RocketMQ命名服务和路由组件——namesrv解析

登场人物介绍:

  • broker:一台kafka服务器就是一个broker。和RocektMQ中的broker概念类似,不同的是RocektMQ中的一个broker可能对应于多个服务器(一个Master复数个Slave)。

  • zookeeper:对应RocketMQ中的namesrv概念,是服务发现和路由角色。除了保存broker/topic/partition等路由信息外,还负责为每个partition选举产生leader。

  • topic:标识一类消息的逻辑名字。

  • partition:分区。kafka引入这个概念是为了解决2个问题,性能和高可用。首先,如果某个topic的消息只存在一个broker中,这个broker就会成为并发瓶颈,无法水平扩展。自然而然就会想到将消息分布到整个集群,提高并发。由此引入分区的概念。这是一个物理概念,每个分区对应一个文件夹,文件夹中存储具体的消息。每个分区可以指定多个副本,其中一个为leader partition,其他为follower partition,这就是为了解决高可用问题了,当leader partition所在的broker宕机时,分布于其他broker上的follower partition同样拥有完整的消息信息。

  • producer:生产者,和RocektMQ中的生产者没有分别。

在上图中,topicA共有3个partition,每个partition有3个副本,分别分布在broker1~broker3中。对于partition1,生产者发消息时自然不是向3个副本都发送,而是向其中一个leader partition发送,然后leader partition采用同步或异步的方式将消息复制给其他的follower partition。那这个leader partition是如何产生的?又是在哪里维护的?生产者是从哪儿得知leader partition是在哪个broker上,从而准确的将消息发送给该broker?

这就是zookeeper所做的了。zookeeper会保存每个partition的信息,并且会选举产生一个leader partition。当生产者发送消息时,首先会根据策略计算出发往哪一个partition,然后根据从zookeeper获取的分区信息,找到该partition的leader partition所在的broker并将消息发往该broker。当某个有leader partition的broker宕机时,由zookeeper负责选出新的leader partition并通知给生产者。

2.RocketMQ主从架构

RocketMQ的架构在文章的开始已经叙述。broker一般采用Master-Slave方式。即在部署阶段已经明确主从节点,主从节点也不会发生切换。生产者只会和主节点通信。没有选举动作。

通过以上分析,zookeeper除了提供命名服务外还承担着选举leader的责任。不仅如此,zookeeper在处理数据的逻辑关系上比较繁琐。而RocektMQ不会用到zookeeper中的选举功能,与其使用笨重的zookeeper,短短1000多行代码的namesrv稳定性和性能肯定会更好。

7

总结

文章阐述了namesrv在集群中的作用以及和其他组件的关系,然后以namesrv的启动流程为引分析了关键代码,重点分析了namesrv中使用的1+n+M1+M2线程模型并总结了启动的线程服务,最后对为什么使用namesrv而非zookeeper进行了分析、阐述。文章中的观点如果有理解不到位、表述不清晰的地方,欢迎指出。


END

往期精选

1.

2. 

3. 

以上是关于干货分享RocketMQ命名服务和路由组件——namesrv解析的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ NameServer

5RocketMQ 源码解析之 命名服务启动

5RocketMQ 源码解析之 命名服务启动

RocketMQ入门到精通— RocketMQ学习入门指南 | RocketMQ服务发现(Name Server)精讲

RocketMQ NameServer如何保证数据最终一致

RocketMQ NameServer如何保证数据最终一致