4. Netty源码分析之Unsafe

Posted lovezmc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4. Netty源码分析之Unsafe相关的知识,希望对你有一定的参考价值。

Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的。

一、Unsafe继承关系图

技术图片

二、AbstractUnsafe源码分析

1. register方法

  register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法。

public final void register(final ChannelPromise promise) 
    // 当前线程是否为Channel对应的NioEventLoop线程
    if (eventLoop.inEventLoop()) 
        // 如果是,则不存在多线程并发操作,直接注册
        register0(promise);
     else 
        // 如果不是,说明是其他线程或用户线程发起的注册,存在并发操作,将其放进NioEventLoop任务队列中执行
        try 
            eventLoop.execute(new Runnable() 
                @Override
                public void run() 
                    register0(promise);
                
            );
         catch (Throwable t) 
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: ",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            promise.setFailure(t);
        
    


private void register0(ChannelPromise promise) 
    try 
        // 判断Channel是否打开了
        if (!ensureOpen(promise)) 
            return;
        
        // 调用AbstractNioChannel的doRegister方法。请见 Netty源码分析-Channel
        doRegister();
        registered = true;
        promise.setSuccess();
        // 注册成功
        pipeline.fireChannelRegistered();
        if (isActive()) 
            // Channel被激活
            pipeline.fireChannelActive();
        
     catch (Throwable t) 
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        if (!promise.tryFailure(t)) 
            logger.warn(
                    "Tried to fail the registration promise, but it is complete already. " +
                            "Swallowing the cause of the registration failure:", t);
        
    


// AbstractNioChannel.doRegister()
protected void doRegister() throws Exception 
    boolean selected = false;
    for (;;) 
        try 
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
         catch (CancelledKeyException e) 
            if (!selected) 
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
             else 
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            
        
    

2. bind方法

  bind方法主要用于绑定指定端口。对于服务端,用于绑定监听端口,并设置backlog参数;对于客户端,用于指定客户端Channel的本地绑定Socket地址。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) 
    if (!ensureOpen(promise)) 
        return;
    

    // See: https://github.com/netty/netty/issues/576
    if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&
        Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) 
        // Warn a user about the fact that a non-root user can‘t receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can‘t receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    
    
    // 是否是激活状态
    boolean wasActive = isActive();
    try 
        doBind(localAddress);
     catch (Throwable t) 
        promise.setFailure(t);
        closeIfClosed();
        return;
    
    if (!wasActive && isActive()) 
        // 如果是在绑定阶段成为active状态,则将调用fireChannelActive方法放进NioEventLoop执行队列中
        invokeLater(new Runnable() 
            @Override
            public void run() 
                pipeline.fireChannelActive();
            
        );
    
    promise.setSuccess();


private void invokeLater(Runnable task) 
    eventLoop().execute(task);

NiosocketChannel 的 diBind 实现:

protected void doBind(SocketAddress localAddress) throws Exception 
    javaChannel().socket().bind(localAddress);

NioServerSocketChannel 的 doBind 实现:

protected void doBind(SocketAddress localAddress) throws Exception 
    javaChannel().socket().bind(localAddress, config.getBacklog());

3. disconnect方法

  该方法用于客户端或服务端主动关闭连接。

public final void disconnect(final ChannelPromise promise) 
    boolean wasActive = isActive();
    try 
        doDisconnect();
     catch (Throwable t) 
        promise.setFailure(t);
        closeIfClosed();
        return;
    
    if (wasActive && !isActive()) 
        invokeLater(new Runnable() 
            @Override
            public void run() 
                pipeline.fireChannelInactive();
            
        );
    
    promise.setSuccess();
    closeIfClosed(); // doDisconnect() might have closed the channel

NioServerSocketChannel.doDisconnect():服务端不支持主动关闭连接

protected void doDisconnect() throws Exception 
    throw new UnsupportedOperationException();

NioSocketChannel.doDisconnect():调用SocketChannel关闭连接

protected void doDisconnect() throws Exception 
    doClose();


protected void doClose() throws Exception 
    javaChannel().close();

4. close方法

