Netty Source Code Analysis: The Bootstrap Client Connection Process

Posted by 征服.刘华强


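The following is a typical client connection template:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.LengthFieldPrepender;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;

    // The class name is chosen for illustration; SendMsgHandler is the author's own handler (not shown).
    public class NettyClient {
        public static void main(String[] args) throws Exception {
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap(); // (1)
                b.group(workerGroup); // (2)
                b.channel(NioSocketChannel.class); // (3)
                b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
                b.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast("fieldPrepender", new LengthFieldPrepender(2));
                        ch.pipeline().addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
                        ch.pipeline().addLast("SendMsgHandler", new SendMsgHandler());
                    }
                });
                ChannelFuture f = b.connect("192.168.80.110", 8080).sync();
                f.channel().closeFuture().sync();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }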

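The template calls the connect method. The connect(String, int) overload wraps the host and port in an unresolved InetSocketAddress and delegates to connect(SocketAddress):

    // Client connection logic
    public ChannelFuture connect(SocketAddress remoteAddress) {
        ObjectUtil.checkNotNull(remoteAddress, "remoteAddress");
        validate();
        return doResolveAndConnect(remoteAddress, config.localAddress());
    }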

 

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // Initialization and registration:
        // 1. create the NioSocketChannel  2. register it with an EventLoop
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            // Connect to the server
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
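The branch on regFuture.isDone() follows a general pattern: continue immediately when the previous asynchronous step has already finished, otherwise hand out a promise and continue from a listener. The sketch below illustrates that pattern with the JDK's CompletableFuture rather than Netty's ChannelFuture; the class RegistrationSketch, the method connectAfter, and the connectStep parameter are hypothetical names introduced only for this illustration.

```java
import java.util.concurrent.CompletableFuture;

class RegistrationSketch {

    // Hypothetical stand-in for doResolveAndConnect: run the connect step right away
    // if registration has finished, otherwise defer it to a completion callback.
    static CompletableFuture<Void> connectAfter(CompletableFuture<Void> regFuture, Runnable connectStep) {
        if (regFuture.isDone()) {
            if (regFuture.isCompletedExceptionally()) {
                // Registration failed: return the failed future and never connect.
                return regFuture;
            }
            connectStep.run();
            return CompletableFuture.completedFuture(null);
        }
        // Registration is still pending: return a promise now and complete it later.
        CompletableFuture<Void> promise = new CompletableFuture<>();
        regFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause);
            } else {
                connectStep.run();
                promise.complete(null);
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<Void> promise = connectAfter(pending, () -> System.out.println("connecting"));
        System.out.println("done before registration: " + promise.isDone());
        pending.complete(null); // registration finishes, the callback runs the connect step
        System.out.println("done after registration: " + promise.isDone());
    }
}
```

Netty's PendingRegistrationPromise plays the role of the promise here; it additionally switches to the channel's own executor once registration succeeds, which this sketch does not model.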
The initAndRegister method creates the channel and registers it with the selector:

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // Create the NioSocketChannel via reflection
            channel = channelFactory.newChannel();
            // Initialize it
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // Register the underlying JDK channel with the Selector
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

The core of the connection logic is in the doConnect method, which doResolveAndConnect0 calls once the remote address has been resolved:

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            AddressResolver<SocketAddress> resolver;
            try {
                // Address resolver
                resolver = this.resolver.getResolver(eventLoop);
            } catch (Throwable cause) {
                channel.close();
                return promise.setFailure(cause);
            }

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                // The IP address is already known here, so connect directly
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();

                if (resolveFailureCause != null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }

            // Wait until the name resolution is finished.
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }
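Whether doResolveAndConnect0 connects directly or resolves first depends on whether the address already carries an IP. A minimal JDK-only sketch of that distinction follows. It assumes the remote address is an InetSocketAddress and that "resolved" corresponds to isUnresolved() returning false; the class ResolveSketch and the method canConnectDirectly are hypothetical names, not part of Netty.

```java
import java.net.InetSocketAddress;

class ResolveSketch {

    // Mirrors the idea of the isResolved(...) check: true when the address already
    // carries an IP and can be handed to the connect step without a name lookup.
    static boolean canConnectDirectly(InetSocketAddress address) {
        return !address.isUnresolved();
    }

    public static void main(String[] args) {
        // createUnresolved never performs a lookup; only the host string and port are stored.
        InetSocketAddress unresolved = InetSocketAddress.createUnresolved("192.168.80.110", 8080);
        // This constructor resolves the host; an IP literal is parsed without a DNS query.
        InetSocketAddress resolved = new InetSocketAddress("192.168.80.110", 8080);

        System.out.println("unresolved can connect directly: " + canConnectDirectly(unresolved));
        System.out.println("resolved can connect directly: " + canConnectDirectly(resolved));
    }
}
```

Because connect(String, int) builds its address with createUnresolved, a connect by host and port normally goes through the resolver before doConnect is reached.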

 

    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    // The connect request is handed to the pipeline and travels from tail to head;
                    // the head of the pipeline then calls Unsafe.connect
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }

The channel.connect call is passed through the pipeline handler by handler and finally reaches Unsafe:

        @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            try {
                if (connectPromise != null) {
                    // Already a connect in process.
                    throw new ConnectionPendingException();
                }

                boolean wasActive = isActive();
                // Start connecting to the server
                if (doConnect(remoteAddress, localAddress)) {
                    // Connected immediately: fire the corresponding events
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    // Not connected immediately: continue asynchronously
                    connectPromise = promise;
                    requestedRemoteAddress = remoteAddress;

                    // Schedule connect timeout.
                    int connectTimeoutMillis = config().getConnectTimeoutMillis();
                    if (connectTimeoutMillis > 0) {
                        connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                // This task only runs if the connect timed out, so fail the promise
                                ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                ConnectTimeoutException cause =
                                        new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                    close(voidPromise());
                                }
                            }
                        // Fires once the timeout has elapsed
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                    }

                    // Add a callback to the promise
                    promise.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isCancelled()) {
                                // Cancel the timeout task
                                if (connectTimeoutFuture != null) {
                                    connectTimeoutFuture.cancel(false);
                                }
                                connectPromise = null;
                                close(voidPromise());
                            }
                        }
                    });
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }
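The timeout handling in connect boils down to scheduling a task that fails the pending promise, and cancelling that task when the promise completes some other way. The following sketch shows the idea with a JDK ScheduledExecutorService and CompletableFuture standing in for the event loop and the ChannelPromise. ConnectTimeoutSketch and armTimeout are hypothetical names, and cancelling the task on any completion is a simplification of what Netty does.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ConnectTimeoutSketch {

    // Arms a timeout for a pending connect promise. If the promise is still open when the
    // delay elapses, it is failed; if it completes first, the timeout task is cancelled.
    static ScheduledFuture<?> armTimeout(ScheduledExecutorService loop,
                                         CompletableFuture<Void> connectPromise,
                                         long timeoutMillis) {
        ScheduledFuture<?> timeoutTask = loop.schedule(() -> {
            // Has no effect if the promise was already completed.
            connectPromise.completeExceptionally(new TimeoutException("connection timed out"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        connectPromise.whenComplete((ignored, cause) -> timeoutTask.cancel(false));
        return timeoutTask;
    }

    public static void main(String[] args) {
        ScheduledExecutorService loop = Executors.newSingleThreadScheduledExecutor();
        try {
            // A connect that finishes in time cancels its timeout task.
            CompletableFuture<Void> fast = new CompletableFuture<>();
            ScheduledFuture<?> task = armTimeout(loop, fast, 10_000);
            fast.complete(null);
            System.out.println("timeout task cancelled: " + task.isCancelled());

            // A connect that never finishes is failed by the timeout task.
            CompletableFuture<Void> slow = new CompletableFuture<>();
            armTimeout(loop, slow, 50);
            try {
                slow.join();
            } catch (CompletionException e) {
                System.out.println("failed with: " + e.getCause());
            }
        } finally {
            loop.shutdownNow();
        }
    }
}
```

In the Netty code above the same two pieces appear as connectTimeoutFuture and connectPromise; the timeout task is cancelled either by the cancellation listener or later in finishConnect.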

 

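doConnect first tries to connect to the server directly; if the connect call returns false, the connection continues asynchronously:

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        // If a local address was specified, bind the socket to it first
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            // Connect to the server
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                // Not connected yet: switch to an asynchronous connect by registering interest in OP_CONNECT
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            // Close the socket if an exception was thrown
            if (!success) {
                doClose();
            }
        }
    }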

With OP_CONNECT registered on the selector, the channel waits for the operating system to report that the connection can be completed. The event loop then handles the ready key:

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            // The underlying event is the client socket's connect to the server
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
                // Complete the connect through unsafe
                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
 

The connect-completion logic in Unsafe:

        @Override
        public final void finishConnect() {
            // Note this method is invoked by the event loop only if the connection attempt was
            // neither cancelled nor timed out.

            assert eventLoop().inEventLoop();

            try {
                // Finish the connect on the underlying socket
                boolean wasActive = isActive();
                doFinishConnect();
                // Fire the corresponding events
                fulfillConnectPromise(connectPromise, wasActive);
            } catch (Throwable t) {
                // Report the failure
                fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
            } finally {
                // Cancel the connect timeout task
                // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used
                // See https://github.com/netty/netty/issues/1770
                if (connectTimeoutFuture != null) {
                    connectTimeoutFuture.cancel(false);
                }
                connectPromise = null;
            }
        }

 

    @Override
    protected void doFinishConnect() throws Exception {
        // Called after the connectable event has been received; completes the client's connection to the server
        if (!javaChannel().finishConnect()) {
            throw new Error();
        }
    }

 
