Day477&478.Netty核心源码 -netty

Posted 阿昌喜欢吃黄桃

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day477&478.Netty核心源码 -netty相关的知识,希望对你有一定的参考价值。

Netty核心源码

一、Netty 启动过程源码剖析

NioEventLoop 类 的run() 代码 ,无限循环,在服务器端运行

1、源码的基本理解

//服务器启动类源码
/**
* Echoes back any received data from a client.
*/
public final class EchoServer 
    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    public static void main(String[] args) throws Exception 
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) 
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

         else 
            sslCtx = null;
        
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try 
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioserverSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() 
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception 
                        ChannelPipeline p = ch.pipeline();
                        if (sslCtx != null) 
                            p.addLast(sslCtx.newHandler(ch.alloc()));
                        
                        //p.addLast(new LoggingHandler(LogLevel.INFO));
                        p.addLast(new EchoServerHandler());
                    
                );
            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();
            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
         finally 
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        
    

说明

  1. 先看启动类:main 方法中,首先创建了关于 SSL 的配置类。

  2. 重点分析下 创建了两个 EventLoopGroup 对象:

  1. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  2. EventLoopGroup workerGroup = new NioEventLoopGroup();

.
(1) 这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作。

(2) EventLoopGroup 是 事件循环组(线程组) 含有多个 EventLoop,可以注册 channel ,用于在事件循环中去进行选择(和选择器相关)

(3) new NioEventLoopGroup(1); 这个 1 表示 bossGroup 事件组有 1 个线程你可以指定,如果 new NioEventLoopGroup() 会含有默认个线程 cpu 核数*2, 即可以充分的利用多核的优势

DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2));

会创建 EventExecutor 数组 children = new EventExecutor[nThreads]; //debug 一下
每个元素的类型就是 NIOEventLoop, NIOEventLoop 实现了 EventLoop 接口 和 Executor 接口
try 块中创建了一个 ServerBootstrap 对象,他是一个引导类,用于启动服务器和引导整个程序的初始化( 看下源码 allows easy bootstrap of @link ServerChannel )。和 它和 ServerChannel , 关联, 而 而 ServerChannel 继承了Channel法 ,有一些方法 remoteAddress 等

随后,变量 b 调用了 group 方法将两个 group 放入了自己的字段中,用于后期引导使用【debug 下 group 方法

/**
* Set the @link EventLoopGroup for the parent (acceptor) and the child (client). These
* @link EventLoopGroup's are used to handle all the events and IO for @link ServerChannel and
* @link Channel's.
*/

(4) 然后添加了一个 channel,其中参数一个 Class 对象,引导类将通过这个 Class 对象反射创建
ChannelFactory。然后添加了一些 TCP 的参数。[说明:Channel 的创建在 bind 方法,可以 Debug 下 bind ,会找到 channel = channelFactory.newChannel(); ]

(5) 再添加了一个服务器专属的日志处理器 handler。

(6) 再添加一个 SocketChannel(不是 ServerSocketChannel)的 handler。

(7) 然后绑定端口并阻塞至连接成功。

(8) 最后 main 线程阻塞等待关闭。

(9) finally 块中的代码将在服务器关闭时优雅关闭所有资源

//服务器端处理器源码
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) 
        ctx.write(msg);
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) 
        ctx.flush();
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    

说明

这是一个普通的处理器类,用于处理客户端发送来的消息,在我们这里,我们简单的解析出客户端传过来的内容,然后打印,最后发送字符串给客户端。


2、EventLoopGroup 的过程

  • 构造器方法
public NioEventLoopGroup(int nThreads) 
    this(nThreads, (Executor) null);

  • 上面的 this(nThreads, (Executor) null); 调用构造器 ( 通过 alt+d 看即可)
public NioEventLoopGroup(int nThreads, Executor executor) 
    this(nThreads, executor, SelectorProvider.provider());

  • 上面的 this(nThreads, executor, SelectorProvider.provider()); 调用下面构造器
public NioEventLoopGroup(
    int nThreads, Executor executor, final SelectorProvider selectorProvider) 
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);

  • 上面的 this ()... 调用构造器(alt+d)
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) 
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());

  • 上面的 super() .. 的方法是父类: MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) 
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);

  • 追踪到源码抽象类 MultithreadEventExecutorGroup 的构造器方法 MultithreadEventExecutorGroup 才是
    NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了设计模式的模板模式,

3、MultithreadEventExecutorGroup

  • 参数说明

@param nThreads 使用的线程数,默认为 core *2 [可以追踪源码]

@param executor执行器 : 如果传入 null,则采用 Netty 默认的线程工厂和默认的执行器ThreadPerTaskExecutor

@param chooserFactory 单例 new DefaultEventExecutorChooserFactory()
@param args args 在创建执行器的时候传入固定参数

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) 
    if (nThreads <= 0) 
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    
    if (executor == null)  //如果传入的执行器是空的则采用默认的线程工厂和默认的执行器
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    
    //创建指定线程数的执行器数组
    children = new EventExecutor[nThreads];
    //初始化线程数组
    for (int i = 0; i < nThreads; i ++) 
        boolean success = false;
        try 
            // 创建 new NioEventLoop
            children[i] = newChild(executor, args);
            success = true;
         catch (Exception e) 
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
         finally 
            // 如果创建失败,优雅关闭
            if (!success) 
                for (int j = 0; j < i; j ++) 
                    children[j].shutdownGracefully();
                
                for (int j = 0; j < i; j ++) 
                    EventExecutor e = children[j];
                    try 
                        while (!e.isTerminated()) 
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        
                     catch (InterruptedException interrupted) 
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    
                
            
        
    
    chooser = chooserFactory.newChooser(children);
    final FutureListener<Object> terminationListener = new FutureListener<Object>() 
        @Override
        public void operationComplete(Future<Object> future) throws Exception 
            if (terminatedChildren.incrementAndGet() == children.length) 
                terminationFuture.setSuccess(null);
            
        
    ;
    //为每一个单例线程池添加一个关闭监听器
    for (EventExecutor e: children) 
        e.terminationFuture().addListener(terminationListener);
    
    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    //将所有的单例线程池添加到一个 HashSet 中。
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);

