Day477&478.Netty核心源码 -netty
Posted 阿昌喜欢吃黄桃
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Day477&478.Netty核心源码 -netty相关的知识,希望对你有一定的参考价值。
Netty核心源码
一、Netty 启动过程源码剖析
NioEventLoop 类 的run()
代码 ,无限循环
,在服务器端运行
1、源码的基本理解
//服务器启动类源码
/**
* Echoes back any received data from a client.
*/
public final class EchoServer
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception
// Configure SSL.
final SslContext sslCtx;
if (SSL)
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
else
sslCtx = null;
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioserverSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
public void initChannel(SocketChannel ch) throws Exception
ChannelPipeline p = ch.pipeline();
if (sslCtx != null)
p.addLast(sslCtx.newHandler(ch.alloc()));
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
);
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
finally
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
说明
先看启动类:main 方法中,首先创建了关于 SSL 的配置类。
重点分析下 创建了两个 EventLoopGroup 对象:
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
.
(1) 这两个对象是整个 Netty 的核心对象,可以说,整个 Netty 的运作都依赖于他们。bossGroup 用于接受Tcp 请求,他会将请求交给 workerGroup ,workerGroup 会获取到真正的连接,然后和连接进行通信,比如读写解码编码等操作。(2) EventLoopGroup 是 事件循环组(线程组) 含有多个 EventLoop,可以注册 channel ,用于在事件循环中去进行选择(和选择器相关)
(3) new NioEventLoopGroup(1); 这个 1 表示 bossGroup 事件组有 1 个线程你可以指定,如果 new NioEventLoopGroup() 会含有默认个线程 cpu 核数*2, 即可以充分的利用多核的优势
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(“io.netty.eventLoopThreads”, NettyRuntime.availableProcessors() * 2));
会创建 EventExecutor 数组 children = new EventExecutor[nThreads]; //debug 一下
每个元素的类型就是 NIOEventLoop, NIOEventLoop 实现了 EventLoop 接口 和 Executor 接口
try 块中创建了一个 ServerBootstrap 对象,他是一个引导类,用于启动服务器和引导整个程序的初始化( 看下源码 allows easy bootstrap of @link ServerChannel )。和 它和 ServerChannel , 关联, 而 而 ServerChannel 继承了Channel法 ,有一些方法 remoteAddress 等随后,变量 b 调用了 group 方法将两个 group 放入了自己的字段中,用于后期引导使用【debug 下 group 方法
/** * Set the @link EventLoopGroup for the parent (acceptor) and the child (client). These * @link EventLoopGroup's are used to handle all the events and IO for @link ServerChannel and * @link Channel's. */
(4) 然后添加了一个 channel,其中参数一个 Class 对象,引导类将通过这个 Class 对象反射创建
ChannelFactory。然后添加了一些 TCP 的参数。[说明:Channel 的创建在 bind 方法,可以 Debug 下 bind ,会找到 channel = channelFactory.newChannel(); ](5) 再添加了一个服务器专属的日志处理器 handler。
(6) 再添加一个 SocketChannel(不是 ServerSocketChannel)的 handler。
(7) 然后绑定端口并阻塞至连接成功。
(8) 最后 main 线程阻塞等待关闭。
(9) finally 块中的代码将在服务器关闭时优雅关闭所有资源
//服务器端处理器源码
/**
* Handler implementation for the echo server.
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
ctx.write(msg);
@Override
public void channelReadComplete(ChannelHandlerContext ctx)
ctx.flush();
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
说明:
这是一个普通的处理器类,用于处理客户端发送来的消息,在我们这里,我们简单的解析出客户端传过来的内容,然后打印,最后发送字符串给客户端。
2、EventLoopGroup 的过程
- 构造器方法
public NioEventLoopGroup(int nThreads)
this(nThreads, (Executor) null);
- 上面的
this(nThreads, (Executor) null);
调用构造器 ( 通过 alt+d 看即可)
public NioEventLoopGroup(int nThreads, Executor executor)
this(nThreads, executor, SelectorProvider.provider());
- 上面的
this(nThreads, executor, SelectorProvider.provider());
调用下面构造器
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider)
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
- 上面的
this ()...
调用构造器(alt+d)
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory)
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
- 上面的
super() ..
的方法是父类:MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args)
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
- 追踪到源码抽象类 MultithreadEventExecutorGroup 的构造器方法 MultithreadEventExecutorGroup 才是
NioEventLoopGroup 真正的构造方法, 这里可以看成是一个模板方法,使用了设计模式的模板模式
,
3、MultithreadEventExecutorGroup
- 参数说明
@param nThreads
使用的线程数,默认为 core *2 [可以追踪源码]
@param executor
执行器 : 如果传入 null,则采用 Netty 默认的线程工厂和默认的执行器ThreadPerTaskExecutor
@param chooserFactory
单例 new DefaultEventExecutorChooserFactory()
@param args args
在创建执行器的时候传入固定参数
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args)
if (nThreads <= 0)
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
if (executor == null) //如果传入的执行器是空的则采用默认的线程工厂和默认的执行器
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
//创建指定线程数的执行器数组
children = new EventExecutor[nThreads];
//初始化线程数组
for (int i = 0; i < nThreads; i ++)
boolean success = false;
try
// 创建 new NioEventLoop
children[i] = newChild(executor, args);
success = true;
catch (Exception e)
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
finally
// 如果创建失败,优雅关闭
if (!success)
for (int j = 0; j < i; j ++)
children[j].shutdownGracefully();
for (int j = 0; j < i; j ++)
EventExecutor e = children[j];
try
while (!e.isTerminated())
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
catch (InterruptedException interrupted)
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>()
@Override
public void operationComplete(Future<Object> future) throws Exception
if (terminatedChildren.incrementAndGet() == children.length)
terminationFuture.setSuccess(null);
;
//为每一个单例线程池添加一个关闭监听器
for (EventExecutor e: children)
e.terminationFuture().addListener(terminationListener);
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
//将所有的单例线程池添加到一个 HashSet 中。
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
说明:
- 如果 executor 是 null,创建一个默认的 ThreadPerTaskExecutor,使用 Netty 默认的线程工厂。
- 根据传入的线程数(CPU*2)创建一个线程池(单例线程池)数组。
- 循环填充数组中的元素。如果异常,则关闭所有的单例线程池。
- 根据线程选择工厂创建一个 线程选择器。
- 为每一个单例线程池添加一个关闭监听器。
- 将所有的单例线程池添加到一个 HashSet 中。
4、ServerBootstrap 创建和构造过程
- ServerBootstrap 是个空构造,但是有默认的成员变量
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
//config 对象,会在后面起很大作用
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
- ServerBootstrap 基本使用情况
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>()
@Override
public void initChannel(SocketChannel ch) throws Exception
ChannelPipeline p = ch.pipeline();
if (sslCtx != null)
p.addLast(sslCtx.newHandler(ch.alloc()));
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoServerHandler());
);
说明:
- 链式调用:group 方法,将 boss 和 worker 传入,boss 赋值给 parentGroup 属性,worker 赋值给 childGroup属性
- channel 方法传入 NioServerSocketChannel class 对象。会根据这个 class 创建 channel 对象。
- option 方法传入 TCP 参数,放在一个 LinkedHashMap 中。
- handler 方法传入一个 handler 中,这个 hanlder 只专属于 ServerSocketChannel 而不是SocketChannel
- childHandler 传入一个 hanlder ,这个 handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用
5、绑定端口
-
服务器就是在这个 bind 方法里启动完成的
-
bind 方法代码, 追踪到创建了一个端口对象,并做了一些空判断, 核心代码 doBind
public ChannelFuture bind(SocketAddress localAddress)
validate();
if (localAddress == null)
throw new NullPointerException("localAddress");
return doBind(localAddress);
-
doBind 源码剖析, 核心是两个方法 initAndRegister 和 doBind0
private ChannelFuture doBind(final SocketAddress localAddress) final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) return regFuture; if (regFuture.isDone()) //At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); //============================================ //说明:执行 doBind0 方法,完成对端口的绑定 //============================================ doBind0(regFuture, channel, localAddress, promise); return promise; else // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() @Override public void operationComplete(ChannelFuture future) throws Exception Throwable cause = future.cause(); if (cause != null) // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); else // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); ); return promise;
6、initAndRegister
final ChannelFuture initAndRegister()
Channel channel = null;
try // 说明: channelFactory.newChannel() 方法 的作用 通过 ServerBootstrap 的通道工厂反射创建一个NioServerSocketChannel, 具体追踪源码可以得到下面结论
/*
(1) 通过 NIO 的 SelectorProvider 的 openServerSocketChannel 方法得到 JDK 的 channel。目的是让 Netty 包装 JDK 的 channel。
(2) 创建了一个唯一的 ChannelId,创建了一个 NioMessageUnsafe,用于操作消息,创建了一个 DefaultChannelPipeline 管道,是个双向链表结构,用于过滤所有的进出的消息。
(3) 创建了一个 NioServerSocketChannelConfig 对象,用于对外展示一些配置。
*/
channel = channelFactory.newChannel();//NioServerSocketChannel
// 说明:init 个 初始化这个 NioServerSocketChannel, 具体追踪源码可以得到如下结论
/*
(1) init 方法,这是个抽象方法(AbstractBootstrap 类的)由 ,由 ServerBootstrap 实现(可以追一下源码 //setChannelOptions(channel, options, logger); )。
(2) 设置 NioServerSocketChannel 的 的 TCP 属性。
(3) 由于 LinkedHashMap 是非线程安全的,使用同步进行处理。
(4) 对 NioServerSocketChannel 的 的 ChannelPipeline 加 添加 ChannelInitializer 处理器。
(5) 可以看出, init 和 的方法的核心作用在和 ChannelPipeline 相关。
(6) 从NioServerSocketChannel 的初始化过程中,我们知道,pipeline 是一个双向链表,并了且,他本身就初始化了 head 和tail的 ,这里调用了他的 addLast 个方法,也就是将整个handler到插入到tail的
为前面,因为 tail 永远会在后面,需要做一些系统的固定工作。
*/
init(channel);
catch (Throwable t)
if (channel != null)
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
return new DefaultChannelPromise(new FailedChannel(),
GlobalEventExecutor.INSTANCE).setFailure(t);
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null)
if (channel.isRegistered()) <以上是关于Day477&478.Netty核心源码 -netty的主要内容,如果未能解决你的问题,请参考以下文章
Day472&473&474.Netty 核心模块组件 -netty
Day680.大佬如何学习源码 -深入拆解 Tomcat & Jetty
Day769.Kaito大佬的学习方法&技巧&认识 -Redis 核心技术与实战
Day13 目录结构&自定义Yum仓库&源码包编译安装(Service02)