Netty Channel

Posted Different Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty Channel相关的知识,希望对你有一定的参考价值。

Channel和Unsafe

Netty提供了自己的Channel和其子类实现,用于异步I/O操作和其他相关操作。

Channel功能说明

Channel工作原理

JDK NIO Channel的缺点:

  1. JDK的SocketChannel和ServerSocketChannel没有统一的Channel接口供业务开发者使用

  2. JDK的SocketChannel和ServerSocketChannel主要职责就是网络I/O操作,由于他们的是SPI类接口,具体的实现由虚拟机厂家决定,所以扩展不方便。

  3. Netty的Channel需要能够跟Netty的整体架构融合在一起,例如I/O模型、基于ChannelPipeline的定制模型。以及基于元数据描述配置化的TCP参数等, 上述JDK的SocketChannel和ServerSocketChannel都没有提供,需要重新封装

Netty Channel设计原理:

  1. 在Channel接口层,采用Facade模式进行统一封装,将网络I/O操作、网络I/O相关联的其他操作封装起来,统一对外提供。

  2. Channel接口的定义尽量大而全,为SocketChannel和ServerSocketChannel提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类实现, 最大程度上实现功能和接口的重用

  3. 具体实现采用聚合而非包含的方式,将相关的功能类聚合在Channel中,由Channel统一负责分配和调度,功能实现更加灵活。

Channel功能介绍

网络I/O操作

  1. Channel read():从当前的Channel中读取数据到第一个inbound缓冲区,如果数据被成功读取,触发ChannelHandler.channelRead(ChannelHandler Context context, ChannelPromise promise)事件,读取操作API调用完成之后,紧接着会触发ChannelHandler.channelReadComplete事件

  2. ChannelFuture write(Object msg):请求将当前的msg通过ChannelPipeline写入到目标Channel中。write操作只是将消息存入到消息发送环形数 组中,并没有真正被发送,只有调用flush操作才会被写入Channel中,发送给对方。

  3. ChannelFuture write(Object msg, ChannelPromise promise):功能与write(Object msg)相同,但是携带了ChannelPromise参数负责设置 写入操作的结果

  4. ChannelFuture writeAndFlush(Object msg, ChannelPromise promise):与方法3类似,不同之处在于它会将消息写入到Channel中发送,等价 于单独调用write和flush。

  5. ChannelFuture writeAndFlush(Object msg):功能等同于4,但是没有携带ChannelPromise参数

  6. Channel flush():将之前写入消息发送环形数组中的消息全部写入目标Channel中,发送给通信对方。

  7. ChannelFuture close(ChannelPromise promise):主动关闭当前连接,通过ChannelPromise设置操作结果并进行结果通知,无论操作是否成功, 都可以通过ChannelPromise获取操作结果。该操作会级联触发ChannelPipeline中所有ChannelHandler的ChannelHandler.close(ChannelHandler Context context, ChannelPromise promise)事件

  8. ChannelFuture disconnect(ChannelPromise promise):请求断开与远程通信对端的连接并使用ChannelPromise来获取操作结果的通知消息。该 方法会级联触发ChannelHandler.disconnect(ChannelHandlerContext context, ChannelPromise promise)事件

  9. ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise):该功能与9相似,唯一不同的是携带了ChannelPromise 参数用来写入操作结果

  10. ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise):与13功能类似,携带了ChannelPromise参数来写入操作结果

  11. ChannelConfig config():获取当前Channel的配置信息,例如CONNECTTIMEOUTMILLIS

  12. boolean isOpen():判断当前Channel是否已经打开

  13. boolean isRegistered():判断当前Channel是否已经注册到EventLoop上

  14. boolean isActive():判断当前Channel是否已经处于激活状态

  15. ChannelMetadata metadata():获取当前Channel的元数据描述信息,包括TCP参数配置等。

其他API

  1. EventLoop eventLoop():获取Channel注册的EventLoop。Channel需要注册到EventLoop的多路复用器上,用来处理I/O事件,EventLoop本质上 就是处理网络读写事件的Reactor线程。在Netty中,EventLoop不仅仅用来处理网络事件,也可以用来执行定时任务和用户自定义NioTask等任务。

  2. Channel parent():对于服务端Channel而言,父Channel为空;对于客户端Channel,它的父Channel就是创建它的ServerSocketChannel

  3. ChannelId id():返回ChannelId对象,ChannelId是Channel的唯一标识

