从dubbo处理视角看Netty处理网络传输原理 -- 粘包与拆包

Posted yougewe

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从dubbo处理视角看Netty处理网络传输原理 -- 粘包与拆包相关的知识,希望对你有一定的参考价值。

  如今,我们想要开发一个网络应用,那是相当地方便。不过就是引入一个框架,然后设置些参数,然后写写业务代码就搞定了。

  写业务代码自然很重要,但是你知道:

    你的数据是怎么来的吗?通过网络传输过来的呗。

    你知道网络是通过什么方式传输过来的吗?光纤呗,TCP/IP协议呗。

  看起来都难不住我们的同学们,但是,以上问题都不是我们关注的重点,我们今天要关注的是,TCP.IP协议是如何把数据传输到我们的应用服务器,而且准确地交到对应的业务代码手上的?

  我们也不关注TCP协议的三次握手四次挥手,我们只需要确认一点,那就是TCP.IP协议是流式传输的,即数据是源源不断地从客户端传递到服务端的,而应用层是如何知道这些数据是什么的呢?当然这是上层的应用协议要做的事,比如http,smtp,ftp等等。

  抛开其他不说,咱们使用 netty 来开发应用程序时,netty本身就承担了一个高层应用协议的角色,所以,我们可以从它是怎么识别这些传输过来的数据的过程,来一窥应用层协议的端倪。

  其实大的方向都很简单,即客户端使用一种序列化协议将数据序列化,然后通过网络传输到服务端,然后服务端使用相应的反序列化协议,将数据解出来,再交给业务程序就好了。

  所以,看起来好像只是一个序列化反序列化的问题而已。但如果是这样,咱们今天就不用再想这个问题了。

  我们要考虑的是,客户端发送的数据是一次性到达服务端的吗?如果是这样,那太简单了,直接获取数据主好了。但是,如果我们要发送的数据非常大,TCP.IP能支持一下子传输吗?这是不可能的,TCP有一个MSS最大报文长度限制,超过这个之后,就必须进行拆分发送了。(粘包与拆包,太专业了)

我们来看下netty是如何处理这些相关数据的?

  

在dubbo中,是如何处利用netty理数据拆分的呢?
首先,我们看下dubbo创建netty的方式: (主要添加几个编码器解码器,以及handler)

    // org.apache.dubbo.remoting.transport.netty4.NettyServer
    @Override
    protected void doOpen() throws Throwable 
        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioserverSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() 
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception 
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    
                );
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    
    

  其实netty的使用就是这么简单,你只需定义你的协议,你的handler就可以了,其他复杂的底层工作,一概不管!

我们首先来看netty是如何监听网络数据到来的?(基于 nio 绑定端口连接)

    // io.netty.channel.socket.nio.NioServerSocketChannel
    // 绑定socket服务到 nio channel 上 
    @Override
    protected void doBind(SocketAddress localAddress) throws Exception 
        if (PlatformDependentVersion() >= 7) 
            javaChannel().bind(localAddress, config.getBacklog());
         else 
            javaChannel().socket().bind(localAddress, config.getBacklog());
        
    
    
    @Override
    protected ServerSocketChannel javaChannel() 
        return (ServerSocketChannel) superChannel();
    

  所以,其实自己写 nio 的 server/client 可能也不会太难吧,但是你要应用的各种异常情况太多,就不见得能把握好了。

 

