(1)NameServer:在MQ集群中做的是做命名服务,更新和路由发现 broker服务;

(2)Broker-Master:broker 消息主机服务器;

(3)Broker-Slave:broker 消息从机服务器;



其中,RocketMQ集群的一部分通信如下: (1)Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定期向NameServer上报Topic路由信息; (2)消息生产者Producer作为客户端发送消息时候,需要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取; (3)消息生产者Producer根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者收消息并落盘存储; 从上面(1)~(3)中可以看出在消息生产者, Broker和NameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。 rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。ps:鉴于RocketMQ的通信模块是建立在Netty基础之上的,因此在阅读RocketMQ的源码之前,读者最好先对Netty的多线程模型、JAVA NIO模型均有一定的了解,这样子理解RocketMQ源码会较为快一些。作者阅读的RocketMQ版本是4.2.0, 依赖的netty版本是4.0.42.Final. RocketMQ的代码结构图如下:


源码部分主要可以分为rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模块,通信框架就封装在rocketmq-remoting模块中。 本文主要从RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)和具体的发送/接收消息的通信流程来进行阐述等。






  1. void start();

  2. void shutdown();

  3. void registerRPCHook(RPCHook rpcHook);


  1. /**

  2.     * 同RemotingClient端一样

  3.     *

  4.     * @param requestCode

  5.     * @param processor

  6.     * @param executor

  7.     */

  8.    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,

  9.        final ExecutorService executor);

  10.    /**

  11.     * 注册默认的处理器

  12.     *

  13.     * @param processor

  14.     * @param executor

  15.     */

  16.    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

  17.    int localListenPort();

  18.    /**

  19.     * 根据请求code来获取不同的处理Pair

  20.     *

  21.     * @param requestCode

  22.     * @return

  23.     */

  24.    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

  25.    /**

  26.     * 同RemotingClient端一样,同步通信,有返回RemotingCommand

  27.     * @param channel

  28.     * @param request

  29.     * @param timeoutMillis

  30.     * @return

  31.     * @throws InterruptedException

  32.     * @throws RemotingSendRequestException

  33.     * @throws RemotingTimeoutException

  34.     */

  35.    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,

  36.        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,

  37.        RemotingTimeoutException;

  38.    /**

  39.     * 同RemotingClient端一样,异步通信,无返回RemotingCommand

  40.     *

  41.     * @param channel

  42.     * @param request

  43.     * @param timeoutMillis

  44.     * @param invokeCallback

  45.     * @throws InterruptedException

  46.     * @throws RemotingTooMuchRequestException

  47.     * @throws RemotingTimeoutException

  48.     * @throws RemotingSendRequestException

  49.     */

  50.    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,

  51.        final InvokeCallback invokeCallback) throws InterruptedException,

  52.        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

  53.    /**

  54.     * 同RemotingClient端一样,单向通信,诸如心跳包

  55.     *

  56.     * @param channel

  57.     * @param request

  58.     * @param timeoutMillis

  59.     * @throws InterruptedException

  60.     * @throws RemotingTooMuchRequestException

  61.     * @throws RemotingTimeoutException

  62.     * @throws RemotingSendRequestException

  63.     */

  64.    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)

  65.        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,

  66.        RemotingSendRequestException;