ChannelId的可能生成策略如下:

  1. 当前的进程ID

  2. 当前系统时间的毫秒——System.currentTimeMillis()

  3. 当前系统时间纳秒数——System.nanoTime()

  4. 32位的随机整形数

  5. 32位自增的序列数

Channel源码

Channel继承关系类图

服务端NioserverSocketChannel继承关系图

客户端NioSocketChannel继承关系图Netty Channel

AbstractChannel源码

成员变量定义

Netty Channel

  1. 首先定义了5个静态全局异常

  2. Channel parent:代表父类Channel

  3. ChannelId id:采用默认方式生成的全局唯一ID

  4. Unsafe unsafe:Unsafe实例

  5. DefaultChannelPipeline pipeline:当前Channel对应的DefaultChannelPipeline

  6. EventLoop eventLoop:当前Channel注册的EventLoop等一系列

通过变量定义可以看出,AbstractChannel聚合了所有Channel使用到的能力对象,由AbstractChannel提供初始化和统一封装,如果功能和子类强相关,则 定义成抽象方法由子类具体实现。

核心API

Netty基于事件驱动,当Channel进行I/O操作时会产生对应的I/O事件,然后驱动事件在ChannelPipeline中传播,由对应的ChannelHandler对事件进行拦截和处理,不关心的事件可以直接忽略。

网络I/O操作直接调用DefaultChannelPipeline的相关方法,由DefaultChannelPipeline中对应的ChannelHandler进行具体的逻辑处理。

AbstractChannel提供了一些公共API的具体实现,例如localAddress()和remoteAddress(),下面看一下remoteAddress的源码:

 
   
   
 
  1.    @Override

  2.    public SocketAddress remoteAddress() {

  3.        SocketAddress remoteAddress = this.remoteAddress;

  4.        if (remoteAddress == null) {

  5.            try {

  6.                this.remoteAddress = remoteAddress = unsafe().remoteAddress();

  7.            } catch (Throwable t) {

  8.                // Sometimes fails on a closed socket in Windows.

  9.                return null;

  10.            }

  11.        }

  12.        return remoteAddress;

  13.    }

首先从缓存的成员变量中获取,如果第一次调用为空,需要通过unsafe的remoteAddress获取,它是个抽象方法,具体由对应的Channel子类实现。

AbstractNioChannel源码

成员变量定义

  1. 定义了一个DOCLOSECLOSEDCHANNELEXCEPTION静态全局异常

  2. SelectableChannel ch:由于NIO Channel、NioSocketChannel和NioServerSocketChannel需要公用,所以定义了一个SocketChannel和 ServerSocketChannel的公共父类SelectableChannel,用于设置SelectableChannel参数和进行I/O操作。

  3. int readInterestOp:代表了JDK SelectionKey的OP_READ

  4. volatile SelectionKey selectionKey:该SelectionKey是Channel注册到EventLoop后返回的选择键。由于Channel会面临多个业务线程的并发 写操作,当SelectionKey由SelectionKey修改以后,为了能让其他业务线程感知到变化,所以需要使用volatile保证修改的可见性。

  5. ChannelPromise connectPromise:代表连接操作结果

  6. ScheduledFuture connectTimeoutFuture:连接超时定时器

核心源码

AbstractNioChannel注册源码

 
   
   
 
  1.    @Override

  2.    protected void doRegister() throws Exception {

  3.        boolean selected = false;

  4.        for (;;) {

  5.            try {

  6.                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

  7.                return;

  8.            } catch (CancelledKeyException e) {

  9.                if (!selected) {

  10.                    // Force the Selector to select now as the "canceled" SelectionKey may still be

  11.                    // cached and not removed because no Select.select(..) operation was called yet.

  12.                    eventLoop().selectNow();

  13.                    selected = true;

  14.                } else {

  15.                    // We forced a select operation on the selector before but the SelectionKey is still cached

  16.                    // for whatever reason. JDK bug ?

  17.                    throw e;

  18.                }

  19.            }

  20.        }

  21.    }

首先定义一个boolean类型的局部变量selected来标识注册操作是否成功,调用SelectableChannel的register方法,将当前的Channel注册到EventLoop 的多路复用器上,SelectableChannel的注册方法定义如下:

 
   
   
 
  1. public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;

