netty事件模型实现原理

Posted 源码架构圈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了netty事件模型实现原理相关的知识,希望对你有一定的参考价值。


说在前面


前期回顾

sharding-jdbc源码解析 更新完毕

spring源码解析 更新完毕

spring-mvc源码解析 更新完毕

spring-tx源码解析 更新完毕

spring-boot源码解析 更新完毕

rocketmq源码解析 更新完毕

dubbbo源码解析 更新完毕

netty源码解析 更新完毕

spring源码架构更新完毕

spring-mvc源码架构更新完毕

spring-boot源码架构更新完毕


github https://github.com/tianheframe

sharding-jdbc源码解析 更新完毕

rocketmq源码解析 更新完毕

seata 源码解析 更新完毕

dubbo 源码解析 更新完毕

netty 源码解析 更新完毕





源码解析

netty的线程模型是采用reactor设计,什么是reactor,reactor是基于事件驱动设计可以处理一个或者让多个请求通过requestHandler将事件event采用多路复用模式分发给serviceHandler处理,io多路复用模型如果是linux系统是基于操作系统epoll设计模型


传统的bio设计,一个client网络请求到server端,server端同步为连接的这个client进行交互,在高并发环境下会影响系统的吞吐量,因为一个服务节点可以创建的线程数受底层系统资源cpu、内存的限制,如果线程太多很可能导致大量线程block的情况,cpu使用率不饱和,还可能导致系统资源耗尽


为了避免资源耗尽,一般采用线程池来处理server端的读写服务,这样做的好处线程可以重复利用,可以有效的控制线程数不会导致系资源耗尽,但是还是会受到底层操作系统cpu、内存资源的限制,还是会有很多线程block的情况,多线程之间的上下文切换开销,cpu使用率不饱和,cpu资源浪费的情况,既然是是多线程就会存在数据一致性问题,怎么保证数据一致性,共享数据同步是一块不小的开销


reactor可以解决以上这些问题,它是基于事件驱动设计的基于selector,可以实现多个socketChannel的监听,进行分发给serviceHandler进行处理,socket处理handler和业务handler处理隔离,更好对业务模块进行解耦,资源隔离,对业务系统的扩展性有很大的帮助,selector线程只接受socket事件进行转发处理,所以可以接受很大的并发量,当然具体支持多大的并发量还有受底层操作系统的文件句柄限制,这种模式和以上两种模式相比有什么好处呢,更少的系统资源利用,不需要一个客户端一个线程,一定程度上减少上下文切换


我们结合netty源码看下netty reactor线程模型设计


单线程的reactor模式

所有io,请求io、处理io、handler处理都是一个线程在处理,效率较低

public static void main(String[] args) { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); try { ServerBootstrap b = new ServerBootstrap(); b.group(eventLoopGroup) .channel(NioserverSocketChannel.class) .option(ChannelOption.SO_BACKLOG,100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(8080).sync(); f.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { eventLoopGroup.shutdownGracefully(); }}

先介绍几个关键组件

eventLoopGroup 基于事件的线程组,间接的实现了executorService,循环监听io事件进行下一步处理,ServerBootstrap 封装了基于链式调用组装netty server启动所需的组件,ChannelFuture 异步返回结果的封装

EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);

eventLoopGroup最终实现了io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...),看这行代码只有一个事件处理器

children = new EventExecutor[nThreads];

在跟踪下这行代码

b.group(eventLoopGroup)

io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup)这里看到bossGroup和workGroup是同一个

@Overridepublic ServerBootstrap group(EventLoopGroup group) { return group(group, group);}

io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)设置bossGroup,io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)中这行代码设置workGroup

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this;}

从以上代码可以看出workGroup和bossGroup是同一个,这个eventLoopGroup又是同一个,所以所有的事情都是一个线程在处理,下面用一张图来体出现下

线程池类型的reactor线程模型,就是有多个线程处理所有请求io、事件io、handler,一个线程处理一个channel的请求io、事件io、handler,多个channel之间是并发处理的,提升了系统的并发度,用netty实现代码是这样

public static void main(String[] args) { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2); try { ServerBootstrap b = new ServerBootstrap(); b.group(eventLoopGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(8080).sync(); f.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { eventLoopGroup.shutdownGracefully(); }}

下面跟踪下源码看下,io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)的这行代码会有多个事件循环处理器

