Netty协议-Rocket MQ之NettyRemotingClient/Server

Posted 爱叨叨的程序狗

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty协议-Rocket MQ之NettyRemotingClient/Server相关的知识,希望对你有一定的参考价值。

RocketMQ性能强劲依赖于Netty通讯协议和特定的通讯协议。

通信流程

NettyRemotingAbstract类的内部有一个NettyServerHandler内部类,该类中封装了客户端与服务端交互的基础代码。

processMessageReceived根据cmd类型处理msg信息

    @ChannelHandler.Sharable
    class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> 

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception 
            processMessageReceived(ctx, msg);
        
    


public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception 
        final RemotingCommand cmd = msg;
        if (cmd != null) 
            switch (cmd.getType()) 
                case REQUEST_COMMAND:
                    processRequestCommand(ctx, cmd);
                    break;
                case RESPONSE_COMMAND:
                    processResponseCommand(ctx, cmd);
                    break;
                default:
                    break;
            
        
    

processResponseCommand

 /**
     * Process incoming request command issued by remote peer.
     *
     * @param ctx channel handler context.
     * @param cmd request command.
     */
    public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) 

                final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
        final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
        final int opaque = cmd.getOpaque();
        if (pair != null) 
            Runnable run = new Runnable() 
                @Override
                public void run() 
                    try 
                        //解析远程(服务端)channel
                        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                        //RPC调用前执行钩子函数
                        doBeforeRpcHooks(remoteAddr, cmd);
                        final RemotingResponseCallback callback = new RemotingResponseCallback() 
                            @Override
                            public void callback(RemotingCommand response) 
                                doAfterRpcHooks(remoteAddr, cmd, response);
                                //判断是否为单向通信
                                if (!cmd.isOnewayRPC()) 
                                    if (response != null) 
                                        response.setOpaque(opaque);
                                        response.markResponseType();
                                        try 
                                            //消息追加到内存后刷入硬盘中
                                            ctx.writeAndFlush(response);
                                         catch (Throwable e) 
                                        
                                     else 
                                    
                                
                            
                        ;
                        if (pair.getObject1() instanceof AsyncNettyRequestProcessor) 
                            AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();
                            //异步处理请求
                            processor.asyncProcessRequest(ctx, cmd, callback);
                         else 
                            NettyRequestProcessor processor = pair.getObject1();
                            RemotingCommand response = processor.processRequest(ctx, cmd);
                            callback.callback(response);
                        
                     catch (Throwable e) 
                        log.error("process request exception", e);
                        log.error(cmd.toString());

                        if (!cmd.isOnewayRPC()) 
                            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                                RemotingHelper.exceptionSimpleDesc(e));
                            response.setOpaque(opaque);
                            ctx.writeAndFlush(response);
                        
                    
                
            ;


            try 
                final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
                pair.getObject2().submit(requestTask);
             catch (RejectedExecutionException e) 
                if ((System.currentTimeMillis() % 10000) == 0) 
                    log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                        + ", too many requests and system thread pool busy, RejectedExecutionException "
                        + pair.getObject2().toString()
                        + " request code: " + cmd.getCode());
                

                if (!cmd.isOnewayRPC()) 
                    //非oneWay方式消息发送,构造发送消息响应信息
                    final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                        "[OVERLOAD]system busy, start flow control for a while");
                    response.setOpaque(opaque);
                    ctx.writeAndFlush(response);
                    
    ...
    

processResponseCommand

/**
 * Process response from remote peer to the previous issued requests.
 *
 * @param ctx channel handler context.
 * @param cmd response command instance.
 */
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) 
    //opaque被赋值为requestId
    final int opaque = cmd.getOpaque();
    //从map缓存获取正在进行的其中一个请求
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) 
        responseFuture.setResponseCommand(cmd);
        //移除本次请求
        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() != null) 
            //执行回调(排序、执行完成)
            executeInvokeCallback(responseFuture);
         else 
            responseFuture.putResponse(cmd);
            responseFuture.release();
        
     else 
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
new ConcurrentHashMap<Integer, ResponseFuture>(256);

processorTable :记录全部请求、以及处理器。

responseTable记录所有响应。

更清晰的文件请下载:

点我下载

通讯协议

RocketMQ自定义的私有协议栈都是基于TCP/IP协议,使用Netty的NIO TCP协议栈进行私有协议栈的定制和开发。

RocketMQ协议共分为四个部分:

  • Header data:协议头,数据是序列化【fastjosn】后的json,json的每个key字段都是固定的,
  • body data:请求的二进制实际数据,例如发送消息的网络请求中,Body传输实际的消息内容。
  • Length:消息总长度
  • Header length:序列化类型&消息头长度,第一个字节表示序列化类型,后面三个自己表示消息头长度。

RemotingCommand类封装了通讯消息、编码、解码方式,这些组成了RocketMQ的通讯协议。

//根据自定义协议解析消息头、消息体
    public static RemotingCommand decode(final ByteBuffer byteBuffer) throws RemotingCommandException 
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        //获取头部报文长度
        int headerLength = getHeaderLength(oriHeaderLen);  
//获取头部报文数据
    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    //反序列化解析Header data和RemotingCommand类
    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

    //获取body长度
    int bodyLength = length - 4 - headerLength;
    byte[] bodyData = null;
    if (bodyLength > 0) 
        //获取Body数据
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    

    cmd.body = bodyData;

    return cmd;


//根据自定义协议编码消息头、消息体
    public ByteBuffer encode() 
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData = this.headerEncode();
        length += headerData.length;

        // 3> body data length
        if (this.body != null) 
            length += body.length;
        

        ByteBuffer result = ByteBuffer.allocate(4 + length);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        // body data;
        if (this.body != null) 
            result.put(this.body);
        

        result.flip();

        return result;
    

以上是关于Netty协议-Rocket MQ之NettyRemotingClient/Server的主要内容,如果未能解决你的问题,请参考以下文章

Netty协议-Rocket MQ之NettyRemotingClient/Server

rocket MQ

Rocket MQ 1 - 用

Rocket MQ 问题排查命令

Windown10下Rocket MQ 安装

Rocket MQ报错No route info of this topic的问题探究