注册Channel的时候需要指定监听的网络操作位来表示Channel对哪几类网络事件感兴趣,具体的定义如下:

  1. public static final int OP_READ = 1 << 0: 读操作位

  2. public static final int OP_WRITE = 1 << 2:写操作位

  3. public static final int OP_CONNECT = 1 << 3:客户端连接服务端操作位

  4. public static final int OP_ACCEPT = 1 << 4:服务端接受客户端连接操作位

AbstractNioChannel注册的是0,说明对任何事件不感兴趣,仅仅完成注册操作。注册的时候可以指定附件,后续Channel接收到网络事件通知时可以从SelectionKey中重新获取之前的附件进行处理。如果注册Channel成功,则返回SelectionKey,通过SelectionKey可以从多路复用器中获取Channel对象。

如果当前注册返回的SelectionKey已经被取消,则抛出CancelledKeyException异常,捕获该异常进行处理。如果是第一次处理该异常,调用多路复用器的selectNow()方法将已经取消的selectionKey从多路复用器中删除掉。操作成功之后,将selected置为true,说明之前失效的selectionKey已经被删除掉。继续发起下一次注册操作,如果成功则退出,如果仍然发生CancelledKeyException异常,说明我们无法删除已经被取消的selectionKey,发生这种问题,直接抛出CancelledKeyException异常到上层进行统一处理。

下面看一下准备处理读操作之前需要设置网络操作位为读的代码:

 
   
   
 
  1.    @Override

  2.    protected void doBeginRead() throws Exception {

  3.        // Channel.read() or ChannelHandlerContext.read() was called

  4.        final SelectionKey selectionKey = this.selectionKey;

  5.        if (!selectionKey.isValid()) {

  6.            return;

  7.        }

  8.        readPending = true;

  9.        final int interestOps = selectionKey.interestOps();

  10.        if ((interestOps & readInterestOp) == 0) {

  11.            selectionKey.interestOps(interestOps | readInterestOp);

  12.        }

  13.    }

获取当前的SelectionKey进行判断,如果可用说明Channel当前状态正常,则可以进行正常的操作位修改。先将等待读设置为true,将SelectionKey当前的操作位与读操作位按位于操作,如果等于0,说明目前并没有设置读操作位,通过interestOps | readInterestOp设置读操作位,最后调用selectionKey的interestOps方法重新设置通道的网络操作位,这样就可以监听网络的读事件。

AbstractNioByteChannel

成员变量定义

private final Runnable flushTask:负责继续写半包消息

API

看下doWrite(ChannelOutboundBuffer in)的源码:

 
   
   
 
  1.    @Override

  2.    protected void doWrite(ChannelOutboundBuffer in) throws Exception {

  3.        int writeSpinCount = config().getWriteSpinCount();

  4.        do {

  5.            Object msg = in.current();

  6.            if (msg == null) {

  7.                // Wrote all messages.

  8.                clearOpWrite();

  9.                // Directly return here so incompleteWrite(...) is not called.

  10.                return;

  11.            }

  12.            writeSpinCount -= doWriteInternal(in, msg);

  13.        } while (writeSpinCount > 0);

  14.        incompleteWrite(writeSpinCount < 0);

  15.    }

首先从消息发送环形数组中弹出一个消息,判断该消息是否为空,如果为空,说明所有的消息都已经发送完成,清除半包标识,退出循环(该循环的次数默认最多16次),设置半包消息有最大处理次数的原因是当循环发送的时候,I/O线程会一直进行写操作,此时I/O线程无法处理其他的I/O操作,例如读新的消息或执行定时任务和NioTask等,如果网络I/O阻塞或者对方接受消息太慢,可能会导致线程假死。看一下清除半包标识clearOpWrite()的逻辑代码:

 
   
   
 
  1.    protected final void clearOpWrite() {

  2.        final SelectionKey key = selectionKey();

  3.        // Check first if the key is still valid as it may be canceled as part of the deregistration

  4.        // from the EventLoop

  5.        // See https://github.com/netty/netty/issues/2104

  6.        if (!key.isValid()) {

  7.            return;

  8.        }

  9.        final int interestOps = key.interestOps();

  10.        if ((interestOps & SelectionKey.OP_WRITE) != 0) {

  11.            key.interestOps(interestOps & ~SelectionKey.OP_WRITE);

  12.        }

  13.    }

首先获取当前的SelectionKey,如果当前的SelectionKey已被取消或者无效,直接返回。如果有效,则获取当前的监控的网络操作位,判断当前的网络操作位是否监听写事件,如果正在监听,则取消对写事件的监听。

