netty事件模型实现原理
Posted 源码架构圈
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty事件模型实现原理相关的知识,希望对你有一定的参考价值。
说在前面
前期回顾
sharding-jdbc源码解析 更新完毕
spring源码解析 更新完毕
spring-mvc源码解析 更新完毕
spring-tx源码解析 更新完毕
spring-boot源码解析 更新完毕
rocketmq源码解析 更新完毕
dubbbo源码解析 更新完毕
netty源码解析 更新完毕
spring源码架构更新完毕
spring-mvc源码架构更新完毕
spring-boot源码架构更新完毕
github https://github.com/tianheframe
sharding-jdbc源码解析 更新完毕
rocketmq源码解析 更新完毕
seata 源码解析 更新完毕
dubbo 源码解析 更新完毕
netty 源码解析 更新完毕
源码解析
netty的线程模型是采用reactor设计,什么是reactor,reactor是基于事件驱动设计可以处理一个或者让多个请求通过requestHandler将事件event采用多路复用模式分发给serviceHandler处理,io多路复用模型如果是linux系统是基于操作系统epoll设计模型
传统的bio设计,一个client网络请求到server端,server端同步为连接的这个client进行交互,在高并发环境下会影响系统的吞吐量,因为一个服务节点可以创建的线程数受底层系统资源cpu、内存的限制,如果线程太多很可能导致大量线程block的情况,cpu使用率不饱和,还可能导致系统资源耗尽
为了避免资源耗尽,一般采用线程池来处理server端的读写服务,这样做的好处线程可以重复利用,可以有效的控制线程数不会导致系资源耗尽,但是还是会受到底层操作系统cpu、内存资源的限制,还是会有很多线程block的情况,多线程之间的上下文切换开销,cpu使用率不饱和,cpu资源浪费的情况,既然是是多线程就会存在数据一致性问题,怎么保证数据一致性,共享数据同步是一块不小的开销
reactor可以解决以上这些问题,它是基于事件驱动设计的基于selector,可以实现多个socketChannel的监听,进行分发给serviceHandler进行处理,socket处理handler和业务handler处理隔离,更好对业务模块进行解耦,资源隔离,对业务系统的扩展性有很大的帮助,selector线程只接受socket事件进行转发处理,所以可以接受很大的并发量,当然具体支持多大的并发量还有受底层操作系统的文件句柄限制,这种模式和以上两种模式相比有什么好处呢,更少的系统资源利用,不需要一个客户端一个线程,一定程度上减少上下文切换
我们结合netty源码看下netty reactor线程模型设计
单线程的reactor模式
所有io,请求io、处理io、handler处理都是一个线程在处理,效率较低
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(eventLoopGroup)
.channel(NioserverSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
先介绍几个关键组件
eventLoopGroup 基于事件的线程组,间接的实现了executorService,循环监听io事件进行下一步处理,ServerBootstrap 封装了基于链式调用组装netty server启动所需的组件,ChannelFuture 异步返回结果的封装
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
eventLoopGroup最终实现了io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...),看这行代码只有一个事件处理器
children = new EventExecutor[nThreads];
在跟踪下这行代码
b.group(eventLoopGroup)
io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup)这里看到bossGroup和workGroup是同一个
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)设置bossGroup,io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)中这行代码设置workGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
从以上代码可以看出workGroup和bossGroup是同一个,这个eventLoopGroup又是同一个,所以所有的事情都是一个线程在处理,下面用一张图来体出现下
线程池类型的reactor线程模型,就是有多个线程处理所有请求io、事件io、handler,一个线程处理一个channel的请求io、事件io、handler,多个channel之间是并发处理的,提升了系统的并发度,用netty实现代码是这样
public static void main(String[] args) {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(eventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
eventLoopGroup.shutdownGracefully();
}
}
下面跟踪下源码看下,io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)的这行代码会有多个事件循环处理器
children = new EventExecutor[nThreads];
这行代码的底层实现
b.group(eventLoopGroup)
io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup)
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
设置bossGroup,io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)设置workGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
从代码可以看到bossGroup和workGroup还是同一个,和单线程reactor线程模型不同的区别是这里是多线程的,用一张图来体现下
主从模式的reactor线程模型,也可以是一个线程实现的主,也可以是一个线程池实现的主,主线程或者主线程池只用来处理请求io、事件io和handler处理是从线程或者从线程池去处理,从可以是一个线程实现,也可以是一个线程池来实现,一般实际项目开发中为了提供从线程或者从线程池的利用率,业务handler是单独的线程池去处理,但是这样就会有线程上下文切换的开销,用netty实现主从reactor线程模型处理,先介绍主从都是一个线程,这种场景一般用在请求并发量不是特别大,业务handler处理事件耗时比较短,比如就是系统内部通知推送的处理,业务handler交给业务线程池去处理,用netty实现代码入下
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
创建了2个事件循环组,主要看下这行代码底层是怎么实现的,这两个事件循环组是怎么运用的,b.group(bossGroup,workGroup),io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup)
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)初始化workGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)初始化bossGroup
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return self();
}
这里可以看出bossGroup和workGroup是不同的事件循环组,然后他们分别的工作是什么,看下这行代码的底层实现ChannelFuture f = b.bind(8080).sync();io.netty.bootstrap.AbstractBootstrap#bind(int),io.netty.bootstrap.AbstractBootstrap#initAndRegister这个方法的这行代码
init(channel);
跟踪到这个方法io.netty.bootstrap.ServerBootstrap#init的这行代码
p.addLast(new ChannelInitializer<Channel>() {
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
public void run() {
// 添加socket请求连接处理器
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
这里初始化了一个ServerBootstrapAcceptor handler用来处理请求io,初始化channelConfig的时候已经设置了连接简历就执行read io事件,进入到这个方法io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead 这行代码
try {
// 把channel注册到事件循环组中
childGroup.register(child).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)最后把channel绑定到具体的事件循环组
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
这里是workGroup实现的,返回方法io.netty.bootstrap.AbstractBootstrap#initAndRegister这行代码
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
这里的group是bossGroup进行channel注册,就是用来处理请求io处理,下面会判断已注册过不在注册,用张图体现下
基于主从线程池的reactor线程模型,和基于主从线程的reactor线程模型实现是一样的,只是单线程变成了多线程,可以进一步提供系统的并发度,请求量也就上来了,当然还有服务器文件句柄底层支持,再加上netty服务的水平扩展,基于这些可以实现百万级并发请求量,用张图体现下
用netty实现
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(2);
EventLoopGroup workGroup = new NioEventLoopGroup(2);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(8080).sync();
f.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
说在最后
本次解析仅代表个人观点,仅供参考。
扫码进入技术微信群
以上是关于netty事件模型实现原理的主要内容,如果未能解决你的问题,请参考以下文章
Netty的Reactor多线程模型,NioEventLoop,ChannelPipeline简介