netty 的线程模型是  reactor 模型,有一个事件循环过程

    // io.netty.channel.nio.NioEventLoop
    // eventLoop 扫描事件
    @Override
    protected void run() 
        for (;;) 
            try 
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) 
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        // ‘wakenUp.compareAndSet(false, true)‘ is always evaluated
                        // before calling ‘selector.wakeup()‘ to reduce the wake-up
                        // overhead. (Selector.wakeup() is an expensive operation.)
                        //
                        // However, there is a race condition in this approach.
                        // The race condition is triggered when ‘wakenUp‘ is set to
                        // true too early.
                        //
                        // ‘wakenUp‘ is set to true too early if:
                        // 1) Selector is waken up between ‘wakenUp.set(false)‘ and
                        //    ‘selector.select(...)‘. (BAD)
                        // 2) Selector is waken up between ‘selector.select(...)‘ and
                        //    ‘if (wakenUp.get())  ... ‘. (OK)
                        //
                        // In the first case, ‘wakenUp‘ is set to true and the
                        // following ‘selector.select(...)‘ will wake up immediately.
                        // Until ‘wakenUp‘ is set to false again in the next round,
                        // ‘wakenUp.compareAndSet(false, true)‘ will fail, and therefore
                        // any attempt to wake up the Selector will fail, too, causing
                        // the following ‘selector.select(...)‘ call to block
                        // unnecessarily.
                        //
                        // To fix this problem, we wake up the selector again if wakenUp
                        // is true immediately after selector.select(...).
                        // It is inefficient in that it wakes up the selector for both
                        // the first case (BAD - wake-up required) and the second case
                        // (OK - no wake-up required).

                        if (wakenUp.get()) 
                            selector.wakeup();
                        
                    default:
                        // fallthrough
                

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) 
                    try 
                        processSelectedKeys();
                     finally 
                        // Ensure we always run tasks.
                        runAllTasks();
                    
                 else 
                    final long ioStartTime = System.nanoTime();
                    try 
                        // 处理事件
                        processSelectedKeys();
                     finally 
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    
                
             catch (Throwable t) 
                handleLoopException(t);
            
            // Always handle shutdown even if the loop processing threw an exception.
            try 
                if (isShuttingDown()) 
                    closeAll();
                    if (confirmShutdown()) 
                        return;
                    
                
             catch (Throwable t) 
                handleLoopException(t);
            
        
    

    // 处理事件
    private void processSelectedKeys() 
        if (selectedKeys != null) 
            // 使用selectKeys进行处理
            processSelectedKeysOptimized();
         else 
            processSelectedKeysPlain(selector.selectedKeys());
        
    

    private void processSelectedKeysOptimized() 
        for (int i = 0; i < selectedKeys.size; ++i) 
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC‘ed once the Channel close
            // See https://github.com.netty.netty.issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) 
                // ...
                processSelectedKey(k, (AbstractNioChannel) a);
             else 
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            

            if (needsToSelectAgain) 
                // null out entries in the array to allow to have it GC‘ed once the Channel close
                // See https://github.com.netty.netty.issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            
        
    

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) 
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) 
            final EventLoop eventLoop;
            try 
                eventLoop = ch.eventLoop();
             catch (Throwable ignored) 
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com.netty.netty.issues/5125
            if (eventLoop != this || eventLoop == null) 
                return;
            
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        

        try 
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) 
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com.netty.netty.issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) 
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            // 读取数据,由 unsafe 类进行循环数据读取
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) 
                unsafe.read();
            
         catch (CancelledKeyException ignored) 
            unsafe.close(unsafe.voidPromise());
        
    

    // io.netty.channel.nio.AbstractNioMessageChannel
    // 处理真正的读数据过程
    private final class NioMessageUnsafe extends AbstractNioUnsafe 

        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() 
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try 
                try 
                    // 循环读取数据,将数据读取到 readBuf 中
                    do 
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) 
                            break;
                        
                        if (localRead < 0) 
                            closed = true;
                            break;
                        
                        // 记录被读取了多少次数据了
                        allocHandle.incMessagesRead(localRead);
                     while (allocHandle.continueReading());
                 catch (Throwable t) 
                    exception = t;
                

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) 
                    readPending = false;
                    // 依次调用管道进行处理
                    pipeline.fireChannelRead(readBuf.get(i));
                
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) 
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                

                if (closed) 
                    inputShutdown = true;
                    if (isOpen()) 
                        close(voidPromise());
                    
                
             finally 
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com.netty.netty.issues/2254
                if (!readPending && !config.isAutoRead()) 
                    removeReadOp();
                
            
        
    

    // io.netty.channel.socket.nio.NioServerSocketChannel
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception 
        SocketChannel ch = SocketUtils.accept(javaChannel());

        try 
            if (ch != null) 
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            
         catch (Throwable t) 
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try 
                ch.close();
             catch (Throwable t2) 
                logger.warn("Failed to close a socket.", t2);
            
        

        return 0;
    


    // io.netty.channel.DefaultChannelPipeline
    @Override
    public final ChannelPipeline fireChannelRead(Object msg) 
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    

    // io.netty.channel.AbstractChannelHandlerContext
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) 
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        // 在处理中,则直接调用,否则放入线程池运行
        if (executor.inEventLoop()) 
            next.invokeChannelRead(m);
         else 
            executor.execute(new Runnable() 
                @Override
                public void run() 
                    next.invokeChannelRead(m);
                
            );
        
    
    private void invokeChannelRead(Object msg) 
        if (invokeHandler()) 
            try 
                // 调用入站处理器读取消息
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
             catch (Throwable t) 
                notifyHandlerException(t);
            
         else 
            fireChannelRead(msg);
        
    

    final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler 

        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) 
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        

        @Override
        public ChannelHandler handler() 
            return this;
        

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception 
            // NOOP
        

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception 
            // NOOP
        

        @Override
        public void bind(
                ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
                throws Exception 
            unsafe.bind(localAddress, promise);
        

        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception 
            unsafe.connect(remoteAddress, localAddress, promise);
        

        @Override
        public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception 
            unsafe.disconnect(promise);
        

        @Override
        public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception 
            unsafe.close(promise);
        

        @Override
        public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception 
            unsafe.deregister(promise);
        

        @Override
        public void read(ChannelHandlerContext ctx) 
            unsafe.beginRead();
        

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception 
            unsafe.write(msg, promise);
        

        @Override
        public void flush(ChannelHandlerContext ctx) throws Exception 
            unsafe.flush();
        

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception 
            ctx.fireExceptionCaught(cause);
        

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception 
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        

        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception 
            ctx.fireChannelUnregistered();

            // Remove all handlers sequentially if channel is closed and unregistered.
            if (!channel.isOpen()) 
                destroy();
            
        

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception 
            ctx.fireChannelActive();

            readIfIsAutoRead();
        

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception 
            ctx.fireChannelInactive();
        

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
            ctx.fireChannelRead(msg);
        

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception 
            ctx.fireChannelReadComplete();

            readIfIsAutoRead();
        

        private void readIfIsAutoRead() 
            if (channel.config().isAutoRead()) 
                channel.read();
            
        

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception 
            ctx.fireUserEventTriggered(evt);
        

        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception 
            ctx.fireChannelWritabilityChanged();
        
    



    // DefaultChannelPipeline
    // io.netty.channel.AbstractChannelHandlerContext
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) 
        invokeChannelRead(findContextInbound(), msg);
        return this;
    
    
    private AbstractChannelHandlerContext findContextInbound() 
        AbstractChannelHandlerContext ctx = this;
        do 
            ctx = ctx.next;
         while (!ctx.inbound);
        return ctx;
    

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) 
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) 
            next.invokeChannelRead(m);
         else 
            executor.execute(new Runnable() 
                @Override
                public void run() 
                    next.invokeChannelRead(m);
                
            );
        
    
    
    
        // io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) 
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) 
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            

            try 
                childGroup.register(child).addListener(new ChannelFutureListener() 
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception 
                        if (!future.isSuccess()) 
                            forceClose(child, future.cause());
                        
                    
                );
             catch (Throwable t) 
                forceClose(child, t);
            
        
    // DefaultChannelPipeline 
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) 
        return addLast(null, handlers);
    

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) 
        if (handlers == null) 
            throw new NullPointerException("handlers");
        

        for (ChannelHandler h: handlers) 
            if (h == null) 
                break;
            
            addLast(executor, null, h);
        

        return this;
    
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) 
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) 
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            // 添加到pipeline的尾部
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) 
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) 
                newCtx.setAddPending();
                executor.execute(new Runnable() 
                    @Override
                    public void run() 
                        callHandlerAdded0(newCtx);
                    
                );
                return this;
            
        
        callHandlerAdded0(newCtx);
        return this;
    

    private void addLast0(AbstractChannelHandlerContext newCtx) 
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    


    // NioEventLoopGroup
    // io.netty.channel.MultithreadEventLoopGroup
    @Override
    public ChannelFuture register(Channel channel) 
        return next().register(channel);
    

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser 
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) 
            this.executors = executors;
        

        @Override
        public EventExecutor next() 
            return executors[idx.getAndIncrement() & executors.length - 1];
        
    
    
    // io.netty.channel.SingleThreadEventLoop
    @Override
    public ChannelFuture register(Channel channel) 
        return register(new DefaultChannelPromise(channel, this));
    
    
    @Override
    public ChannelFuture register(final ChannelPromise promise) 
        ObjectUtil.checkNotNull(promise, "promise");
        // 此处注册好之后,就会开启另外的线程池来处理数据了
        promise.channel().unsafe().register(this, promise);
        return promise;
    
    
        // io.netty.channel.AbstractChannel $ AbstractUnsafe
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) 
            if (eventLoop == null) 
                throw new NullPointerException("eventLoop");
            
            if (isRegistered()) 
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            
            if (!isCompatible(eventLoop)) 
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) 
                register0(promise);
             else 
                try 
                    eventLoop.execute(new Runnable() 
                        @Override
                        public void run() 
                            register0(promise);
                        
                    );
                 catch (Throwable t) 
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: ",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                
            
        


    // io.netty.util.concurrent.SingleThreadEventExecutor
    @Override
    public void execute(Runnable task) 
        if (task == null) 
            throw new NullPointerException("task");
        

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) 
            addTask(task);
         else 
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) 
                reject();
            
        

        if (!addTaskWakesUp && wakesUpForTask(task)) 
            // 唤醒下一次接收数据
            wakeup(inEventLoop);
        
    

    private void startThread() 
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) 
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) 
                // 把事件放入到另一个线程池处理, 一个阶段处理结束
                doStartThread();
            
        
    

 