public final void close(final ChannelPromise promise) 
    // 1. 是否处于刷新状态,如果处于刷新状态说明还有消息没发出去,需要等到所有消息发完后再关闭
    // 放入队列中处理
    if (inFlush0) 
        invokeLater(new Runnable() 
            @Override
            public void run() 
                close(promise);
            
        );
        return;
    

    // 2. 判断关闭操作是否完成,如果已完成,则不需要重复关闭链路,设置promise成功即可
    if (closeFuture.isDone()) 
        // Closed already.
        promise.setSuccess();
        return;
    

    // 3. 执行关闭操作,将消息发送缓冲数组置空,通知JVM回收
    boolean wasActive = isActive();
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.

    try 
        // 4. 关闭链路,本质是调用javaChannel的close方法
        doClose();
        closeFuture.setClosed();
        promise.setSuccess();
     catch (Throwable t) 
        closeFuture.setClosed();
        promise.setFailure(t);
    

    // 5. 调用ChannelOutboundBuffer.close()释放缓冲区消息,将链路关闭通知事件放进NioEventLoop执行队列中
    try 
        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
        outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
     finally 

        if (wasActive && !isActive()) 
            invokeLater(new Runnable() 
                @Override
                public void run() 
                    pipeline.fireChannelInactive();
                
            );
        
        // 6. 将Channel从多路复用器上取消注册
        deregister();
    


protected void doDeregister() throws Exception 
    eventLoop().cancel(selectionKey());


// 实际上就是将SelectionKey对应的Channel从多路复用器上去取消注册
void cancel(SelectionKey key) 
    key.cancel();
    cancelledKeys ++;
    if (cancelledKeys >= CLEANUP_INTERVAL) 
        cancelledKeys = 0;
        needsToSelectAgain = true;
    

5. write方法

  write方法实际上是将消息添加到环形发送数组上,并不真正的写Channel(真正的写Channel是flush方法)。

public void write(Object msg, ChannelPromise promise) 
    if (!isActive()) 
        // 未激活,TCP链路还没建立成功,根据Channel打开情况设置不同的异常
        if (isOpen()) 
            promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);
         else 
            promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);
        
        // 无法发送,释放msg对象
        ReferenceCountUtil.release(msg);
     else 
        // 链路状态正常,将数据和promise放进发送缓冲区
        outboundBuffer.addMessage(msg, promise);
    

6. flush方法

  前面提到,write方法负责将消息放进发送缓冲区,并没有真正的发送,而flush方法就负责将发送缓冲区中待发送的消息全部写进Channel中并发送。

public void flush() 
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) 
        return;
    
    // 先将unflush指针修改为tail,标识本次发送的范围
    outboundBuffer.addFlush();
    flush0();


protected void flush0() 
    if (inFlush0) 
        // Avoid re-entrance
        return;
    

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) 
        return;
    

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) 
        try 
            if (isOpen()) 
                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
             else 
                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
            
         finally 
            inFlush0 = false;
        
        return;
    

    try 
        // 调用NioSocketChannel的write方法
        doWrite(outboundBuffer);
     catch (Throwable t) 
        outboundBuffer.failFlushed(t);
     finally 
        inFlush0 = false;
    

三、AbstractNioUnsafe源码分析

1. connect方法

  前面说到,NioSocketChannel的连接操作有三种可能:

    1. 连接成功

    2. 连接失败,关闭客户端连接

    3. 连接暂未响应,监听OP_CONNECT

  在connect方法中,如果连接成功,进行激活操作;如果连接暂未响应,则对其做一个监听,监听的内容是:如果连接失败,则关闭链路。

public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) 
    // 设置不可取消 && Channel是打开状态
    if (!promise.setUncancellable() || !ensureOpen(promise)) 
        return;
    

    try 
        if (connectPromise != null) 
            // 已经有一个连接正在处理,直接抛异常
            throw new ConnectionPendingException();
        

        boolean wasActive = isActive();
        
        // doConnect方法具体看NioSocketChannel.doConnect()实现
        if (doConnect(remoteAddress, localAddress)) 
            // 连接成功,进行连接后操作
            fulfillConnectPromise(promise, wasActive);
         else 
            // 连接失败,TCP无应答,结果暂未知晓
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) 
                connectTimeoutFuture = eventLoop().schedule(new Runnable() 
                    @Override
                    public void run() 
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) 
                            close(voidPromise());
                        
                    
                , connectTimeoutMillis, TimeUnit.MILLISECONDS);
            

            promise.addListener(new ChannelFutureListener() 
                @Override
                public void operationComplete(ChannelFuture future) throws Exception 
                    if (future.isCancelled()) 
                        if (connectTimeoutFuture != null) 
                            connectTimeoutFuture.cancel(false);
                        
                        connectPromise = null;
                        close(voidPromise());
                    
                
            );
        
     catch (Throwable t) 
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    