说明:

  1. 如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。
  2. 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
  3. 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
  4. 根据线程选择工厂创建一个 线程选择器。
  5. 为每一个单例线程池添加一个关闭监听器。
  6. 将所有的单例线程池添加到一个 HashSet 中。

4、ServerBootstrap 创建和构造过程

  • ServerBootstrap 是个空构造,但是有默认的成员变量
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
//config 对象,会在后面起很大作用
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
  • ServerBootstrap 基本使用情况
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG, 100)
    .handler(new LoggingHandler(LogLevel.INFO))
    .childHandler(new ChannelInitializer<SocketChannel>() 
        @Override
        public void initChannel(SocketChannel ch) throws Exception 
            ChannelPipeline p = ch.pipeline();
            if (sslCtx != null) 
                p.addLast(sslCtx.newHandler(ch.alloc()));
            
            //p.addLast(new LoggingHandler(LogLevel.INFO));
            p.addLast(new EchoServerHandler());
        
    );

说明:

  1. 链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 parentGroup 属性,worker 赋值给 childGroup属性
  2. channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。
  3. option 方法传入 TCP 参数,放在一个 LinkedHashMap 中。
  4. handler 方法传入一个 handler 中,这个 hanlder 只专属于 ServerSocketChannel 而不是SocketChannel
  5. childHandler 传入一个 hanlder ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用

5、绑定端口

  • 服务器就是在这个 bind 方法里启动完成的

  • bind 方法代码, 追踪到创建了一个端口对象,并做了一些空判断, 核心代码 doBind

public ChannelFuture bind(SocketAddress localAddress) 
    validate();
    if (localAddress == null) 
        throw new NullPointerException("localAddress");
    
    return doBind(localAddress);

  • doBind 源码剖析, 核心是两个方法 initAndRegister 和 doBind0

    private ChannelFuture doBind(final SocketAddress localAddress) 
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) 
            return regFuture;
        
        if (regFuture.isDone()) 
            //At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            //============================================
            //说明:执行 doBind0 方法,完成对端口的绑定
            //============================================
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
         else 
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() 
                @Override
                public void operationComplete(ChannelFuture future) throws Exception 
                    Throwable cause = future.cause();
                    if (cause != null) 
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not
                        cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                     else 
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doBind0(regFuture, channel, localAddress, promise);
                    
                
            );
            return promise;
        
    
    

6、initAndRegister

final ChannelFuture initAndRegister() 
    Channel channel = null;
    try // 说明: channelFactory.newChannel() 方法 的作用 通过 ServerBootstrap 的通道工厂反射创建一个NioServerSocketChannel, 具体追踪源码可以得到下面结论
        /*
    (1) 通过 NIO 的 SelectorProvider 的 openServerSocketChannel 方法得到 JDK 的 channel。目的是让 Netty 包装 JDK 的 channel。
    (2) 创建了一个唯一的 ChannelId,创建了一个 NioMessageUnsafe,用于操作消息,创建了一个 DefaultChannelPipeline 管道,是个双向链表结构,用于过滤所有的进出的消息。
    (3) 创建了一个 NioServerSocketChannelConfig 对象,用于对外展示一些配置。
        */
        channel = channelFactory.newChannel();//NioServerSocketChannel
        // 说明:init 个 初始化这个 NioServerSocketChannel, 具体追踪源码可以得到如下结论
        /*
      (1) init 方法,这是个抽象方法(AbstractBootstrap 类的)由 ,由 ServerBootstrap 实现(可以追一下源码 //setChannelOptions(channel, options, logger); )。
      (2) 设置 NioServerSocketChannel 的 的 TCP 属性。
      (3) 由于 LinkedHashMap 是非线程安全的,使用同步进行处理。
      (4) 对 NioServerSocketChannel 的 的 ChannelPipeline 加 添加 ChannelInitializer 处理器。
      (5) 可以看出, init 和 的方法的核心作用在和 ChannelPipeline 相关。
      (6) 从NioServerSocketChannel 的初始化过程中,我们知道,pipeline 是一个双向链表,并了且,他本身就初始化了 head 和tail的 ,这里调用了他的 addLast 个方法,也就是将整个handler到插入到tail的
为前面,因为 tail 永远会在后面,需要做一些系统的固定工作。
        */
        init(channel);
     catch (Throwable t) 
        if (channel != null) 
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        
        return new DefaultChannelPromise(new FailedChannel(),
                                         GlobalEventExecutor.INSTANCE).setFailure(t);
    
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) 
        if (channel.isRegistered()) <

以上是关于Day477&478.Netty核心源码 -netty的主要内容,如果未能解决你的问题,请参考以下文章

Day472&473&474.Netty 核心模块组件 -netty

Day680.大佬如何学习源码 -深入拆解 Tomcat & Jetty

Day769.Kaito大佬的学习方法&技巧&认识 -Redis 核心技术与实战

Day13 目录结构&自定义Yum仓库&源码包编译安装(Service02)

Day739.GEO经纬度数据结构&自定义数据结构 -Redis 核心技术与实战

Day550.kafka源码主内容解析 -kafka