开启新的线程处理逻辑

    // 开启新的线程处理逻辑
    // 把事件放入到另一个线程池处理
    private void doStartThread() 
        assert thread == null;
        executor.execute(new Runnable() 
            @Override
            public void run() 
                thread = Thread.currentThread();
                if (interrupted) 
                    thread.interrupt();
                

                boolean success = false;
                updateLastExecutionTime();
                try 
                    SingleThreadEventExecutor.this.run();
                    success = true;
                 catch (Throwable t) 
                    logger.warn("Unexpected exception from an event executor: ", t);
                 finally 
                    for (;;) 
                        int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) 
                            break;
                        
                    

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) 
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    

                    try 
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) 
                            if (confirmShutdown()) 
                                break;
                            
                        
                     finally 
                        try 
                            cleanup();
                         finally 
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) 
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ‘)‘);
                            

                            terminationFuture.setSuccess(null);
                        
                    
                
            
        );
    

public final class ThreadPerTaskExecutor implements Executor 
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) 
        if (threadFactory == null) 
            throw new NullPointerException("threadFactory");
        
        this.threadFactory = threadFactory;
    

    @Override
    public void execute(Runnable command) 
        threadFactory.newThread(command).start();
    


    @Override
    protected void wakeup(boolean inEventLoop) 
        if (!inEventLoop && wakenUp.compareAndSet(false, true)) 
            selector.wakeup();
        
    
    
    