private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) 
    if (promise == null) 
        // Closed via cancellation and the promise has been notified already.
        return;
    

    // 判断当前激活状态
    boolean active = isActive();

    // 如果用户取消了连接,则返回false,需调用close方法关闭链路
    boolean promiseSet = promise.trySuccess();

    // 如果doConnect之前未激活,doConnect之后激活了,需要调用fireChannelActive(即使被取消了也应该调)
    if (!wasActive && active) 
        pipeline().fireChannelActive();
    

    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
    if (!promiseSet) 
        close(voidPromise());
    

2. finishConnect方法

  该方法用于判断连接操作是否结束。

  首先判断当前线程是否就是EventLoop执行线程,不允许其他线程操作;

  缓存当前active状态,用以下面是否要执行fireChannelActive方法;

  调用javaChannel的finishConnect方法,该方法返回三种情况:

    1)连接成功,返回true

    2)连接失败,返回false

    3)发生链路被关闭、链路中断异常,连接失败

  根据javaChannel的返回值,如果返回false,直接抛出error,进入到catch模块

  然后就根据连接状态做不同的后续处理

public final void finishConnect() 
    // Note this method is invoked by the event loop only if the connection attempt was
    // neither cancelled nor timed out.

    assert eventLoop().inEventLoop();

    try 
        boolean wasActive = isActive();
        // 通过javaChannel的finishConnect方法判断连接结果(如果连接失败则抛出Error,会走到catch块里)
        doFinishConnect();
        // 连接成功方法:fulfillConnectPromise(ChannelPromise promise, boolean wasActive)
        fulfillConnectPromise(connectPromise, wasActive);
     catch (Throwable t) 
        // 关闭链路方法:fulfillConnectPromise(ChannelPromise promise, Throwable cause)
        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
     finally 
        // 如果连接超时时仍然没有收到服务端应答,则由定时任务关闭客户端连接,将SocketChannel从多路复用器上删除
        if (connectTimeoutFuture != null) 
            connectTimeoutFuture.cancel(false);
        
        connectPromise = null;
    

四、NioByteUnsafe源码分析

  这里我们主要分析下它的 read方法。

public final void read() 
            final ChannelConfig config = config();
            if (shouldBreakReadReady(config)) 
                clearReadPending();
                return;
            
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
...

public RecvByteBufAllocator.Handle recvBufAllocHandle()
  if (recvHandle == null)
    recvHandle = config().getRecvByteBufAllocator().newHandle();
  
  return recvHandle;

首先,获取NioSocketChannel的SocketChannelConfig,用于设置客户端连接的TCP参数。

继续看allocHandle的初始化,则从SocketChannelConfig的RecvByteBufAllocator中创建一个新的handle。

RecvByteBufAllocator有两个实现,分别是FixedRecvByteBufAllocator 和 AdaptiveRecvByteBufAllocator。FixedRecvByteBufAllocator 比较简单,我们主要分析下AdaptiveRecvByteBufAllocator。

根据名称就可以判断,AdaptiveRecvByteBufAllocator是根据本地读取的字节数动态调整下次接收缓冲区容量

我们先看下AdaptiveRecvByteBufAllocator的 成员变量:

static final int DEFAULT_MINIMUM = 64;//最小缓冲区长度
static final int DEFAULT_INITIAL = 1024;//初始容量
static final int DEFAULT_MAXIMUM = 65536;//最大容量

private static final int INDEX_INCREMENT = 4;//动态调整扩张步进索引
private static final int INDEX_DECREMENT = 1;//动态调整收缩步进索引

private static final int[] SIZE_TABLE;//长度向量表,数组的每个值对应一个Buffer容量

// 初始化长度向量表
// 当容量小于512时,由于缓冲区已经比较小,需要降低步进值,容量每次下调幅度降低
// 当容量大于512时,说明需要解码的消息码流比较大,需要采用调大步进幅度的方式降低动态扩张频率
static 
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) 
        sizeTable.add(i);
    

    for (int i = 512; i > 0; i <<= 1) 
        sizeTable.add(i);
    

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) 
        SIZE_TABLE[i] = sizeTable.get(i);
    

