Netty框架之线程模型与基础用法
Posted 木兮君
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之线程模型与基础用法相关的知识,希望对你有一定的参考价值。
前言
小编最近好久没有更新文章了,为自己先辩解一下,最近上线后身体不适,而且工作比较繁忙(还要小编比较懒)。今天小编和大家分享一下netty框架的线程模型和用法。有了前面两篇博文Netty框架之深入了解NIO核心组件和Netty框架之NIO多路复用选择器的基础,再学习netty应该容易点了吧,同样废话不多说,进入正题。
Reactor模式
这里大家会不会有疑惑,为什么要介绍Reactor模式,这是因为Netty就是使用该模型实现的。
Reactor模式
Reactor 是反应堆的意思,Reactor 模型是指通过一个或多个输入同时传递给处理器。服务端将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。Netty 整体是整体采用了主从Reactor模型。
Reactor角色
Reactor模型中有三种角色,分别是:
- Acceptor:处理客户端新连接,并分派请求到处理器链中
- Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。
- Handler:事件处理,如编码、解码等
Reactor 线程模型
Reactor有三种线程模型分别是:单Reactor单线程模型、单Reactor多线程模型、主从Reactor多线程模型。而Netty正是采用最后一种。
单Reactor单线程模型
该模型下所有请求建立、IO读写、业务处理都在一个线程中完成。如果在业务中处理中出现了耗时操作,就会导致所有请求全部处理延时。因为他们是有由一个线程同步处理的。
单Reactor多线程模型
为了防止业务处理导致阻塞,在多线程模型下会用一个线程池来异步处理业务,当处理完成后在回写给客户端。
主从Reactor多线程模型
单React始终无法发挥现代服务器多核CPU的并行处理能力,所以Reactor是可以有多个的,并且有一主多从之分。一个主Reactor仅处理连接,而多个子Reactor用于处理IO读写。然后交给线程池处理业务。Tomcat就是采用该模式实现。
以上就是Reactor的三种线程模型,大家可以和前两篇博文中的代码对比一下。应该比较好理解。并且各个模型的优缺点也一目了然。好了进入咱们的重点,netty的线程模型。
Netty线程模型
Netty采用了主从Reactor模型实现,咱们先看下图:
上面主Reactor即对应Boss group 线程组,多个子Reactor对应Worker Group 线程组。主线程用于接收连接,并将建立好的连接注册到Workr 组,当最后IO事件触发后由对应Pipeline进行处理。
说明了netty线程模型,那小编接下来介绍一下上面模型中的各个组件。让我们进入netty的基本用法。
Netty基本用法
EventLoop
事件循环器,这里充当了Reactor的核心。每个EventLoop 都会包含一个Selector选择器,用于处理IO事件,和一个taskQueue用于存储用户提交的任务。此EventLoop会用一个独有的线程,默认是不启动的,当有任务触发时就会启动,并一直轮询下去。
代码示例
以下示例即声明1大小的线程组,当submit提交任务之后该EventLoop就会启动
@Test
public void nioEventLoopGroupTest(){
NioEventLoopGroup group = new NioEventLoopGroup(1);
group.submit(() -> System.out.println("submit:"+Thread.currentThread().getId()));
// 优雅关闭EventLoop
group.shutdownGracefully();
}
除了提交任务外,其更重要的事情是处理Channel 相关IO事件。如管道注册。调用register方法最终会调用NIO当中的register方法进行注册,只不过Netty已经实现封装好了,并且处理好了同步锁的问题。
EventLoopGroup.register(Channel)
Netty 为了安全的调用IO操作,把所有对IO的直接操作都封状了一个任务,交由IO线程执行。所以我们通过Netty来调用IO是不会立即返回的
NioChannel与Channelhandler
原有的Nio中的Channel 被封装成了NioChannel,当然最终底层还是在调用NIO的Channel。原来对Channel 中读写事件处理被封装成Channelhandler进行处理,并用引入Pipeline的概念。我们先来了解一下他们的用法。
代码示例:
小编依旧使用tcp来做个netty的基础用法:
@Test
public void nettySocketTest(){
//打开管道、注册选择器最后绑定端口
NioEventLoopGroup group = new NioEventLoopGroup(1);
NioserverSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
group.register(nioServerSocketChannel);
nioServerSocketChannel.bind(new InetSocketAddress(8888));
nioServerSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//处理连接和读操作
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
System.out.println("连接建立");
handlerAcceptAndRead(group,msg);
}
});
while (true);
}
private void handlerAcceptAndRead(NioEventLoopGroup group, Object msg) {
//这里因为netty已经帮我们封装好了,直接拿就行了
NioSocketChannel nioSocketChannel = (NioSocketChannel)msg;
group.register(nioSocketChannel);
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println(byteBuf.toString(Charset.defaultCharset()));
}
});
}
然后是测试工具测试:
测试结果:
不过以上和小编话的线程模型不像啊,因为没有主从啊。接下来小编修改一下
@Test
public void nettySocketTest(){
//打开管道、注册选择器最后绑定端口
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
//建立多个子EventLoopGrou
NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
//主EventLoopGrou只注册连接
bossGroup.register(nioServerSocketChannel);
nioServerSocketChannel.bind(new InetSocketAddress(8888));
nioServerSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//处理连接和读操作
@Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
System.out.println("连接建立");
//干活的有子EventLoopGrou
handlerAcceptAndRead(workerGroup,msg);
}
});
while (true);
}
测试:
打印内容
ServerBootStrap用法
有了以上的基本用法的代码,小编给大家讲一下现在市面上或者博客中普遍的用法,使用ServerBootStrap。这是在上面的代码基础上又进行了一次封装。用起来更加简单。
小编再次用一个http服务分享一下使用方法:
public class HttpNettyTest {
public void open(int port) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup worker = new NioEventLoopGroup(81);
serverBootstrap.group(boss, worker);
serverBootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 输入流 先解码
channel.pipeline().addLast("decode", new HttpRequestDecoder());
//请求体和请求头放一起对应下面的FullHttpRequest
channel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
//处理业务
channel.pipeline().addLast("servletHandler", new MyServlet());
// 输出流 编码
channel.pipeline().addFirst("encode", new HttpResponseEncoder());
}
});
ChannelFuture future = serverBootstrap.bind(port);
future.addListener(future1 -> System.out.println("服务启动成功"));
}
private class MyServlet extends SimpleChannelInboundHandler {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
//请求头
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
System.out.println("当前请求url:" + httpRequest.uri());
}
if (msg instanceof FullHttpRequest) {
//请求头和请求体在一块
//一般符合我们的开发
}
//请求体
if (msg instanceof HttpContent) {
HttpContent httpContent = (HttpContent) msg;
ByteBuf content = httpContent.content();
byte[] bytes = new byte[content.readableBytes()];
content.readBytes(bytes);
System.out.println("当前上传内容内容:" + new String(bytes));
}
if (msg instanceof LastHttpContent) {
LastHttpContent httpContent = (LastHttpContent) msg;
ByteBuf content = httpContent.content();
byte[] bytes = new byte[content.readableBytes()];
content.readBytes(bytes);
System.out.println("当前请求内容是最后一个包:" + new String(bytes));
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
response.content().writeBytes("上传完毕".getBytes());
ChannelFuture future = channelHandlerContext.writeAndFlush(response);
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
@Test
public void openServer() {
open(8080);
while (true);
}
}
使用postman测试:
测试结果
以上示例就明白了。这个只是怎么用。ServerBootStrap底层就是上面代码。大家自行领悟。
总结
今天小编主要将了reactor的模型演讲以及netty线程模型,和其简单的应用。今天还是比较简单的。其实今天小编没有具体讲解netty中的核心组件,下次小编将会讲解,期待小编下一次的分享吧。
以上是关于Netty框架之线程模型与基础用法的主要内容,如果未能解决你的问题,请参考以下文章