实际解析数据信息是在 fireChannelRead 时触发的。

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) 
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    
    // 从 inBound 入站链中依次调用 channelRead() 方法
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) 
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) 
            next.invokeChannelRead(m);
         else 
            executor.execute(new Runnable() 
                @Override
                public void run() 
                    next.invokeChannelRead(m);
                
            );
        
    
    private void invokeChannelRead(Object msg) 
        if (invokeHandler()) 
            try 
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
             catch (Throwable t) 
                notifyHandlerException(t);
            
         else 
            fireChannelRead(msg);
        
    

        // HeadContext
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
            ctx.fireChannelRead(msg);
        
    
    // AbstractChannelHandlerContext
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) 
        invokeChannelRead(findContextInbound(), msg);
        return this;
    
    
    // io.netty.handler.codec.ByteToMessageDecoder 
    // 我们对数据的解析由这个类进行处理
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        if (msg instanceof ByteBuf) 
            CodecOutputList out = CodecOutputList.newInstance();
            try 
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                // 针对多次到来的包,进行重新计算
                if (first) 
                    cumulation = data;
                 else 
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                
                // 调用解码方法
                callDecode(ctx, cumulation, out);
             catch (DecoderException e) 
                throw e;
             catch (Throwable t) 
                throw new DecoderException(t);
             finally 
                if (cumulation != null && !cumulation.isReadable()) 
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                 else if (++ numReads >= discardAfterReads) 
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com.netty.netty.issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                // 如果解析到数据,就会往下一个 InBound 节点传
                fireChannelRead(ctx, out, size);
                out.recycle();
            
         else 
            ctx.fireChannelRead(msg);
        
    

    /**
     * Called once data should be decoded from the given @link ByteBuf. This method will call
     * @link #decode(ChannelHandlerContext, ByteBuf, List) as long as decoding should take place.
     *
     * @param ctx           the @link ChannelHandlerContext which this @link ByteToMessageDecoder belongs to
     * @param in            the @link ByteBuf from which to read data
     * @param out           the @link List to which decoded messages should be added
     */
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) 
        try 
            // 只要有可用的数据,会一直循环调用 decode 方法
            while (in.isReadable()) 
                int outSize = out.size();

                if (outSize > 0) 
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com.netty.netty.issues/4635
                    if (ctx.isRemoved()) 
                        break;
                    
                    outSize = 0;
                

                int oldInputLength = in.readableBytes();
                // 调用自行实现的 decode 方法,实现数据的组装
                // 通过添加多个 pipeline 来实现业务的处理
                decode(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com.netty.netty.issues/1664
                if (ctx.isRemoved()) 
                    break;
                

                if (outSize == out.size()) 
                    if (oldInputLength == in.readableBytes()) 
                        break;
                     else 
                        continue;
                    
                

                if (oldInputLength == in.readableBytes()) 
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                

                if (isSingleDecode()) 
                    break;
                
            
         catch (DecoderException e) 
            throw e;
         catch (Throwable cause) 
            throw new DecoderException(cause);
        
    


    /**
     * Get @code numElements out of the @link CodecOutputList and forward these through the pipeline.
     */
    static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) 
        // 每个解析到的元素都会调用一次 fireChannelRead
        for (int i = 0; i < numElements; i ++) 
            ctx.fireChannelRead(msgs.getUnsafe(i));
        
    

 