然后再来看一下AdaptiveRecvByteBufAllocator.getSizeTableIndex(..)方法:根据容量size查找容器向量表对应的索引

private static int getSizeTableIndex(final int size) 
    for (int low = 0, high = SIZE_TABLE.length - 1;;) 
        if (high < low) 
            return low;
        
        if (high == low) 
            return high;
        

        int mid = low + high >>> 1;
        int a = SIZE_TABLE[mid];
        int b = SIZE_TABLE[mid + 1];
        if (size > b) 
            low = mid + 1;
         else if (size < a) 
            high = mid - 1;
         else if (size == a) 
            return mid;
         else 
            return mid + 1;
        
    

然后我们再来看一下AdaptiveRecvByteBufAllocator的静态内部类HandlerImpl,该类有五个成员变量:

private final int minIndex;            //最小索引
private final int maxIndex;            //最大索引
private int index;                    //当前索引
private int nextReceiveBufferSize;    //下一次预分配的Buffer大小
private boolean decreaseNow;        //是否立即执行容量收缩操作

该类有一个比较重要的方法,record(int actualReadBytes),当NioSocketChannel执行完读操作后,会计算获得本轮轮询读取的总字节数,也就是record方法的入参actualReadBytes,该方法根据读取的字节数对ByteBuf进行动态伸缩和扩张。record操作步骤如下

  1)将当前容量缩减后的值与实际读取的值做比较,如果实际读取的值小于收缩后的容量,则将缓冲区容量降低

  2)如果实际读取的值大于当前Buffer容量,说明实际分配容量不足,需要动态扩张

private void record(int actualReadBytes) 
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) 
        if (decreaseNow) 
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
         else 
            decreaseNow = true;
        
     else if (actualReadBytes >= nextReceiveBufferSize) 
        index = min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    

AdaptiveRecvByteBufAllocator优点总结:

1. 性能更高。容量过大会导致内存占用开销增加,后续的Buffer处理性能会下降;容量过小时需要频繁的内存扩张来接收大的请求消息,同样会导致性能下降

2. 更节约内存。根据不同的场景动态的扩张或缩减内存,达到内存使用最优化。

 

然后我们接着来分析 read方法,这里循环读取缓冲区数据,并根据上次读取字节数动态调整ByteBuffer大小。每次读取都要触发一次read事件 fireChannelRead,注意,这里并不是说一次read就读完了全部消息,可能存在粘包拆包情况。

当上次读取了0个字节,说明已经读完了,跳出循环,触发读操作完成事件 fireChannelReadComplete。

public final void read() 
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) 
            clearReadPending();
            return;
        
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try 
            do 
                // 通过接收缓冲区分配器计算下次预分配的缓冲区容量并创建ByteBuffer
                byteBuf = allocHandle.allocate(allocator);
                // 这里分两步:1. doReadBytes(byteBuf):调用NioSocketChannel.doReadBytes(..),返回本次读取的字节数(返回0-无消息   返回小于0-发生了IO异常)
                // 2. 设置lastBytesRead,用以下面的处理
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) 
                    // 走到这里说明上一步没有读取到数据,释放ByteBuffer
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) 
                        // 发生了IO异常,需关闭连接
                        readPending = false;
                    
                    break;
                

                allocHandle.incMessagesRead(1);
                readPending = false;
                // 一次读操作,触发一次read事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
             while (allocHandle.continueReading());
            
            allocHandle.readComplete();
            // 触发读操作结束事件
            pipeline.fireChannelReadComplete();

            if (close) 
                closeOnRead(pipeline);
            
         catch (Throwable t) 
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
         finally 
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) 
                removeReadOp();
            
        
    




protected int doReadBytes(ByteBuf byteBuf) throws Exception 
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());


public int writeBytes(ScatteringByteChannel in, int length) throws IOException 
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) 
        writerIndex += writtenBytes;
    
    return writtenBytes;

 

以上是关于4. Netty源码分析之Unsafe的主要内容,如果未能解决你的问题,请参考以下文章

[编织消息框架][netty源码分析]7 Unsafe 实现类NioSocketChannelUnsafe职责与实现

Netty源码分析-AbstractUnsafe(register注册流程)

Netty源码分析-NioByteUnsafe(read读取流程)

死磕 java并发包之AtomicInteger源码分析

Netty4.xNetty源码分析之LineBasedFrameDecoder

2. Netty源码分析之使用篇