Netty框架之线程模型与基础用法

Posted 木兮君

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Netty框架之线程模型与基础用法相关的知识,希望对你有一定的参考价值。

前言

小编最近好久没有更新文章了,为自己先辩解一下,最近上线后身体不适,而且工作比较繁忙(还要小编比较懒)。今天小编和大家分享一下netty框架的线程模型和用法。有了前面两篇博文Netty框架之深入了解NIO核心组件Netty框架之NIO多路复用选择器的基础,再学习netty应该容易点了吧,同样废话不多说,进入正题。

Reactor模式

这里大家会不会有疑惑,为什么要介绍Reactor模式,这是因为Netty就是使用该模型实现的。

Reactor模式

Reactor 是反应堆的意思,Reactor 模型是指通过一个或多个输入同时传递给处理器。服务端将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。Netty 整体是整体采用了主从Reactor模型。

Reactor角色

Reactor模型中有三种角色,分别是:

  • Acceptor:处理客户端新连接,并分派请求到处理器链中
  • Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。
  • Handler:事件处理,如编码、解码等
    在这里插入图片描述

Reactor 线程模型

Reactor有三种线程模型分别是:单Reactor单线程模型、单Reactor多线程模型、主从Reactor多线程模型。而Netty正是采用最后一种。

单Reactor单线程模型

该模型下所有请求建立、IO读写、业务处理都在一个线程中完成。如果在业务中处理中出现了耗时操作,就会导致所有请求全部处理延时。因为他们是有由一个线程同步处理的。
在这里插入图片描述

单Reactor多线程模型

为了防止业务处理导致阻塞,在多线程模型下会用一个线程池来异步处理业务,当处理完成后在回写给客户端。
在这里插入图片描述

主从Reactor多线程模型

单React始终无法发挥现代服务器多核CPU的并行处理能力,所以Reactor是可以有多个的,并且有一主多从之分。一个主Reactor仅处理连接,而多个子Reactor用于处理IO读写。然后交给线程池处理业务。Tomcat就是采用该模式实现。
在这里插入图片描述
以上就是Reactor的三种线程模型,大家可以和前两篇博文中的代码对比一下。应该比较好理解。并且各个模型的优缺点也一目了然。好了进入咱们的重点,netty的线程模型。

Netty线程模型

Netty采用了主从Reactor模型实现,咱们先看下图:

在这里插入图片描述

上面主Reactor即对应Boss group 线程组,多个子Reactor对应Worker Group 线程组。主线程用于接收连接,并将建立好的连接注册到Workr 组,当最后IO事件触发后由对应Pipeline进行处理。
说明了netty线程模型,那小编接下来介绍一下上面模型中的各个组件。让我们进入netty的基本用法。

Netty基本用法

EventLoop

事件循环器,这里充当了Reactor的核心。每个EventLoop 都会包含一个Selector选择器,用于处理IO事件,和一个taskQueue用于存储用户提交的任务。此EventLoop会用一个独有的线程,默认是不启动的,当有任务触发时就会启动,并一直轮询下去。

代码示例
以下示例即声明1大小的线程组,当submit提交任务之后该EventLoop就会启动

@Test
    public void nioEventLoopGroupTest(){
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        group.submit(() -> System.out.println("submit:"+Thread.currentThread().getId()));
        // 优雅关闭EventLoop
        group.shutdownGracefully();
    }

除了提交任务外,其更重要的事情是处理Channel 相关IO事件。如管道注册。调用register方法最终会调用NIO当中的register方法进行注册,只不过Netty已经实现封装好了,并且处理好了同步锁的问题。

EventLoopGroup.register(Channel)

Netty 为了安全的调用IO操作,把所有对IO的直接操作都封状了一个任务,交由IO线程执行。所以我们通过Netty来调用IO是不会立即返回的

NioChannel与Channelhandler

原有的Nio中的Channel 被封装成了NioChannel,当然最终底层还是在调用NIO的Channel。原来对Channel 中读写事件处理被封装成Channelhandler进行处理,并用引入Pipeline的概念。我们先来了解一下他们的用法。

代码示例:
小编依旧使用tcp来做个netty的基础用法:

@Test
    public void nettySocketTest(){
        //打开管道、注册选择器最后绑定端口
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        NioserverSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
        group.register(nioServerSocketChannel);
        nioServerSocketChannel.bind(new InetSocketAddress(8888));

        nioServerSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            //处理连接和读操作
            @Override
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
                System.out.println("连接建立");
                handlerAcceptAndRead(group,msg);
            }
        });
        while (true);

    }

    private void handlerAcceptAndRead(NioEventLoopGroup group, Object msg) {
        //这里因为netty已经帮我们封装好了,直接拿就行了
        NioSocketChannel nioSocketChannel = (NioSocketChannel)msg;
        group.register(nioSocketChannel);
        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {

            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
                System.out.println(byteBuf.toString(Charset.defaultCharset()));
            }
        });
    }

然后是测试工具测试:
在这里插入图片描述
测试结果:
在这里插入图片描述
不过以上和小编话的线程模型不像啊,因为没有主从啊。接下来小编修改一下

 @Test
    public void nettySocketTest(){
        //打开管道、注册选择器最后绑定端口
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        //建立多个子EventLoopGrou
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(8);
        NioServerSocketChannel nioServerSocketChannel = new NioServerSocketChannel();
        //主EventLoopGrou只注册连接
        bossGroup.register(nioServerSocketChannel);
        nioServerSocketChannel.bind(new InetSocketAddress(8888));

        nioServerSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            //处理连接和读操作
            @Override
            public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
                System.out.println("连接建立");
                //干活的有子EventLoopGrou
                handlerAcceptAndRead(workerGroup,msg);
            }
        });
        while (true);

    }

测试:
在这里插入图片描述
在这里插入图片描述
打印内容
在这里插入图片描述

ServerBootStrap用法

有了以上的基本用法的代码,小编给大家讲一下现在市面上或者博客中普遍的用法,使用ServerBootStrap。这是在上面的代码基础上又进行了一次封装。用起来更加简单。
小编再次用一个http服务分享一下使用方法:

public class HttpNettyTest {

    public void open(int port) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup(1);
        EventLoopGroup worker = new NioEventLoopGroup(81);
        serverBootstrap.group(boss, worker);
        serverBootstrap.channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel channel) throws Exception {
                // 输入流 先解码
                channel.pipeline().addLast("decode", new HttpRequestDecoder());
                //请求体和请求头放一起对应下面的FullHttpRequest
                channel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                //处理业务
                channel.pipeline().addLast("servletHandler", new MyServlet());
                // 输出流 编码
                channel.pipeline().addFirst("encode", new HttpResponseEncoder());
            }
        });
        ChannelFuture future = serverBootstrap.bind(port);
        future.addListener(future1 -> System.out.println("服务启动成功"));
    }

    private class MyServlet extends SimpleChannelInboundHandler {

        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
            //请求头
            if (msg instanceof HttpRequest) {
                HttpRequest httpRequest = (HttpRequest) msg;
                System.out.println("当前请求url:" + httpRequest.uri());
            }
            if (msg instanceof FullHttpRequest) {
                //请求头和请求体在一块
                //一般符合我们的开发
            }

            //请求体
            if (msg instanceof HttpContent) {
                HttpContent httpContent = (HttpContent) msg;
                ByteBuf content = httpContent.content();
                byte[] bytes = new byte[content.readableBytes()];
                content.readBytes(bytes);
                System.out.println("当前上传内容内容:" + new String(bytes));
            }
            if (msg instanceof LastHttpContent) {
                LastHttpContent httpContent = (LastHttpContent) msg;
                ByteBuf content = httpContent.content();
                byte[] bytes = new byte[content.readableBytes()];
                content.readBytes(bytes);
                System.out.println("当前请求内容是最后一个包:" + new String(bytes));
                FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8");
                response.content().writeBytes("上传完毕".getBytes());
                ChannelFuture future = channelHandlerContext.writeAndFlush(response);
                future.addListener(ChannelFutureListener.CLOSE);
            }

        }
    }
    @Test
    public void openServer() {
        open(8080);
        while (true);
    }

}

使用postman测试:
在这里插入图片描述
测试结果
在这里插入图片描述
以上示例就明白了。这个只是怎么用。ServerBootStrap底层就是上面代码。大家自行领悟。

总结

今天小编主要将了reactor的模型演讲以及netty线程模型,和其简单的应用。今天还是比较简单的。其实今天小编没有具体讲解netty中的核心组件,下次小编将会讲解,期待小编下一次的分享吧。

以上是关于Netty框架之线程模型与基础用法的主要内容,如果未能解决你的问题,请参考以下文章

Netty框架之核心组件

Netty框架之核心组件

Netty框架之Reactor线程模型

Netty之Reactor线程模型概述

Netty基础篇线程模型介绍

Netty框架之责任链模式及其应用