如何保证netty执行事件是顺序而且高效
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何保证netty执行事件是顺序而且高效相关的知识,希望对你有一定的参考价值。
参考技术A netty实现多个handler顺序调用在netty中,一次数据交互,可以由多个handler去处理,例如handler1和handler2,那么,在前面那个handler的messageReceived的最后要加上ctx.sendUpstream(e);理论请见:AChannelEventcanbehandledbyeitheraChannelUpstreamHandleroraChannelDownstreamHandlerandbeforwardedtotheclosesthandlerbycallingChannelHandlerContext.sendUpstream(ChannelEvent)orChannelHandlerContext.sendDownstream(ChannelEvent).代码:复制代码publicclassHandler1extendsSimpleChannelUpstreamHandler@OverridepublicvoidmessageReceived(ChannelHandlerContextctx,MessageEvente)System.out.println("1messagereceived");Stringa="11";Objecto=a;ctx.getChannel().write(a);ctx.sendUpstream(e);@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,ExceptionEvente)throwsExceptione.getChannel().close();复制代码复制代码publicclassHandler2extendsSimpleChannelUpstreamHandler@OverridepublicvoidmessageReceived(ChannelHandlerContextctx,MessageEvente)System.out.println("2messagereceived");e.getChannel().close();@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,ExceptionEvente)throwsExceptione.getChannel().close();复制代码复制代码publicclassTcpServerpublicstaticvoidmain(String[]args)System.out.println("startingatcpserver");ServerBootstrapsb=newServerBootstrap(newNioserverSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));sb.setPipelineFactory(newPKServerPipelineFactory());sb.setOption("child.tcpNoDelay",true);sb.setOption("child.keepAlive",true);sb.bind(newInetSocketAddress(9999)); 参考技术B netty本身实现的长连接,就是一个连接一个worker。worker的数量是有限的(通常是cpu cores+1),所以你的服务器要是连接数多的话,得考虑使用“异步”Request(netty的http没实现这么个功能),或者说“Continuation”,当连接“无事可做”的时候Netty中线程封装与管理
在Netty中,netty对线程模型进行了重新封装,它们分别是EventExecutorGroup和EventExecutor.每个EventExecutor是一个单独的线程,可以执行Runnable任务。EventExecutorGroup相当于一个线程组,用于管理和分配一组EventExecutor.我们知道Netty是基于事件驱动的。这样的封装给我们的开发带来了非常好的顺序。而且它还封装了定时任务,更加方便我们在一个线程中执行一些定时任务。
netty中默认实现了一个DefaultEventExecutorGroup和DefaultEventExecutor。DefaultEventexecutorGroup继承于MultithreadEventExecutorGroup,MultilthreadEventExecutorGroup是线程管理的核心类。它有一系统的构造方法,最终执行的构造方法是:
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args)
对于默认的实现,里面的参数Executor的默认实现类是:ThreadPerTaskExecutor,它需要一个参数ThreadFactory,它的默认实现类是DefaultThreadFactory,用于创建线程。在初始化EventExecutorGroup的时候,需要初始化它管理的EventExecutor,默认的实现是DefaultEventExecutor,创建的方法在DefaultEventExecutorGroup实现。
当我们需要一些任务在同一个线程中执行时,可以从EventExecutorGroup中选择一个EventExecutor,向EventExecutor中添加要执行的任务。这时候需要一个选择器,来分配EventExecutorGroup中管理的EventExector。Netty也同样提供了一个默认的选择器实现:
DefaultEventExecutorChooserFactory,它是按使用的顺序返回的。Netty提供的默认实现DefaultEventExecutor,继承于SingleThreadEventExecutor,SingleThreadEventExecutor是EventExecutor实现的核心。当我们获取一个EventExecutor之后,调用execute,向EventExecutor中添加任务。
图1
如果在同一个线程内(inEventLoop())直接添加到任务队列,如果不在同一个线程中,会调用startThread()方法,它里面会判断是否有线程已启动,如果没有,则创建一个线程,并且这个线程归此EventExecutor所有。创建线程之后,会调用子类实现的run方法,因为run方法在,SingleThreadEventExecutor中是一个抽象方法,由子类具体实现。
这里takeTask()方法是SingleThreadEventExecutor中实现的方法,它会先获取定时任务,如果存在定时任务,先获取可执行的定时任务,如果不存在定时任务,直接获取其它任务,如果存在定时任务,先判断是否有到期可执行的定时任务,如果有,加入任务队列,再从任务队列中获取可执行的任务。所以在一个EventExecutor中,不管是什么任务,都要快速的完成,否则一个任务执行的慢,就会卡住之后所有的任务。
使用这种模型我们就可以很好的管理线程和使用线程了,比如攻打一个世界Boss,很多人一起攻击,对于boss的掉血计算,奖励计算需要提供一个线程安全的机制。一种方法是加锁,每个boss上有一个锁。另一个方法就是使用线程,使用事件驱动,把对boss的每个操作看一个事件,放到同一个线程中。可以New一个EventExecutor(DefaultEventExecutor),使用execute方法执行即可,这样一个生产者,消费者模式直接就建立起来了,非常方法。如果想要获取执行的结果,还可以使用Promise和future机制,当执行完成之后,调用Promise的setSuccess方法即可。
以上是关于如何保证netty执行事件是顺序而且高效的主要内容,如果未能解决你的问题,请参考以下文章