children = new EventExecutor[nThreads];

这行代码的底层实现

b.group(eventLoopGroup)

io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup)

@Overridepublic ServerBootstrap group(EventLoopGroup group) { return group(group, group);}

io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this;}

设置bossGroup,io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)

public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self();}

io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)设置workGroup

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this;}

从代码可以看到bossGroup和workGroup还是同一个,和单线程reactor线程模型不同的区别是这里是多线程的,用一张图来体现下

netty事件模型实现原理

主从模式的reactor线程模型,也可以是一个线程实现的主,也可以是一个线程池实现的主,主线程或者主线程池只用来处理请求io、事件io和handler处理是从线程或者从线程池去处理,从可以是一个线程实现,也可以是一个线程池来实现,一般实际项目开发中为了提供从线程或者从线程池的利用率,业务handler是单独的线程池去处理,但是这样就会有线程上下文切换的开销,用netty实现主从reactor线程模型处理,先介绍主从都是一个线程,这种场景一般用在请求并发量不是特别大,业务handler处理事件耗时比较短,比如就是系统内部通知推送的处理,业务handler交给业务线程池去处理,用netty实现代码入下

public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(1); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(8080).sync(); f.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); }}

创建了2个事件循环组,主要看下这行代码底层是怎么实现的,这两个事件循环组是怎么运用的,b.group(bossGroup,workGroup),io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup)

@Overridepublic ServerBootstrap group(EventLoopGroup group) { return group(group, group);}

io.netty.bootstrap.ServerBootstrap#group(io.netty.channel.EventLoopGroup, io.netty.channel.EventLoopGroup)初始化workGroup

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this;}

io.netty.bootstrap.AbstractBootstrap#group(io.netty.channel.EventLoopGroup)初始化bossGroup

public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self();}

这里可以看出bossGroup和workGroup是不同的事件循环组,然后他们分别的工作是什么,看下这行代码的底层实现ChannelFuture f = b.bind(8080).sync();io.netty.bootstrap.AbstractBootstrap#bind(int),io.netty.bootstrap.AbstractBootstrap#initAndRegister这个方法的这行代码

init(channel);

跟踪到这个方法io.netty.bootstrap.ServerBootstrap#init的这行代码

p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); }
ch.eventLoop().execute(new Runnable() { @Override public void run() {// 添加socket请求连接处理器 pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } });

这里初始化了一个ServerBootstrapAcceptor handler用来处理请求io,初始化channelConfig的时候已经设置了连接简历就执行read io事件,进入到这个方法io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead 这行代码

try {// 把channel注册到事件循环组中 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); }

io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)最后把channel绑定到具体的事件循环组

@Overridepublic ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}

这里是workGroup实现的,返回方法io.netty.bootstrap.AbstractBootstrap#initAndRegister这行代码

ChannelFuture regFuture = config().group().register(channel);if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); }}

这里的group是bossGroup进行channel注册,就是用来处理请求io处理,下面会判断已注册过不在注册,用张图体现下

netty事件模型实现原理

基于主从线程池的reactor线程模型,和基于主从线程的reactor线程模型实现是一样的,只是单线程变成了多线程,可以进一步提供系统的并发度,请求量也就上来了,当然还有服务器文件句柄底层支持,再加上netty服务的水平扩展,基于这些可以实现百万级并发请求量,用张图体现下

netty事件模型实现原理

用netty实现

public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(2); EventLoopGroup workGroup = new NioEventLoopGroup(2); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(8080).sync(); f.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); }}



说在最后

本次解析仅代表个人观点,仅供参考。

netty事件模型实现原理



扫码进入技术微信群

netty事件模型实现原理
netty事件模型实现原理

钉钉技术群





以上是关于netty事件模型实现原理的主要内容,如果未能解决你的问题,请参考以下文章

Netty的Reactor多线程模型,NioEventLoop,ChannelPipeline简介

Netty原理实践解析

Netty原理浅析

Java开发中Netty线程模型原理解析!

Netty模型篇一:Netty 线程模型架构 & 工作原理 解读

Netty4 事件处理传播机制