如果自己来写这个组装包的逻辑,可能会是这样的:(仅仅是等到所有数据都到后,再传入下一个处理器即可)

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception 
        if (in.readableBytes() < 4) 
            return;
        
        in.markReaderIndex();
        int dataLength = in.readInt();
        // 如果整个包还没完整,则等待下次调用
        if (in.readableBytes() < dataLength) 
            in.resetReaderIndex();
            return;
        
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = JSON.parseObject(data, target); 
        out.add(obj);
    
    

 

针对外部多次调入站程序的方法,通过 cumulate 方法组装数据

    // 针对外部多次调入站程序的方法,通过 cumulate 方法组装数据
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception 
        if (msg instanceof ByteBuf) 
            CodecOutputList out = CodecOutputList.newInstance();
            try 
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) 
                    cumulation = data;
                 else 
                    // 合并数据
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                
                callDecode(ctx, cumulation, out);
             catch (DecoderException e) 
                throw e;
             catch (Throwable t) 
                throw new DecoderException(t);
             finally 
                if (cumulation != null && !cumulation.isReadable()) 
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                 else if (++ numReads >= discardAfterReads) 
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com.netty.netty.issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            
         else 
            ctx.fireChannelRead(msg);
        
    

    /**
     * Cumulate @link ByteBufs by merge them into one @link ByteBuf‘s, using memory copies.
     */
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() 
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) 
            final ByteBuf buffer;
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) 
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain() or if its read-only.
                //
                // See:
                // - https://github.com.netty.netty.issues/2327
                // - https://github.com.netty.netty.issues/1764
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
             else 
                buffer = cumulation;
            
            buffer.writeBytes(in);
            in.release();
            return buffer;
        
    ;
    

 

下面我们来看下 dubbo 是如何进行数据包的组装的呢?(NEED_MORE_INPUT 的应用)