如果发送的消息不为空,则继续对消息进行处理,源码如下:

 
   
   
 
  1.    private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {

  2.        if (msg instanceof ByteBuf) {

  3.            ByteBuf buf = (ByteBuf) msg;

  4.            if (!buf.isReadable()) {

  5.                in.remove();

  6.                return 0;

  7.            }

  8.            final int localFlushedAmount = doWriteBytes(buf);

  9.            if (localFlushedAmount > 0) {

  10.                in.progress(localFlushedAmount);

  11.                if (!buf.isReadable()) {

  12.                    in.remove();

  13.                }

  14.                return 1;

  15.            }

  16.        } else if (msg instanceof FileRegion) {

  17.            FileRegion region = (FileRegion) msg;

  18.            if (region.transferred() >= region.count()) {

  19.                in.remove();

  20.                return 0;

  21.            }

  22.            long localFlushedAmount = doWriteFileRegion(region);

  23.            if (localFlushedAmount > 0) {

  24.                in.progress(localFlushedAmount);

  25.                if (region.transferred() >= region.count()) {

  26.                    in.remove();

  27.                }

  28.                return 1;

  29.            }

  30.        } else {

  31.            // Should not reach here.

  32.            throw new Error();

  33.        }

  34.        return WRITE_STATUS_SNDBUF_FULL;

  35.    }

首先判断消息的类型是否是ByteBuf,如果是进行强制转换,判断ByteBuf是否可读,如果不可读,将该消息从消息循环数组中删除,继续循环处理其他消息。如果可读,则由具体的实现子类完成将ByeBuf写入到Channel中,并返回写入的字节数。如果返回的字节数小于等于0,则返回整形的最大数值。如果返回的写入字节数大于0,设置该消息的处理的进度,然后再判断该消息是否可读,如果不可读,就把该消息移除,返回1。

最后还有一块处理半包发送任务的代码incompleteWrite,源码如下:

 
   
   
 
  1.    protected final void incompleteWrite(boolean setOpWrite) {

  2.        // Did not write completely.

  3.        if (setOpWrite) {

  4.            setOpWrite();

  5.        } else {

  6.            // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then

  7.            // use our write quantum. In this case we no longer want to set the write OP because the socket is still

  8.            // writable (as far as we know). We will find out next time we attempt to write if the socket is writable

  9.            // and set the write OP if necessary.

  10.            clearOpWrite();

  11.            // Schedule flush again later so other tasks can be picked up in the meantime

  12.            eventLoop().execute(flushTask);

  13.        }

  14.    }

首先判断是否需要设置半包标识,如果需要则调用setOpWrite()来设置半包标识。如果没有设置写操作位,需要启动单独的Runnable flushTask,将其加入到EventLoop中执行,由Runnable负责半包消息的发送,它就是简单的调用flush方法来发送缓冲数组中的消息。

AbstractNioMessageChannel

成员变量定义

boolean inputShutdown

API

 
   
   
 
  1.    @Override

  2.    protected void doWrite(ChannelOutboundBuffer in) throws Exception {

  3.        final SelectionKey key = selectionKey();

  4.        final int interestOps = key.interestOps();

  5.        for (;;) {

  6.            Object msg = in.current();

  7.            if (msg == null) {

  8.                // Wrote all messages.

  9.                if ((interestOps & SelectionKey.OP_WRITE) != 0) {

  10.                    key.interestOps(interestOps & ~SelectionKey.OP_WRITE);

  11.                }

  12.                break;

  13.            }

  14.            try {

  15.                boolean done = false;

  16.                for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {

  17.                    if (doWriteMessage(msg, in)) {

  18.                        done = true;

  19.                        break;

  20.                    }

  21.                }

  22.                if (done) {

  23.                    in.remove();

  24.                } else {

  25.                    // Did not write all messages.

  26.                    if ((interestOps & SelectionKey.OP_WRITE) == 0) {

  27.                        key.interestOps(interestOps | SelectionKey.OP_WRITE);

  28.                    }

  29.                    break;

  30.                }

  31.            } catch (Exception e) {

  32.                if (continueOnWriteError()) {

  33.                    in.remove(e);

  34.                } else {

  35.                    throw e;

  36.                }

  37.            }

  38.        }

  39.    }