(4)NettyRemotingClient以及NettyRemotingServer:分别实现了RemotingClient和RemotingServer, 都继承了NettyRemotingAbstract抽象类。RocketMQ中其他的组件(如client、nameServer、broker在进行消息的发送和接收时均使用这两个组件)




Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行不同的业务处理 应答响应码。0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于reqeustId,在同一个连接上的不同请求标识码,与响应消息中的相对应 应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC得标志 区分是普通RPC还是onewayRPC得标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap 请求自定义扩展信息 响应自定义扩展信息


  1. [

  2. code=103,//这里的103对应的code就是broker向nameserver注册自己的消息

  3. language=JAVA,

  4. version=137,

  5. opaque=58,//这个就是requestId

  6. flag(B)=0,

  7. remark=null,

  8. extFields={

  9.    brokerId=0,

  10.    clusterName=DefaultCluster,

  11.    brokerAddr=ip1: 10911,

  12.    haServerAddr=ip1: 10912,

  13.    brokerName=LAPTOP-SMF2CKDN

  14. },

  15. serializeTypeCurrentRPC=JSON






(4)消息主体数据:消息主体的二进制字节数据内容; 消息的编码和解码分别在RemotingCommand类的encode和decode方法中完成,下面是消息编码encode方法的具体实现:

  1. public ByteBuffer encode() {

  2.    // 1> header length size

  3.    int length = 4;    //消息总长度

  4.    // 2> header data length

  5.    //将消息头编码成byte[]

  6.    byte[] headerData = this.headerEncode();

  7.    //计算头部长度

  8.    length += headerData.length;              

  9.    // 3> body data length

  10.    if (this.body != null) {

  11.        //消息主体长度

  12.        length += body.length;                

  13.    }

  14.    //分配ByteBuffer, 这边加了4,

  15.    //这是因为在消息总长度的计算中没有将存储头部长度的4个字节计算在内

  16.    ByteBuffer result = ByteBuffer.allocate(4 + length);  

  17.    // length

  18.    //将消息总长度放入ByteBuffer

  19.    result.putInt(length);  

  20.    // header length

  21.    //将消息头长度放入ByteBuffer

  22.    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

  23.    // header data

  24.    //将消息头数据放入ByteBuffer

  25.    result.put(headerData);    

  26.    // body data;

  27.    if (this.body != null) {

  28.        //将消息主体放入ByteBuffer

  29.        result.put(this.body);

  30.    }

  31.    //重置ByteBuffer的position位置

  32.    result.flip();    

  33.    return result;

  34. }

  35.    /**

  36.     * markProtocolType方法是将RPC类型和headerData长度编码放到一个byte[4]数组中

  37.     *

  38.     * @param source

  39.     * @param type

  40.     * @return

  41.     */

  42.    public static byte[] markProtocolType(int source, SerializeType type) {

  43.        byte[] result = new byte[4];

  44.        result[0] = type.getCode();

  45.        //右移16位后再和255与->“16-24位”

  46.        result[1] = (byte) ((source >> 16) & 0xFF);

  47.        //右移8位后再和255与->“8-16位”

  48.        result[2] = (byte) ((source >> 8) & 0xFF);

  49.        //右移0位后再和255与->“8-0位”

  50.        result[3] = (byte) (source & 0xFF);

  51.        return result;

  52.    }


  1. public static RemotingCommand decode(final ByteBuffer byteBuffer) {

  2.        //获取byteBuffer的总长度

  3.        int length = byteBuffer.limit();

  4.        //获取前4个字节,组装int类型,该长度为总长度

  5.        int oriHeaderLen = byteBuffer.getInt();

  6.        //获取消息头的长度,这里和0xFFFFFF做与运算,编码时候的长度即为24位

  7.        int headerLength = getHeaderLength(oriHeaderLen);

  8.        byte[] headerData = new byte[headerLength];

  9.        byteBuffer.get(headerData);

  10.        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

  11.        int bodyLength = length - 4 - headerLength;

  12.        byte[] bodyData = null;

  13.        if (bodyLength > 0) {

  14.            bodyData = new byte[bodyLength];

  15.            byteBuffer.get(bodyData);

  16.        }

  17.        cmd.body = bodyData;

  18.        return cmd;

  19.    }










  1.    /**

  2.     * invokeAsync(异步调用)

  3.     *

  4.     */

  5.    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,

  6.        final InvokeCallback invokeCallback)

  7.        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {

  8.        //相当于request ID, RemotingCommand会为每一个request产生一个request ID, 从0开始, 每次加1

  9.        final int opaque = request.getOpaque();

  10.        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

  11.        if (acquired) {

  12.            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);

  13.            //根据request ID构建ResponseFuture

  14.            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);

  15.            //将ResponseFuture放入responseTable

  16.            this.responseTable.put(opaque, responseFuture);

  17.            try {

  18.                //使用Netty的channel发送请求数据

  19.                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {

  20.                    //消息发送后执行

  21.                    @Override

  22.                    public void operationComplete(ChannelFuture f) throws Exception {

  23.                        if (f.isSuccess()) {

  24.                            //如果发送消息成功给Server,那么这里直接Set后return

  25.                            responseFuture.setSendRequestOK(true);

  26.                            return;

  27.                        } else {

  28.                            responseFuture.setSendRequestOK(false);

  29.                        }

  30.                        responseFuture.putResponse(null);

  31.                        responseTable.remove(opaque);

  32.                        try {

  33.                            //执行回调

  34.                            executeInvokeCallback(responseFuture);

  35.                        } catch (Throwable e) {

  36.                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);

  37.                        } finally {

  38.                            //释放信号量

  39.                            responseFuture.release();

  40.                        }

  41.                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));

  42.                    }

  43.                });

  44.            } catch (Exception e) {

  45.                //异常处理

  46.                responseFuture.release();

  47.                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);

  48.                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);

  49.            }

  50.        } else {

  51.            if (timeoutMillis <= 0) {

  52.                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");

  53.            } else {

  54.                String info =

  55.                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",

  56.                        timeoutMillis,

  57.                        this.semaphoreAsync.getQueueLength(),

  58.                        this.semaphoreAsync.availablePermits()

  59.                    );

  60.                log.warn(info);

  61.                throw new RemotingTimeoutException(info);

  62.            }

  63.        }

  64.    }



  1. protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable

opaque表示请求发起方在同个连接上不同的请求标识代码,每次发送一个消息的时候,可以选择同步阻塞/异步非阻塞的方式。无论是哪种通信方式,都会保存请求操作码至ResponseFuture的Map映射—responseTable中。 (2)ResponseFuture—保存返回响应(包括回调执行方法和信号量)

  1. public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,

  2.        SemaphoreReleaseOnlyOnce once) {

  3.        this.opaque = opaque;

  4.        this.timeoutMillis = timeoutMillis;

  5.        this.invokeCallback = invokeCallback;

  6.        this.once = once;

  7.    }

对于同步通信来说,第三、四个参数为null;而对于异步通信来说,invokeCallback是在收到消息响应的时候能够根据responseTable找到请求码对应的回调执行方法,semaphore参数用作流控,当多个线程同时往一个连接写数据时可以通过信号量控制permit同时写许可的数量。 (3)异常发送流程处理—定时扫描responseTable本地缓存

在发送消息时候,如果遇到异常情况(比如服务端没有response返回给客户端或者response因网络而丢失),上面所述的responseTable的本地缓存Map将会出现堆积情况。这个时候需要一个定时任务来专门做responseTable的清理回收。在RocketMQ的客户端/服务端启动时候会产生一个频率为1s调用一次来的定时任务检查所有的responseTable缓存中的responseFuture变量,判断是否已经得到返回, 并进行相应的处理。

  1. public void scanResponseTable() {

  2.        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();

  3.        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();

  4.        while (it.hasNext()) {

  5.            Entry<Integer, ResponseFuture> next = it.next();

  6.            ResponseFuture rep = next.getValue();

  7.            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {

  8.                rep.release();

  9.                it.remove();

  10.                rfList.add(rep);

  11.                log.warn("remove timeout request, " + rep);

  12.            }

  13.        }

  14.        for (ResponseFuture rf : rfList) {

  15.            try {

  16.                executeInvokeCallback(rf);

  17.            } catch (Throwable e) {

  18.                log.warn("scanResponseTable, operationComplete Exception", e);

  19.            }

  20.        }

  21.    }



  1. public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {

  2.    //根据RemotingCommand中的code获取processor和ExecutorService

  3.    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());

  4.    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;

  5.    final int opaque = cmd.getOpaque();

  6.    if (pair != null) {

  7.        Runnable run = new Runnable() {

  8.            @Override

  9.            public void run() {

  10.                try {

  11.                    //rpc hook

  12.                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();

  13.                    if (rpcHook != null) {

  14.                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);

  15.                    }

  16.                    //processor处理请求

  17.                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);

  18.                    //rpc hook

  19.                    if (rpcHook != null) {

  20.                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

  21.                    }

  22.                    if (!cmd.isOnewayRPC()) {

  23.                        if (response != null) {

  24.                            response.setOpaque(opaque);

  25.                            response.markResponseType();

  26.                            try {

  27.                                ctx.writeAndFlush(response);

  28.                            } catch (Throwable e) {

  29.                                PLOG.error("process request over, but response failed", e);

  30.                                PLOG.error(cmd.toString());

  31.                                PLOG.error(response.toString());

  32.                            }

  33.                        } else {

  34.                        }

  35.                    }

  36.                } catch (Throwable e) {

  37.                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"

  38.                        .equals(e.getClass().getCanonicalName())) {

  39.                        PLOG.error("process request exception", e);

  40.                        PLOG.error(cmd.toString());

  41.                    }

  42.                    if (!cmd.isOnewayRPC()) {

  43.                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //

  44.                            RemotingHelper.exceptionSimpleDesc(e));

  45.                        response.setOpaque(opaque);

  46.                        ctx.writeAndFlush(response);

  47.                    }

  48.                }

  49.            }

  50.        };

  51.        if (pair.getObject1().rejectRequest()) {

  52.            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

  53.                "[REJECTREQUEST]system busy, start flow control for a while");

  54.            response.setOpaque(opaque);

  55.            ctx.writeAndFlush(response);

  56.            return;

  57.        }

  58.        try {

  59.            //封装requestTask

  60.            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);

  61.            //想线程池提交requestTask

  62.            pair.getObject2().submit(requestTask);

  63.        } catch (RejectedExecutionException e) {

  64.            if ((System.currentTimeMillis() % 10000) == 0) {

  65.                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //

  66.                    + ", too many requests and system thread pool busy, RejectedExecutionException " //

  67.                    + pair.getObject2().toString() //

  68.                    + " request code: " + cmd.getCode());

  69.            }

  70.            if (!cmd.isOnewayRPC()) {

  71.                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,

  72.                    "[OVERLOAD]system busy, start flow control for a while");

  73.                response.setOpaque(opaque);

  74.                ctx.writeAndFlush(response);

  75.            }

  76.        }

  77.    } else {

  78.        String error = " request type " + cmd.getCode() + " not supported";

  79.        //构建response

  80.        final RemotingCommand response =

  81.            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);

  82.        response.setOpaque(opaque);

  83.        ctx.writeAndFlush(response);

  84.        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);

  85.    }

  86. }



  1.    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =

  2.        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);




  1. private void sendMessageAsync(

  2.        final String addr,

  3.        final String brokerName,

  4.        final Message msg,

  5.        final long timeoutMillis,

  6.        final RemotingCommand request,

  7.        final SendCallback sendCallback,

  8.        final TopicPublishInfo topicPublishInfo,

  9.        final MQClientInstance instance,

  10.        final int retryTimesWhenSendFailed,

  11.        final AtomicInteger times,

  12.        final SendMessageContext context,

  13.        final DefaultMQProducerImpl producer

  14.    ) throws InterruptedException, RemotingException {

  15.        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {

  16.            @Override

  17.            public void operationComplete(ResponseFuture responseFuture) {

  18.                //先从Server端返回的responseFuture变量中获取RemotingCommand的值

  19.                RemotingCommand response = responseFuture.getResponseCommand();

  20.              if (null == sendCallback && response != null) {

  21.                    try {

  22.                        //Client端处理发送消息的Reponse返回(包括对消息返回体的头部进行解码,

  23.                        //取得“topic”、“BrokerName”、“QueueId”等值)

  24.                        //随后构建sendResult对象并设置Context上下文中

  25.                        SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);

  26.                        if (context != null && sendResult != null) {

  27.                            context.setSendResult(sendResult);

  28.                            context.getProducer().executeSendMessageHookAfter(context);

  29.                        }

  30.                    } catch (Throwable e) {

  31.                    }

  32.                    producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);

  33.                    return;

  34.                }

  35.            //省略其他部分代码

  36.            //......

  37. }


  1. public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {

  2.        //从RemotingCommand中获取opaque值

  3.        final int opaque = cmd.getOpaque();‘

  4.        //从本地缓存的responseTable这个Map中取出本次异步通信连接对应的ResponseFuture变量

  5.        final ResponseFuture responseFuture = responseTable.get(opaque);

  6.        if (responseFuture != null) {

  7.            responseFuture.setResponseCommand(cmd);

  8.            responseTable.remove(opaque);

  9.            if (responseFuture.getInvokeCallback() != null) {

  10.                //在这里真正去执行Client注入进来的异步回调方法

  11.                executeInvokeCallback(responseFuture);

  12.            } else {

  13.                //否则释放responseFuture变量

  14.                responseFuture.putResponse(cmd);

  15.                responseFuture.release();

  16.            }

  17.        } else {

  18.            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

  19.            log.warn(cmd.toString());

  20.        }

  21.    }