// Decoder 处理逻辑
    // org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter
    private class InternalDecoder extends ByteToMessageDecoder 

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception 

            ChannelBuffer message = new NettyBackedChannelBuffer(input);

            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

            try 
                // decode object.
                do 
                    int saveReaderIndex = message.readerIndex();
                    Object msg = codec.decode(channel, message);
                    // 只要遇到 NEED_MORE_INPUT 标识,则不会算本次接收完成,等待下一次回调
                    // 此处会先交给一连串的 codec 处理
                    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) 
                        message.readerIndex(saveReaderIndex);
                        break;
                     else 
                        //is it possible to go here ?
                        if (saveReaderIndex == message.readerIndex()) 
                            throw new IOException("Decode without read data.");
                        
                        if (msg != null) 
                            out.add(msg);
                        
                    
                 while (message.readable());
             finally 
                NettyChannel.removeChannelIfDisconnected(ctx.channel());
            
        
    
    
    // org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException 
        int save = buffer.readerIndex();
        MultiMessage result = MultiMessage.create();
        do 
            Object obj = codec.decode(channel, buffer);
            if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) 
                buffer.readerIndex(save);
                break;
             else 
                result.addMessage(obj);
                logMessageLength(obj, buffer.readerIndex() - save);
                save = buffer.readerIndex();
            
         while (true);
        if (result.isEmpty()) 
            return Codec2.DecodeResult.NEED_MORE_INPUT;
        
        if (result.size() == 1) 
            return result.get(0);
        
        return result;
    

    // org.apache.dubbo.remoting.exchange.codec.ExchangeCodec
    @Override
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException 
        int readable = buffer.readableBytes();
        // 可以看到,每个包都会有一个包头,只要解析出来,就可以知道它的类型,长度了
        byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
        buffer.readBytes(header);
        return decode(channel, buffer, readable, header);
    

    @Override
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException 
        // check magic number.
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) 
            int length = header.length;
            if (header.length < readable) 
                header = Bytes.copyOf(header, readable);
                buffer.readBytes(header, length, readable - length);
            
            for (int i = 1; i < header.length - 1; i++) 
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) 
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                
            
            return super.decode(channel, buffer, readable, header);
        
        // check length.
        if (readable < HEADER_LENGTH) 
            return DecodeResult.NEED_MORE_INPUT;
        

        // get data length.
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);

        // 只要数据未达到要求的长度,则返回 NEED_MORE_INPUT
        int tt = len + HEADER_LENGTH;
        if (readable < tt) 
            return DecodeResult.NEED_MORE_INPUT;
        

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try 
            return decodeBody(channel, is, header);
         finally 
            if (is.available() > 0) 
                try 
                    if (logger.isWarnEnabled()) 
                        logger.warn("Skip input stream " + is.available());
                    
                    StreamUtils.skipUnusedStream(is);
                 catch (IOException e) 
                    logger.warn(e.getMessage(), e);
                
            
        
    
    
    // org.apache.dubbo.remoting.telnet.codec.TelnetCodec
    @SuppressWarnings("unchecked")
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException 
        if (isClientSide(channel)) 
            return toString(message, getCharset(channel));
        
        checkPayload(channel, readable);
        if (message == null || message.length == 0) 
            return DecodeResult.NEED_MORE_INPUT;
        

        if (message[message.length - 1] == ‘\b‘)  // Windows backspace echo
            try 
                boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
                channel.send(new String(doublechar ? new byte[]32, 32, 8, 8 : new byte[]32, 8, getCharset(channel).name()));
             catch (RemotingException e) 
                throw new IOException(StringUtils.toString(e));
            
            return DecodeResult.NEED_MORE_INPUT;
        

        for (Object command : EXIT) 
            if (isEquals(message, (byte[]) command)) 
                if (logger.isInfoEnabled()) 
                    logger.info(new Exception("Close channel " + channel + " on exit command: " + Arrays.toString((byte[]) command)));
                
                channel.close();
                return null;
            
        

        boolean up = endsWith(message, UP);
        boolean down = endsWith(message, DOWN);
        if (up || down) 
            LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
            if (CollectionUtils.isEmpty(history)) 
                return DecodeResult.NEED_MORE_INPUT;
            
            Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
            Integer old = index;
            if (index == null) 
                index = history.size() - 1;
             else 
                if (up) 
                    index = index - 1;
                    if (index < 0) 
                        index = history.size() - 1;
                    
                 else 
                    index = index + 1;
                    if (index > history.size() - 1) 
                        index = 0;
                    
                
            
            if (old == null || !old.equals(index)) 
                channel.setAttribute(HISTORY_INDEX_KEY, index);
                String value = history.get(index);
                if (old != null && old >= 0 && old < history.size()) 
                    String ov = history.get(old);
                    StringBuilder buf = new StringBuilder();
                    for (int i = 0; i < ov.length(); i++) 
                        buf.append("\b");
                    
                    for (int i = 0; i < ov.length(); i++) 
                        buf.append(" ");
                    
                    for (int i = 0; i < ov.length(); i++) 
                        buf.append("\b");
                    
                    value = buf.toString() + value;
                
                try 
                    channel.send(value);
                 catch (RemotingException e) 
                    throw new IOException(StringUtils.toString(e));
                
            
            return DecodeResult.NEED_MORE_INPUT;
        
        for (Object command : EXIT) 
            if (isEquals(message, (byte[]) command)) 
                if (logger.isInfoEnabled()) 
                    logger.info(new Exception("Close channel " + channel + " on exit command " + command));
                
                channel.close();
                return null;
            
        
        byte[] enter = null;
        for (Object command : ENTER) 
            if (endsWith(message, (byte[]) command)) 
                enter = (byte[]) command;
                break;
            
        
        if (enter == null) 
            return DecodeResult.NEED_MORE_INPUT;
        
        LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
        Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
        channel.removeAttribute(HISTORY_INDEX_KEY);
        if (CollectionUtils.isNotEmpty(history) && index != null && index >= 0 && index < history.size()) 
            String value = history.get(index);
            if (value != null) 
                byte[] b1 = value.getBytes();
                byte[] b2 = new byte[b1.length + message.length];
                System.arraycopy(b1, 0, b2, 0, b1.length);
                System.arraycopy(message, 0, b2, b1.length, message.length);
                message = b2;
            
        
        String result = toString(message, getCharset(channel));
        if (result.trim().length() > 0) 
            if (history == null) 
                history = new LinkedList<String>();
                channel.setAttribute(HISTORY_LIST_KEY, history);
            
            if (history.isEmpty()) 
                history.addLast(result);
             else if (!result.equals(history.getLast())) 
                history.remove(result);
                history.addLast(result);
                if (history.size() > 10) 
                    history.removeFirst();
                
            
        
        return result;
    

  所以,其实 dubbo 实现拆包的方式,也是依赖于 netty, 通过判定数据长度来决定是否包已到齐的。

  同样,根据数据长度,也可以解决粘包问题,因为从头里指定的长度,即可知道数据到哪里时已取完,从而将粘在一起的包分开。

  

  以上就是基于netty的TCP数据包的处理问题,也是一个简单的应用层协议处理过程,使我们可以更直接地了解应用层协议的处理过程。

  当然,对于上面的基于数据长度进行数据包判定,会存在一些问题:

    1. 当数据包很大时,将会阻塞其他请求;
    2. 当数据包很大时,将会占用大量内存;
    3. 同一连接中,不可能存在数据包的乱序传输;(TCP是否支持乱序、混合包传输?这是个问题)

  当然,以上协议并不处理这种情况,针对大数据量请求,我们可以在客户端做好分包请求,从而减轻压力。

 

唠叨: 看透本质。

 

以上是关于从dubbo处理视角看Netty处理网络传输原理 -- 粘包与拆包的主要内容,如果未能解决你的问题,请参考以下文章

自然语言处理从CNN视角看在自然语言处理上的应用

看 Netty 在 Dubbo 中如何应用

Dubbo3中服务端线程模型,线程处理(基于Dubbo3)

3. Dubbo服务提供端请求处理流程

Netty架构原理,不怕你看不懂!

从统计视角看第二语言教学和自然语言处理的共同本质