在循环体中对消息进行发送,首先从ChannelOutboundBuffer中弹出一条消息进行处理,如果消息为空,说明发送缓冲区为空,所有消息都被发送完成。此时清除写半包标识,退出循环。然后借用writeSpinCount对单条消息进行发送,调用doWriteMessage判断消息是否发送成功,如果成功,则将发送标识done设置为true,退出循环,否则继续执行循环,知道执行writeSpinCount次。发送完成后,判断发送结果,如果当前的消息被完全发送出去,则将该消息从缓冲数组中删除;否则设置半包标识,注册SelectionKey.OP_WRITE到多路复用器上,由多路复用器轮询对应的Channel重新发送尚未发送完全的半包消息。

AbstractNioMessageChannel和AbstractNioByteChannel不同之处是前者发送的是POJO对象,后者发送的是ByteBuf或者FileRegion。

NioServerSocketChannel

成员变量和静态方法

  1. ChannelMetadata METADATA:Channel元数据信息

  2. ServerSocketChannelConfig config:用于配置ServerSocketChannel的TCP参数

  3. ServerSocketChannel newSocket(SelectorProvider provider):借助SelectorProvider的openServerSocketChannel方法打开新的ServerSocketChannel

API

 
   
   
 
  1.    @Override

  2.    public boolean isActive() {

  3.        return javaChannel().socket().isBound();

  4.    }

  5.    @Override

  6.    public InetSocketAddress remoteAddress() {

  7.        return null;

  8.    }

  9.    @Override

  10.    protected ServerSocketChannel javaChannel() {

  11.        return (ServerSocketChannel) super.javaChannel();

  12.    }

  13.    @Override

  14.    protected SocketAddress localAddress0() {

  15.        return SocketUtils.localSocketAddress(javaChannel().socket());

  16.    }

  17.    @Override

  18.    protected void doBind(SocketAddress localAddress) throws Exception {

  19.        if (PlatformDependent.javaVersion() >= 7) {

  20.            javaChannel().bind(localAddress, config.getBacklog());

  21.        } else {

  22.            javaChannel().socket().bind(localAddress, config.getBacklog());

  23.        }

  24.    }

doBind方法一来运行时JAVA的版本,如果大于7就调用ServerSocketChannel的bind方法,否则调用ServerSocket的bind方法。

 
   
   
 
  1.    @Override

  2.    protected int doReadMessages(List<Object> buf) throws Exception {

  3.        SocketChannel ch = SocketUtils.accept(javaChannel());

  4.        try {

  5.            if (ch != null) {

  6.                buf.add(new NioSocketChannel(this, ch));

  7.                return 1;

  8.            }

  9.        } catch (Throwable t) {

  10.            logger.warn("Failed to create a new channel from an accepted socket.", t);

  11.            try {

  12.                ch.close();

  13.            } catch (Throwable t2) {

  14.                logger.warn("Failed to close a socket.", t2);

  15.            }

  16.        }

  17.        return 0;

  18.    }

首先通过SocketUtils.accept来接受新的连接,如果新的连接不为空,则借助ServerSocketChannel和新接受的SocketChannel来创建一个NioSocketChannel, 并将NioSocketChannel添加到List buf,然后返回1,表示服务端接受消息成功。


对于NioServerSocketChannel来说,它的读取操作就是接收客户端的连接,创建NioSocketChannel对象。

无关API

一些方法是与客户端Channel相关的,因此,对于服务端Channel无须实现,如果这些方法被误调,则返回UnsupportedOperationException异常。

 
   
   
 
  1.    @Override

  2.    protected boolean doConnect(

  3.            SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {

  4.        throw new UnsupportedOperationException();

  5.    }

  6.    @Override

  7.    protected void doFinishConnect() throws Exception {

  8.        throw new UnsupportedOperationException();

  9.    }

  10.    @Override

  11.    protected SocketAddress remoteAddress0() {

  12.        return null;

  13.    }

  14.    @Override

  15.    protected void doDisconnect() throws Exception {

  16.        throw new UnsupportedOperationException();

  17.    }

  18.    @Override

  19.    protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {

  20.        throw new UnsupportedOperationException();

  21.    }

NioSocketChannel

连接操作

 
   
   
 
  1.    @Override

  2.    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {

  3.        if (localAddress != null) {

  4.            doBind0(localAddress);

  5.        }

  6.        boolean success = false;

  7.        try {

  8.            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);

  9.            if (!connected) {

  10.                selectionKey().interestOps(SelectionKey.OP_CONNECT);

  11.            }

  12.            success = true;

  13.            return connected;

  14.        } finally {

  15.            if (!success) {

  16.                doClose();

  17.            }

  18.        }

  19.    }

  1. 连接成功,返回true

  2. 暂时没有连接上,服务端没有返回ACK应答,连接结果不确定,返回false

  3. 连接失败,直接抛出I/O异常

