转netty线程模型
Posted 风中摆动的狗尾巴草
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了转netty线程模型相关的知识,希望对你有一定的参考价值。
一切从ServerBootstrap开始
ServerBootstrap
负责初始话netty服务器,并且开始监听端口的socket请求。
- bootstrap bootstrap = new ServerBootstrap(
- new NioserverSocketChannelFactory(
- Executors.newCachedThreadPool(),//boss线程池
- Executors.newCachedThreadPool()//worker线程池
- )
- );
- bootstrap.setPipelineFactory(new HttpChannelPipelineFactory());
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.bind(new InetSocketAddress(httpPort));//端口开始监听
ServerBootstrap 用一个ServerSocketChannelFactory 来实例化。ServerSocketChannelFactory 有两种选择,一种是NioServerSocketChannelFactory,一种是OioServerSocketChannelFactory。 前者使用NIO,后则使用普通的阻塞式IO。它们都需要两个线程池实例作为参数来初始化,一个是boss线程池,一个是worker线程池。
ServerBootstrap.bind(int)负责绑定端口,当这个方法执行后,ServerBootstrap就可以接受指定端口上的socket连接了。一个ServerBootstrap可以绑定多个端口。
boss线程和worker线程
可 以这么说,ServerBootstrap监听的一个端口对应一个boss线程,它们一一对应。比如你需要netty监听80和443端口,那么就会有两 个boss线程分别负责处理来自两个端口的socket请求。在boss线程接受了socket连接求后,会产生一个channel(一个打开的 socket对应一个打开的channel),并把这个channel交给ServerBootstrap初始化时指定的 ServerSocketChannelFactory来处理,boss线程则继续处理socket的请求。
ServerSocketChannelFactory则会从worker线程池中找出一个worker线程来继续处理这个请求。
如 果是OioServerSocketChannelFactory的话,那个这个channel上所有的socket消息消息,从开始到
channel(socket)关闭,都只由这个特定的worker来处理,也就是说一个打开的socket对应一个指定的worker线程,这个
worker线程在socket没有关闭的情况下,也只能为这个socket处理消息,无法服务器他socket。
如果是NioServerSocketChannelFactory的话则不然,每个worker可以服务不同的socket或者说channel,worker线程和channel不再有一一对应的关系。
显然,NioServerSocketChannelFactory只需要少量活动的worker线程及能很好的处理众多的channel,而OioServerSocketChannelFactory则需要与打开channel等量的worker线程来服务。
线 程是一种资源,所以当netty服务器需要处理长连接的时候,最好选择NioServerSocketChannelFactory,这样可以避免创建大 量的worker线程。在用作http服务器的时候,也最好选择NioServerSocketChannelFactory,因为现代浏览器都会使用 http keepalive功能(可以让浏览器的不同http请求共享一个信道),这也是一种长连接。
worker线程的生命周期(life circle)
当 某个channel有消息到达或者有消息需要写入socket的时候,worker线程就会从线程池中取出一个。在worker线程中,消息会经过设定好 的ChannelPipeline处理。ChannelPipeline就是一堆有顺序的filter,它分为两部分:UpstreamHandler和 DownStreamHandler。本文着重介绍netty的线程模型,所以关于pipeline的内容从简说明。
客户端送入的消息会首先由许多UpstreamHandler依次处理,处理得到的数据送入应用的业务逻辑handler,通常用SimpleChannelUpstreamHandler来实现这一部分。
- public class SimpleChannelUpstreamHandler{
- public void messageReceived(ChannelHandlerContext c,MessageEvent e) throws Exception{
- //业务逻辑开始掺入
- }
- }
对 于Nio当messageReceived()方法执行后,如果没有产生异常,worker线程就执行完毕了,它会被线程池回收。业务逻辑hanlder 会通过一些方法,把返回的数据交给指定好顺序的DownStreamHandler处理,处理后的数据如果需要,会被写入channel,进而通过绑定的 socket发送给客户端。这个过程是由另外一个线程池中的worker线程来完成的。
对于Oio来说,从始到终,都是由一个指定的worker来处理。
减少worker线程的处理占用时间
worker 线程是由netty内部管理,统一调配的一种资源,所以最好应该尽快的把让worker线程执行完毕,返回给线程池回收利用。worker线程的大部分时 间消耗在在ChannelPipeline的各种handler中,而在这些handler中,一般是负责应用程序业务逻辑掺入的那个handler最占 时间,它通常是排在最后的UpstreamHandler。所以通过把这部分处理内容交给另外一个线程来处理,可以有效的减少worker线程的周期循环 时间。一般有两种方法:
messageReceived()方法中开启一个新的线程来处理业务逻辑
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
- ...
- new Thread(...).start();
- }
在messageReceived()中开启一个新线程来继续处理业务逻辑,而worker线程在执行完messageReceived()就会结束了。更加优雅的方法是另外构造一个线程池来提交业务逻辑处理任务。
利用netty框架自带的ExecutionHandler
基本使用方法:
- public class DatabaseGatewayPipelineFactory implements ChannelPipelineFactory {
- private final ExecutionHandler executionHandler;
- public DatabaseGatewayPipelineFactory(ExecutionHandler executionHandler) {
- this.executionHandler = executionHandler;
- }
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
- new DatabaseGatewayProtocolEncoder(),
- new DatabaseGatewayProtocolDecoder(),
- executionHandler, // 多个pipeline之间必须共享同一个ExecutionHandler
- new DatabaseQueryingHandler());//业务逻辑handler,IO密集
- }
- }
把 共享的ExecutionHandler实例放在业务逻辑handler之前即可,注意ExecutionHandler一定要在不同的pipeline 之间共享。它的作用是自动从ExecutionHandler自己管理的一个线程池中拿出一个线程来处理排在它后面的业务逻辑handler。而 worker线程在经过ExecutionHandler后就结束了,它会被ChannelFactory的worker线程池所回收。
它的构造方法是ExecutionHandler(Executor executor) ,很显然executor就是ExecutionHandler内部管理的线程池了。netty额外给我们提供了两种线程池:
MemoryAwareThreadPoolExecutor和OrderedMemoryAwareThreadPoolExecutor,它们都在org.jboss.netty.handler.execution 包下。
MemoryAwareThreadPoolExecutor
确保jvm不会因为过多的线程而导致内存溢出错误,OrderedMemoryAwareThreadPoolExecutor是前一个线程池的子类,除
了保证没有内存溢出之外,还可以保证channel event的处理次序。具体可以查看API文档,上面有详细说明。
Netty服务端启动步骤:
代码:
- // 构造一个服务端Bootstrap实例,并通过构造方法指定一个ChannelFactory实现
- // 其中后两个参数分别是BOSS和WORK的线程池
- Bootstrap serverBootstrap = new ServerBootstrap(
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(),
- Executors.newCachedThreadPool()));
- // 注册用户自己实现的ChannelPipelineFactory
- serverBootstrap.setPipelineFactory(this.pipelineFactory);
- // 调用bind等待客户端来连接
- ((ServerBootstrap) serverBootstrap).bind(socketAddress);
Netty提供NIO与BIO两种模式,我们主要关心NIO的模式:
NIO处理方式:
1.Netty用一个BOSS线程去处理客户端的接入,创建Channel
2.从WORK线程池(WORK线程数量默认为cpu cores的2倍)拿出一个WORK线程交给BOSS创建好的Channel实例(Channel实例持有java网络对象)
3.WORK线程进行数据读入(读到ChannelBuffer)
4.接着触发相应的事件传递给ChannelPipeline进行业务处理(ChannelPipeline中包含一系列用户自定义的ChannelHandler组成的链)
有
一点要注意的是,执行整个ChannelHandler链这个过程是串行的,如果业务逻辑(比如DB操作)比较耗时,会导致WORK线程长时间被占用得不
到释放,最终影响整个服务端的并发处理能力,所以一般我们通过ExecutionHandler线程池来异步处理ChannelHandler调用链,使
得WORK线程经过ExecutionHandler时得到释放。
要解决这个问题增加下面代码即可:
- ExecutionHandler executionHandler =
- new ExecutionHandler(
- new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576));
- public ChannelPipeline getPipeline() {
- return Channels.pipeline(
- new DatabaseGatewayProtocolEncoder(),
- new DatabaseGatewayProtocolDecoder(),
- executionHandler, // Must be shared
- new DatabaseQueryingHandler());
- }
Netty为ExecutionHandler提供了两种可选的线程池模型:
1) MemoryAwareThreadPoolExecutor
通过对线程池内存的使用控制,可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限,防止内存溢出错误;
2) OrderedMemoryAwareThreadPoolExecutor
是
1)的子类。除了MemoryAwareThreadPoolExecutor
的功能之外,它还可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处理模式下可能出现的错误的事件顺序,但它并不保证同一
Channel中的事件都在一个线程中执行,也没必要保证这个。
我们看下OrderedMemoryAwareThreadPoolExecutor中的注释:
Thread X: --- Channel A (Event A1) --. .-- Channel B (Event B2) --- Channel B (Event B3) --->
\ /
X
/ \
Thread Y: --- Channel B (Event B1) --‘ ‘-- Channel A (Event A2) --- Channel A (Event A3) --->
处理同一个Channel的事件,是串行的方式执行的,但是同一个Channel的多个事件,可能会分布到线程中池中的多个线程去处理,不同的Channel事件可以并发处理,互相并不影响
再来看看MemoryAwareThreadPoolExecutor中的注释
Thread X: --- Channel A (Event 2) --- Channel A (Event 1) --------------------------->
Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->
Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
同
一个Channel的事件,并不保证处理顺序,可能一个线程先处理了Channel A (Event 3),然后另一个线程才处理Channel A
(Event 2),如果业务不要求保证事件的处理顺序,我认为还是尽量使用MemoryAwareThreadPoolExecutor比较好
Netty采用标准的SEDA(Staged Event-Driven Architecture) 架构
SEDA的核心思想是把一个请求处理过程分成几个Stage,不同资源消耗的Stag使用不同数量的线程来处理,Stag间使用事件驱动的异步通信模式。更进一步,在每个Stage中可以动态配置自己的线程数,在超载时降级运行或拒绝服务。
Netty所设计的事件类型,代表了网络交互的各个阶段,每个阶段发生时,会触发相应的事件并交给ChannelPipeline进行处理。事件处理都是通过Channels类中的静态方法调用开始的。
Channels中事件流转静态方法:
1. fireChannelOpen
2. fireChannelBound
3. fireChannelConnected
4. fireMessageReceived
5. fireWriteCompleteLater
6. fireWriteComplete
7. fireChannelInterestChangedLater
8. fireChannelDisconnectedLater
9. fireChannelDisconnected
10. fireChannelUnboundLater
11. fireChannelUnbound
12. fireChannelClosedLater
13. fireChannelClosed
14. fireExceptionCaughtLater
15. fireExceptionCaught
16. fireChildChannelStateChanged
Netty将网络事件分为两种类型:
1.Upstresam:上行,主要是由网络底层反馈给Netty的,比如messageReceived、channelConnected
2.Downstream:下行,框架自己发起的,比如bind、write、connect等
Netty的ChannelHandler 分为3种类型:
1.只处理Upstream事件:实现ChannelUpstreamHandler接口
2.只处理Downstream事件:实现ChannelDownstreamHandler接口
3.同时处理Upstream和Downstream事件:同时实现ChannelUpstreamHandler和ChannelDownstreamHandler接口
ChannelPipeline
维持所有ChannelHandler的有序链表,当有Upstresam或Downstream网络事件发生时,调用匹配事件类型的
ChannelHandler来处理。ChannelHandler自身可以控制是否要流转到调用链中的下一个
ChannelHandler(ctx.sendUpstream(e)或者ctx.sendDownstream(e)),这一样有一个好处,比如业务
数据Decoder出现非法数据时不必继续流转到下一个ChannelHandler
下面是我胡乱的画的一个图:
以上是关于转netty线程模型的主要内容,如果未能解决你的问题,请参考以下文章