How Netty handles network transport, seen from Dubbo's perspective -- sticky packets and split packets
Posted by yougewe
These days, building a networked application is remarkably easy: pull in a framework, set a few parameters, write the business code, and you are done.
Writing business code certainly matters, but do you actually know:
Where your data comes from? Over the network, obviously.
How the network carries it? Over fiber, via TCP/IP, obviously.
None of that stumps anyone. But those are not our focus today. What we care about is this: how does TCP/IP deliver data to our application server and hand it, accurately, to the right piece of business code?
Nor will we dwell on TCP's three-way handshake and four-way teardown. We only need one fact: TCP/IP is stream-oriented, meaning bytes flow continuously from the client to the server. How, then, does the application layer know what those bytes mean? That is the job of an application-level protocol such as HTTP, SMTP, or FTP.
Setting everything else aside: when we build an application with Netty, Netty itself takes on the role of a high-level application protocol. So by watching how it recognizes the data coming off the wire, we can catch a glimpse of how application-layer protocols work.
The broad strokes are simple: the client serializes the data with some serialization protocol and sends it across the network; the server decodes it with the matching deserialization protocol and hands the result to the business code.
So it looks like nothing more than a serialization/deserialization problem. If that were all, we could stop thinking right here.
What we have to ask is: does the data a client sends reach the server in one piece? If it did, things would be trivial: just read it. But what if the payload is very large; can TCP/IP ship it in one go? It cannot. TCP has an MSS (maximum segment size) limit, and anything beyond it must be split before sending. (This is the "sticky packet / split packet" problem, to use the jargon.)
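To make the problem concrete, here is a minimal sketch in plain java.nio (our own illustration, not from Dubbo or Netty; FrameReader and readFully are made-up names) showing why a reader can never assume that one read() returns exactly one message: it may get half a message, or one and a half.

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Hypothetical helper: blocks until one length-prefixed message is complete.
// A single read() may return only part of a message (split packet) or bytes
// of the next message as well (sticky packet), so we must accumulate.
// Assumes a blocking SocketChannel.
public final class FrameReader {
    public static byte[] readFrame(SocketChannel ch) throws IOException {
        ByteBuffer header = ByteBuffer.allocate(4);
        readFully(ch, header);                 // may take several read() calls
        header.flip();
        int len = header.getInt();             // 4-byte big-endian length prefix
        ByteBuffer body = ByteBuffer.allocate(len);
        readFully(ch, body);                   // again: accumulate until complete
        return body.array();
    }

    private static void readFully(SocketChannel ch, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (ch.read(buf) < 0) {
                throw new EOFException("peer closed before frame was complete");
            }
        }
    }
}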
Let's look at how Netty handles the data as it arrives.
And in Dubbo, how is Netty used to deal with packet splitting?
First, here is how Dubbo creates its Netty server (essentially it just adds a few encoders/decoders plus a handler):
// org.apache.dubbo.remoting.transport.netty4.NettyServer
@Override
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ch.pipeline()//.addLast("logging", new LoggingHandler(LogLevel.INFO)) // for debug
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}
That really is all it takes to use Netty: define your protocol and your handlers, and the complicated low-level work is simply not your problem.
First, how does Netty listen for incoming network data? (NIO-based port binding.)
// io.netty.channel.socket.nio.NioServerSocketChannel
// Bind the server socket via the underlying NIO channel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

@Override
protected ServerSocketChannel javaChannel() {
    return (ServerSocketChannel) super.javaChannel();
}
So writing an NIO server/client by hand may not be all that hard either; what is hard is surviving the countless corner cases a real application runs into.
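For contrast, a bare-bones single-threaded NIO server might look like the sketch below (again our own illustration, not Netty code). Everything Netty layers on top of this loop, such as pipelines, pooled buffers, idle detection, and graceful shutdown, is exactly the part that is hard to get right:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;

// Minimal single-threaded reactor: one selector, accept + read only.
public final class TinyNioServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(8080));
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();                       // block until events arrive
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isAcceptable()) {
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (ch.read(buf) < 0) {          // peer closed
                        key.cancel();
                        ch.close();
                    }
                    // NOTE: buf may hold a partial message here, which is the
                    // very split/sticky-packet problem this article is about.
                }
            }
            selector.selectedKeys().clear();
        }
    }
}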
Netty's threading model is the Reactor pattern, built around an event loop:
// io.netty.channel.nio.NioEventLoop
// The event loop: scan for I/O events
@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));

                    // 'wakenUp.compareAndSet(false, true)' is always evaluated
                    // before calling 'selector.wakeup()' to reduce the wake-up
                    // overhead. (Selector.wakeup() is an expensive operation.)
                    //
                    // However, there is a race condition in this approach.
                    // The race condition is triggered when 'wakenUp' is set to
                    // true too early.
                    //
                    // 'wakenUp' is set to true too early if:
                    // 1) Selector is waken up between 'wakenUp.set(false)' and
                    //    'selector.select(...)'. (BAD)
                    // 2) Selector is waken up between 'selector.select(...)' and
                    //    'if (wakenUp.get()) { ... }'. (OK)
                    //
                    // In the first case, 'wakenUp' is set to true and the
                    // following 'selector.select(...)' will wake up immediately.
                    // Until 'wakenUp' is set to false again in the next round,
                    // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                    // any attempt to wake up the Selector will fail, too, causing
                    // the following 'selector.select(...)' call to block
                    // unnecessarily.
                    //
                    // To fix this problem, we wake up the selector again if wakenUp
                    // is true immediately after selector.select(...).
                    // It is inefficient in that it wakes up the selector for both
                    // the first case (BAD - wake-up required) and the second case
                    // (OK - no wake-up required).
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    // handle the selected events
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

// dispatch the selected events
private void processSelectedKeys() {
    if (selectedKeys != null) {
        // use the optimized selectedKeys set
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;

        final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);

            selectAgain();
            i = -1;
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop != this || eventLoop == null) {
            return;
        }
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        // Read the data: the unsafe class performs the actual read loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

// io.netty.channel.nio.AbstractNioMessageChannel
// the real read path (for the server channel, this accepts new connections)
private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                // loop, reading messages into readBuf
                do {
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    // record how many messages have been read so far
                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // hand each message to the pipeline in turn
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                closed = closeOnReadError(exception);

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                inputShutdown = true;
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

// io.netty.channel.socket.nio.NioServerSocketChannel
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

// io.netty.channel.DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// io.netty.channel.AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    // invoke directly if already on the event loop, otherwise submit to it
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // call the inbound handler to consume the message
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, false, true);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }

    @Override
    public ChannelHandler handler() {
        return this;
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    @Override
    public void bind(
            ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        unsafe.bind(localAddress, promise);
    }

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        unsafe.connect(remoteAddress, localAddress, promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.disconnect(promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.close(promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        unsafe.deregister(promise);
    }

    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();

        // Remove all handlers sequentially if channel is closed and unregistered.
        if (!channel.isOpen()) {
            destroy();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();

        readIfIsAutoRead();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();

        readIfIsAutoRead();
    }

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
} // DefaultChannelPipeline

// io.netty.channel.AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

// io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

// io.netty.channel.DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, null, h);
    }

    return this;
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        // append to the tail of the pipeline
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

// NioEventLoopGroup
// io.netty.channel.MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

// io.netty.channel.SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // once registered here, a separate thread (the child event loop) takes over the data handling
    promise.channel().unsafe().register(this, promise);
    return promise;
}

// io.netty.channel.AbstractChannel$AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

// io.netty.util.concurrent.SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        // wake the selector up for the next round of receiving data
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            // hand the work to a dedicated thread; this stage is done
            doStartThread();
        }
    }
}
Starting a new thread to run the processing logic:
// io.netty.util.concurrent.SingleThreadEventExecutor
// start a new thread; the events are then processed on that dedicated executor
private void doStartThread() {
    assert thread == null;
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

@Override
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

The actual parsing of the incoming data is triggered from fireChannelRead:

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

// walk the inbound chain, calling channelRead() on each handler in turn
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

// HeadContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

// AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

// io.netty.handler.codec.ByteToMessageDecoder
// this is the class that drives our decoding of the raw bytes
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            // re-accumulate across packets that arrive in several reads
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // run the decode method
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            // whatever was decoded is passed on to the next inbound handler
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

/**
 * Called once data should be decoded from the given {@link ByteBuf}. This method will call
 * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
 *
 * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
 * @param in  the {@link ByteBuf} from which to read data
 * @param out the {@link List} to which decoded messages should be added
 */
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // keep calling decode as long as readable bytes remain
        while (in.isReadable()) {
            int outSize = out.size();

            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            // call the user-supplied decode method, which reassembles the frames;
            // business handling is layered on via additional pipeline handlers
            decode(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }

            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}

/**
 * Get {@code numElements} out of the {@link CodecOutputList} and forward these through the pipeline.
 */
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
    // each decoded element gets its own fireChannelRead call
    for (int i = 0; i < numElements; i ++) {
        ctx.fireChannelRead(msgs.getUnsafe(i));
    }
}
If we wrote this frame-assembly logic ourselves, it might look like this (simply wait until the whole frame has arrived, then pass it to the next handler):
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    if (in.readableBytes() < 4) {
        return;
    }
    in.markReaderIndex();
    int dataLength = in.readInt();
    // if the frame is not yet complete, rewind and wait for the next call
    if (in.readableBytes() < dataLength) {
        in.resetReaderIndex();
        return;
    }
    byte[] data = new byte[dataLength];
    in.readBytes(data);
    Object obj = JSON.parseObject(data, target);
    out.add(obj);
}
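For completeness, the sending side would have to write the same 4-byte length prefix. A minimal sketch of that encoder (our own, not from the article; it assumes fastjson's JSON, matching the decoder above, and the class name JsonFrameEncoder is made up):

import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

// Hypothetical counterpart to the decoder above: 4-byte length prefix + JSON body.
public class JsonFrameEncoder extends MessageToByteEncoder<Object> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        byte[] body = JSON.toJSONBytes(msg); // serialize the payload
        out.writeInt(body.length);           // length prefix first
        out.writeBytes(body);                // then the body itself
    }
}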
When the inbound path is invoked repeatedly as more packets arrive, the cumulate method stitches the data back together:
// when the inbound path is entered repeatedly, cumulate() merges the buffers
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                // merge the new bytes into the accumulated buffer
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new DecoderException(t);
        } finally {
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }

            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}

/**
 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
 */
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        final ByteBuf buffer;
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
            // Expand cumulation (by replace it) when either there is not more room in the buffer
            // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
            // duplicate().retain() or if its read-only.
            //
            // See:
            // - https://github.com/netty/netty/issues/2327
            // - https://github.com/netty/netty/issues/1764
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};
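To watch the cumulation at work, one can feed a decoder its input in fragments using Netty's EmbeddedChannel test channel. The sketch below is our own demo, not article code; it reuses the length-prefixed decode idea from above:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CumulationDemo {
    public static void main(String[] args) {
        // decoder: 4-byte length prefix, then a UTF-8 string body
        EmbeddedChannel ch = new EmbeddedChannel(new ByteToMessageDecoder() {
            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                if (in.readableBytes() < 4) {
                    return;                      // header not here yet
                }
                in.markReaderIndex();
                int len = in.readInt();
                if (in.readableBytes() < len) {
                    in.resetReaderIndex();       // wait for more bytes
                    return;
                }
                out.add(in.readBytes(len).toString(StandardCharsets.UTF_8));
            }
        });

        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuf frame = Unpooled.buffer();
        frame.writeInt(body.length);
        frame.writeBytes(body);

        // deliver the frame in two fragments, simulating a split packet
        ch.writeInbound(frame.retainedSlice(0, 6));               // header + 2 body bytes
        System.out.println((Object) ch.readInbound());            // null: frame incomplete
        ch.writeInbound(frame.retainedSlice(6, frame.readableBytes() - 6));
        System.out.println((Object) ch.readInbound());            // "hello"
    }
}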
Now let's look at how Dubbo assembles its data packets (note the use of NEED_MORE_INPUT):
// the decoder-side logic
// org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter
private class InternalDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception {

        ChannelBuffer message = new NettyBackedChannelBuffer(input);

        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);

        try {
            // decode object.
            do {
                int saveReaderIndex = message.readerIndex();
                Object msg = codec.decode(channel, message);
                // whenever NEED_MORE_INPUT comes back, this round is treated as
                // incomplete and we wait for the next callback;
                // decoding is first delegated to a chain of codecs
                if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                    message.readerIndex(saveReaderIndex);
                    break;
                } else {
                    //is it possible to go here ?
                    if (saveReaderIndex == message.readerIndex()) {
                        throw new IOException("Decode without read data.");
                    }
                    if (msg != null) {
                        out.add(msg);
                    }
                }
            } while (message.readable());
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
}

// org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int save = buffer.readerIndex();
    MultiMessage result = MultiMessage.create();
    do {
        Object obj = codec.decode(channel, buffer);
        if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
            buffer.readerIndex(save);
            break;
        } else {
            result.addMessage(obj);
            logMessageLength(obj, buffer.readerIndex() - save);
            save = buffer.readerIndex();
        }
    } while (true);
    if (result.isEmpty()) {
        return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
        return result.get(0);
    }
    return result;
}

// org.apache.dubbo.remoting.exchange.codec.ExchangeCodec
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // every packet carries a header; once parsed, its type and length are known
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    // until the declared length has fully arrived, keep returning NEED_MORE_INPUT
    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

// org.apache.dubbo.remoting.telnet.codec.TelnetCodec
@SuppressWarnings("unchecked")
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] message) throws IOException {
    if (isClientSide(channel)) {
        return toString(message, getCharset(channel));
    }
    checkPayload(channel, readable);
    if (message == null || message.length == 0) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    if (message[message.length - 1] == '\b') { // Windows backspace echo
        try {
            boolean doublechar = message.length >= 3 && message[message.length - 3] < 0; // double byte char
            channel.send(new String(doublechar ? new byte[]{32, 32, 8, 8} : new byte[]{32, 8},
                    getCharset(channel).name()));
        } catch (RemotingException e) {
            throw new IOException(StringUtils.toString(e));
        }
        return DecodeResult.NEED_MORE_INPUT;
    }

    for (Object command : EXIT) {
        if (isEquals(message, (byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command: "
                        + Arrays.toString((byte[]) command)));
            }
            channel.close();
            return null;
        }
    }

    boolean up = endsWith(message, UP);
    boolean down = endsWith(message, DOWN);
    if (up || down) {
        LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
        if (CollectionUtils.isEmpty(history)) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
        Integer old = index;
        if (index == null) {
            index = history.size() - 1;
        } else {
            if (up) {
                index = index - 1;
                if (index < 0) {
                    index = history.size() - 1;
                }
            } else {
                index = index + 1;
                if (index > history.size() - 1) {
                    index = 0;
                }
            }
        }
        if (old == null || !old.equals(index)) {
            channel.setAttribute(HISTORY_INDEX_KEY, index);
            String value = history.get(index);
            if (old != null && old >= 0 && old < history.size()) {
                String ov = history.get(old);
                StringBuilder buf = new StringBuilder();
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("\b");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append(" ");
                }
                for (int i = 0; i < ov.length(); i++) {
                    buf.append("\b");
                }
                value = buf.toString() + value;
            }
            try {
                channel.send(value);
            } catch (RemotingException e) {
                throw new IOException(StringUtils.toString(e));
            }
        }
        return DecodeResult.NEED_MORE_INPUT;
    }

    for (Object command : EXIT) {
        if (isEquals(message, (byte[]) command)) {
            if (logger.isInfoEnabled()) {
                logger.info(new Exception("Close channel " + channel + " on exit command " + command));
            }
            channel.close();
            return null;
        }
    }

    byte[] enter = null;
    for (Object command : ENTER) {
        if (endsWith(message, (byte[]) command)) {
            enter = (byte[]) command;
            break;
        }
    }
    if (enter == null) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    LinkedList<String> history = (LinkedList<String>) channel.getAttribute(HISTORY_LIST_KEY);
    Integer index = (Integer) channel.getAttribute(HISTORY_INDEX_KEY);
    channel.removeAttribute(HISTORY_INDEX_KEY);
    if (CollectionUtils.isNotEmpty(history) && index != null && index >= 0 && index < history.size()) {
        String value = history.get(index);
        if (value != null) {
            byte[] b1 = value.getBytes();
            byte[] b2 = new byte[b1.length + message.length];
            System.arraycopy(b1, 0, b2, 0, b1.length);
            System.arraycopy(message, 0, b2, b1.length, message.length);
            message = b2;
        }
    }

    String result = toString(message, getCharset(channel));
    if (result.trim().length() > 0) {
        if (history == null) {
            history = new LinkedList<String>();
            channel.setAttribute(HISTORY_LIST_KEY, history);
        }
        if (history.isEmpty()) {
            history.addLast(result);
        } else if (!result.equals(history.getLast())) {
            history.remove(result);
            history.addLast(result);
            if (history.size() > 10) {
                history.removeFirst();
            }
        }
    }
    return result;
}
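For reference, the 16-byte header that ExchangeCodec parses is laid out roughly as follows. This is our own annotation reconstructed from the constants used above (MAGIC_HIGH/MAGIC_LOW at bytes 0-1, the body length read by Bytes.bytes2int(header, 12) at bytes 12-15), not a listing from the Dubbo source:

// Sketch of the dubbo exchange header as implied by the code above:
//
//  bytes 0-1    magic number (MAGIC_HIGH, MAGIC_LOW)
//  byte  2      flags: request/response, two-way, event, serialization id
//  byte  3      status (used in responses)
//  bytes 4-11   request id (a long, used to match responses to requests)
//  bytes 12-15  body length in bytes -- the field read by Bytes.bytes2int(header, 12)
//
// HEADER_LENGTH = 16; a frame is complete once HEADER_LENGTH + body length
// bytes have accumulated, otherwise decode() returns NEED_MORE_INPUT.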
So Dubbo's handling of split packets also rests on Netty: it decides whether a packet has fully arrived by checking the length declared in the header.
The same length field solves the sticky-packet problem too: since the header says how long the body is, we know exactly where one message ends, and packets glued together can be cut apart at the right boundary.
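Incidentally, Netty ships a generic length-field framer, LengthFieldBasedFrameDecoder, that implements exactly this idea. A dubbo-style 16-byte header with the body length at offset 12 could in principle be framed as in the sketch below (our own illustration of the decoder's parameters, not how Dubbo is actually wired up):

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

public class FrameInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(
                8 * 1024 * 1024, // maxFrameLength: reject oversized frames
                12,              // lengthFieldOffset: body length lives at bytes 12-15
                4,               // lengthFieldLength: a 4-byte int
                0,               // lengthAdjustment: the length counts the body only
                0));             // initialBytesToStrip: keep the header for later handlers
        // handlers added after this point always receive exactly one complete frame
    }
}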
That is how TCP data packets are handled on top of Netty: a simple application-layer protocol at work, giving us a fairly direct view of how such protocols are processed.
Of course, framing purely by data length has its problems:
1. A very large packet will hold up other requests on the same connection;
2. A very large packet will consume a great deal of memory;
3. Within a single connection, packets cannot be transmitted out of order. (Could TCP interleave or reorder packets? Within one connection it cannot: TCP delivers the byte stream strictly in order, so any interleaving of messages has to be built on top of it.)
The protocol above does not deal with these cases; for very large payloads, the client can split the request into several smaller ones to ease the pressure.
A parting thought: see through to the essence.