如果是结果2,需要将NioSocketChannel的SelectionKey设置为OP_CONNECT,监听连接网络操作位。如果抛出了I/O异常,说明客户端的TCP握手请求直接 被RESET或者被拒绝,此时需要调用doClose()关闭客户端连接

 
   
   
 
  1.    protected void doClose() throws Exception {

  2.        super.doClose();

  3.        javaChannel().close();

  4.    }

写半包

 
   
   
 
  1.    @Override

  2.    protected void doWrite(ChannelOutboundBuffer in) throws Exception {

  3.        SocketChannel ch = javaChannel();

  4.        int writeSpinCount = config().getWriteSpinCount();

  5.        do {

  6.            if (in.isEmpty()) {

  7.                // All written so clear OP_WRITE

  8.                clearOpWrite();

  9.                // Directly return here so incompleteWrite(...) is not called.

  10.                return;

  11.            }

  12.            // Ensure the pending writes are made of ByteBufs only.

  13.            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();

  14.            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);

  15.            int nioBufferCnt = in.nioBufferCount();

  16.            // Always us nioBuffers() to workaround data-corruption.

  17.            // See https://github.com/netty/netty/issues/2761

  18.            switch (nioBufferCnt) {

  19.                case 0:

  20.                    // We have something else beside ByteBuffers to write so fallback to normal writes.

  21.                    writeSpinCount -= doWrite0(in);

  22.                    break;

  23.                case 1: {

  24.                    // Only one ByteBuf so use non-gathering write

  25.                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need

  26.                    // to check if the total size of all the buffers is non-zero.

  27.                    ByteBuffer buffer = nioBuffers[0];

  28.                    int attemptedBytes = buffer.remaining();

  29.                    final int localWrittenBytes = ch.write(buffer);

  30.                    if (localWrittenBytes <= 0) {

  31.                        incompleteWrite(true);

  32.                        return;

  33.                    }

  34.                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);

  35.                    in.removeBytes(localWrittenBytes);

  36.                    --writeSpinCount;

  37.                    break;

  38.                }

  39.                default: {

  40.                    // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need

  41.                    // to check if the total size of all the buffers is non-zero.

  42.                    // We limit the max amount to int above so cast is safe

  43.                    long attemptedBytes = in.nioBufferSize();

  44.                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);

  45.                    if (localWrittenBytes <= 0) {

  46.                        incompleteWrite(true);

  47.                        return;

  48.                    }

  49.                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.

  50.                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,

  51.                            maxBytesPerGatheringWrite);

  52.                    in.removeBytes(localWrittenBytes);

  53.                    --writeSpinCount;

  54.                    break;

  55.                }

  56.            }

  57.        } while (writeSpinCount > 0);

  58.        incompleteWrite(writeSpinCount < 0);

  59.    }

首先判断ChannelOutboundBuffer消息环形数组中是否有待发送的消息,如果没有,直接清除写操作位然后返回。从消息环形数组中获取可发送的ByteBuffer 数组以及可发送的数量,如果消息只有一个,直接取第一个消息,将消息写入Channel,如果写入的字节数小于等于0,设置网络监听位为写操作位,然后后返回。 如果消息的数量大于大于1,就先取出可发送数组的总字节数。

读写操作

 
   
   
 
  1.    @Override

  2.    protected int doReadBytes(ByteBuf byteBuf) throws Exception {

  3.        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();

  4.        allocHandle.attemptedBytesRead(byteBuf.writableBytes());

  5.        return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());

  6.    }

首先通过RecvByteBufAllocator.Handle设置从NioSocketChannel读取的字节数为ByteBuf可写的字节数,然后调用ByteBuf的writeBytes从Channel 中读取指定长度的字节。

以上是关于Netty Channel的主要内容,如果未能解决你的问题,请参考以下文章

Netty入门——组件(Channel)一

Netty入门——组件(Channel)二

netty源码走读(服务端Channel创建流程)

netty代码解析

netty优雅关闭channel通道

netty集群channel怎么跨集群